diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index ac80f5ae8d7..5f46e22ece7 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -68,9 +68,8 @@ var ( noDiskConflictSet = sets.NewString("NoDiskConflict") ) -// ConfigFactory is the default implementation of the scheduler.Configurator interface. -// TODO make this private if possible, so that only its interface is externally used. -type ConfigFactory struct { +// configFactory is the default implementation of the scheduler.Configurator interface. +type configFactory struct { client clientset.Interface // queue for pods that need scheduling podQueue *cache.FIFO @@ -135,7 +134,7 @@ func NewConfigFactory( stopEverything := make(chan struct{}) schedulerCache := schedulercache.New(30*time.Second, stopEverything) - c := &ConfigFactory{ + c := &configFactory{ client: client, podLister: schedulerCache, podQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc), @@ -259,7 +258,7 @@ func NewConfigFactory( return c } -func (c *ConfigFactory) onPvAdd(obj interface{}) { +func (c *configFactory) onPvAdd(obj interface{}) { if c.enableEquivalenceClassCache { pv, ok := obj.(*v1.PersistentVolume) if !ok { @@ -270,7 +269,7 @@ func (c *ConfigFactory) onPvAdd(obj interface{}) { } } -func (c *ConfigFactory) onPvDelete(obj interface{}) { +func (c *configFactory) onPvDelete(obj interface{}) { if c.enableEquivalenceClassCache { var pv *v1.PersistentVolume switch t := obj.(type) { @@ -291,7 +290,7 @@ func (c *ConfigFactory) onPvDelete(obj interface{}) { } } -func (c *ConfigFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) { +func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) { invalidPredicates := sets.NewString("MaxPDVolumeCountPredicate") if pv.Spec.AWSElasticBlockStore != nil { invalidPredicates.Insert("MaxEBSVolumeCount") @@ -305,7 +304,7 @@ func (c *ConfigFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) { c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) } -func (c *ConfigFactory) onPvcAdd(obj interface{}) { +func (c *configFactory) onPvcAdd(obj interface{}) { if c.enableEquivalenceClassCache { pvc, ok := obj.(*v1.PersistentVolumeClaim) if !ok { @@ -316,7 +315,7 @@ func (c *ConfigFactory) onPvcAdd(obj interface{}) { } } -func (c *ConfigFactory) onPvcDelete(obj interface{}) { +func (c *configFactory) onPvcDelete(obj interface{}) { if c.enableEquivalenceClassCache { var pvc *v1.PersistentVolumeClaim switch t := obj.(type) { @@ -337,19 +336,19 @@ func (c *ConfigFactory) onPvcDelete(obj interface{}) { } } -func (c *ConfigFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) { +func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) { if pvc.Spec.VolumeName != "" { c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(maxPDVolumeCountPredicateSet) } } -func (c *ConfigFactory) onServiceAdd(obj interface{}) { +func (c *configFactory) onServiceAdd(obj interface{}) { if c.enableEquivalenceClassCache { c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet) } } -func (c *ConfigFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) { +func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) { if c.enableEquivalenceClassCache { // TODO(resouer) We may need to invalidate this for specified group of pods only oldService := oldObj.(*v1.Service) @@ -360,36 +359,36 @@ func (c *ConfigFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) } } -func (c *ConfigFactory) onServiceDelete(obj interface{}) { +func (c *configFactory) onServiceDelete(obj interface{}) { if c.enableEquivalenceClassCache { c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet) } } // GetNodeStore provides the cache to the nodes, mostly internal use, but may also be called by mock-tests. -func (c *ConfigFactory) GetNodeLister() corelisters.NodeLister { +func (c *configFactory) GetNodeLister() corelisters.NodeLister { return c.nodeLister } -func (c *ConfigFactory) GetHardPodAffinitySymmetricWeight() int { +func (c *configFactory) GetHardPodAffinitySymmetricWeight() int { return c.hardPodAffinitySymmetricWeight } -func (f *ConfigFactory) GetSchedulerName() string { +func (f *configFactory) GetSchedulerName() string { return f.schedulerName } // GetClient provides a kubernetes client, mostly internal use, but may also be called by mock-tests. -func (f *ConfigFactory) GetClient() clientset.Interface { +func (f *configFactory) GetClient() clientset.Interface { return f.client } // GetScheduledPodListerIndexer provides a pod lister, mostly internal use, but may also be called by mock-tests. -func (c *ConfigFactory) GetScheduledPodLister() corelisters.PodLister { +func (c *configFactory) GetScheduledPodLister() corelisters.PodLister { return c.scheduledPodLister } -func (c *ConfigFactory) addPodToCache(obj interface{}) { +func (c *configFactory) addPodToCache(obj interface{}) { pod, ok := obj.(*v1.Pod) if !ok { glog.Errorf("cannot convert to *v1.Pod: %v", obj) @@ -403,7 +402,7 @@ func (c *ConfigFactory) addPodToCache(obj interface{}) { // handled optimistically in InvalidateCachedPredicateItemForPodAdd. } -func (c *ConfigFactory) updatePodInCache(oldObj, newObj interface{}) { +func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) { oldPod, ok := oldObj.(*v1.Pod) if !ok { glog.Errorf("cannot convert oldObj to *v1.Pod: %v", oldObj) @@ -422,7 +421,7 @@ func (c *ConfigFactory) updatePodInCache(oldObj, newObj interface{}) { c.invalidateCachedPredicatesOnUpdatePod(newPod, oldPod) } -func (c *ConfigFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) { +func (c *configFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) { if c.enableEquivalenceClassCache { // if the pod does not have binded node, updating equivalence cache is meaningless; // if pod's binded node has been changed, that case should be handled by pod add & delete. @@ -443,7 +442,7 @@ func (c *ConfigFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, ol } } -func (c *ConfigFactory) deletePodFromCache(obj interface{}) { +func (c *configFactory) deletePodFromCache(obj interface{}) { var pod *v1.Pod switch t := obj.(type) { case *v1.Pod: @@ -466,7 +465,7 @@ func (c *ConfigFactory) deletePodFromCache(obj interface{}) { c.invalidateCachedPredicatesOnDeletePod(pod) } -func (c *ConfigFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) { +func (c *configFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) { if c.enableEquivalenceClassCache { // part of this case is the same as pod add. c.equivalencePodCache.InvalidateCachedPredicateItemForPodAdd(pod, pod.Spec.NodeName) @@ -487,7 +486,7 @@ func (c *ConfigFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) { } } -func (c *ConfigFactory) addNodeToCache(obj interface{}) { +func (c *configFactory) addNodeToCache(obj interface{}) { node, ok := obj.(*v1.Node) if !ok { glog.Errorf("cannot convert to *v1.Node: %v", obj) @@ -501,7 +500,7 @@ func (c *ConfigFactory) addNodeToCache(obj interface{}) { // NOTE: add a new node does not affect existing predicates in equivalence cache } -func (c *ConfigFactory) updateNodeInCache(oldObj, newObj interface{}) { +func (c *configFactory) updateNodeInCache(oldObj, newObj interface{}) { oldNode, ok := oldObj.(*v1.Node) if !ok { glog.Errorf("cannot convert oldObj to *v1.Node: %v", oldObj) @@ -520,7 +519,7 @@ func (c *ConfigFactory) updateNodeInCache(oldObj, newObj interface{}) { c.invalidateCachedPredicatesOnNodeUpdate(newNode, oldNode) } -func (c *ConfigFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) { +func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) { if c.enableEquivalenceClassCache { // Begin to update equivalence cache based on node update // TODO(resouer): think about lazily initialize this set @@ -582,7 +581,7 @@ func (c *ConfigFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, } } -func (c *ConfigFactory) deleteNodeFromCache(obj interface{}) { +func (c *configFactory) deleteNodeFromCache(obj interface{}) { var node *v1.Node switch t := obj.(type) { case *v1.Node: @@ -607,12 +606,12 @@ func (c *ConfigFactory) deleteNodeFromCache(obj interface{}) { } // Create creates a scheduler with the default algorithm provider. -func (f *ConfigFactory) Create() (*scheduler.Config, error) { +func (f *configFactory) Create() (*scheduler.Config, error) { return f.CreateFromProvider(DefaultProvider) } // Creates a scheduler from the name of a registered algorithm provider. -func (f *ConfigFactory) CreateFromProvider(providerName string) (*scheduler.Config, error) { +func (f *configFactory) CreateFromProvider(providerName string) (*scheduler.Config, error) { glog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName) provider, err := GetAlgorithmProvider(providerName) if err != nil { @@ -623,7 +622,7 @@ func (f *ConfigFactory) CreateFromProvider(providerName string) (*scheduler.Conf } // Creates a scheduler from the configuration file -func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler.Config, error) { +func (f *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler.Config, error) { glog.V(2).Infof("Creating scheduler from configuration: %v", policy) // validate the policy configuration @@ -663,7 +662,7 @@ func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler } // getBinder returns an extender that supports bind or a default binder. -func (f *ConfigFactory) getBinder(extenders []algorithm.SchedulerExtender) scheduler.Binder { +func (f *configFactory) getBinder(extenders []algorithm.SchedulerExtender) scheduler.Binder { for i := range extenders { if extenders[i].IsBinder() { return extenders[i] @@ -673,7 +672,7 @@ func (f *ConfigFactory) getBinder(extenders []algorithm.SchedulerExtender) sched } // Creates a scheduler from a set of registered fit predicate keys and priority keys. -func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) { +func (f *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) { glog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys) if f.GetHardPodAffinitySymmetricWeight() < 1 || f.GetHardPodAffinitySymmetricWeight() > 100 { @@ -736,7 +735,7 @@ func (n *nodeLister) List() ([]*v1.Node, error) { return n.NodeLister.List(labels.Everything()) } -func (f *ConfigFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]algorithm.PriorityConfig, error) { +func (f *configFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]algorithm.PriorityConfig, error) { pluginArgs, err := f.getPluginArgs() if err != nil { return nil, err @@ -745,7 +744,7 @@ func (f *ConfigFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([] return getPriorityFunctionConfigs(priorityKeys, *pluginArgs) } -func (f *ConfigFactory) GetPriorityMetadataProducer() (algorithm.MetadataProducer, error) { +func (f *configFactory) GetPriorityMetadataProducer() (algorithm.MetadataProducer, error) { pluginArgs, err := f.getPluginArgs() if err != nil { return nil, err @@ -754,7 +753,7 @@ func (f *ConfigFactory) GetPriorityMetadataProducer() (algorithm.MetadataProduce return getPriorityMetadataProducer(*pluginArgs) } -func (f *ConfigFactory) GetPredicateMetadataProducer() (algorithm.PredicateMetadataProducer, error) { +func (f *configFactory) GetPredicateMetadataProducer() (algorithm.PredicateMetadataProducer, error) { pluginArgs, err := f.getPluginArgs() if err != nil { return nil, err @@ -762,7 +761,7 @@ func (f *ConfigFactory) GetPredicateMetadataProducer() (algorithm.PredicateMetad return getPredicateMetadataProducer(*pluginArgs) } -func (f *ConfigFactory) GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error) { +func (f *configFactory) GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error) { pluginArgs, err := f.getPluginArgs() if err != nil { return nil, err @@ -771,7 +770,7 @@ func (f *ConfigFactory) GetPredicates(predicateKeys sets.String) (map[string]alg return getFitPredicateFunctions(predicateKeys, *pluginArgs) } -func (f *ConfigFactory) getPluginArgs() (*PluginFactoryArgs, error) { +func (f *configFactory) getPluginArgs() (*PluginFactoryArgs, error) { return &PluginFactoryArgs{ PodLister: f.podLister, ServiceLister: f.serviceLister, @@ -786,7 +785,7 @@ func (f *ConfigFactory) getPluginArgs() (*PluginFactoryArgs, error) { }, nil } -func (f *ConfigFactory) getNextPod() *v1.Pod { +func (f *configFactory) getNextPod() *v1.Pod { for { pod := cache.Pop(f.podQueue).(*v1.Pod) if f.ResponsibleForPod(pod) { @@ -796,7 +795,7 @@ func (f *ConfigFactory) getNextPod() *v1.Pod { } } -func (f *ConfigFactory) ResponsibleForPod(pod *v1.Pod) bool { +func (f *configFactory) ResponsibleForPod(pod *v1.Pod) bool { return f.schedulerName == pod.Spec.SchedulerName } @@ -902,7 +901,7 @@ func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) core } } -func (factory *ConfigFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) { +func (factory *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) { return func(pod *v1.Pod, err error) { if err == core.ErrNoNodesAvailable { glog.V(4).Infof("Unable to schedule %v %v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name)