diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 1c7cf34dc27..56f52b4f072 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -70,9 +70,8 @@ var ( noDiskConflictSet = sets.NewString("NoDiskConflict") ) -// ConfigFactory is the default implementation of the scheduler.Configurator interface. -// TODO make this private if possible, so that only its interface is externally used. -type ConfigFactory struct { +// configFactory is the default implementation of the scheduler.Configurator interface. +type configFactory struct { client clientset.Interface // queue for pods that need scheduling podQueue *cache.FIFO @@ -137,7 +136,7 @@ func NewConfigFactory( stopEverything := make(chan struct{}) schedulerCache := schedulercache.New(30*time.Second, stopEverything) - c := &ConfigFactory{ + c := &configFactory{ client: client, podLister: schedulerCache, podQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc), @@ -256,7 +255,7 @@ func NewConfigFactory( return c } -func (c *ConfigFactory) onPvAdd(obj interface{}) { +func (c *configFactory) onPvAdd(obj interface{}) { if c.enableEquivalenceClassCache { pv, ok := obj.(*v1.PersistentVolume) if !ok { @@ -267,7 +266,7 @@ func (c *ConfigFactory) onPvAdd(obj interface{}) { } } -func (c *ConfigFactory) onPvDelete(obj interface{}) { +func (c *configFactory) onPvDelete(obj interface{}) { if c.enableEquivalenceClassCache { var pv *v1.PersistentVolume switch t := obj.(type) { @@ -288,7 +287,7 @@ func (c *ConfigFactory) onPvDelete(obj interface{}) { } } -func (c *ConfigFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) { +func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) { invalidPredicates := sets.NewString("MaxPDVolumeCountPredicate") if pv.Spec.AWSElasticBlockStore != nil { invalidPredicates.Insert("MaxEBSVolumeCount") @@ -302,7 +301,7 @@ func (c *ConfigFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) { c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) } -func (c *ConfigFactory) onPvcAdd(obj interface{}) { +func (c *configFactory) onPvcAdd(obj interface{}) { if c.enableEquivalenceClassCache { pvc, ok := obj.(*v1.PersistentVolumeClaim) if !ok { @@ -313,7 +312,7 @@ func (c *ConfigFactory) onPvcAdd(obj interface{}) { } } -func (c *ConfigFactory) onPvcDelete(obj interface{}) { +func (c *configFactory) onPvcDelete(obj interface{}) { if c.enableEquivalenceClassCache { var pvc *v1.PersistentVolumeClaim switch t := obj.(type) { @@ -334,19 +333,19 @@ func (c *ConfigFactory) onPvcDelete(obj interface{}) { } } -func (c *ConfigFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) { +func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) { if pvc.Spec.VolumeName != "" { c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(maxPDVolumeCountPredicateSet) } } -func (c *ConfigFactory) onServiceAdd(obj interface{}) { +func (c *configFactory) onServiceAdd(obj interface{}) { if c.enableEquivalenceClassCache { c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet) } } -func (c *ConfigFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) { +func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) { if c.enableEquivalenceClassCache { // TODO(resouer) We may need to invalidate this for specified group of pods only oldService := oldObj.(*v1.Service) @@ -357,36 +356,36 @@ func (c *ConfigFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) } } -func (c *ConfigFactory) onServiceDelete(obj interface{}) { +func (c *configFactory) onServiceDelete(obj interface{}) { if c.enableEquivalenceClassCache { c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet) } } // GetNodeStore provides the cache to the nodes, mostly internal use, but may also be called by mock-tests. -func (c *ConfigFactory) GetNodeLister() corelisters.NodeLister { +func (c *configFactory) GetNodeLister() corelisters.NodeLister { return c.nodeLister } -func (c *ConfigFactory) GetHardPodAffinitySymmetricWeight() int { +func (c *configFactory) GetHardPodAffinitySymmetricWeight() int { return c.hardPodAffinitySymmetricWeight } -func (f *ConfigFactory) GetSchedulerName() string { +func (f *configFactory) GetSchedulerName() string { return f.schedulerName } // GetClient provides a kubernetes client, mostly internal use, but may also be called by mock-tests. -func (f *ConfigFactory) GetClient() clientset.Interface { +func (f *configFactory) GetClient() clientset.Interface { return f.client } // GetScheduledPodListerIndexer provides a pod lister, mostly internal use, but may also be called by mock-tests. -func (c *ConfigFactory) GetScheduledPodLister() corelisters.PodLister { +func (c *configFactory) GetScheduledPodLister() corelisters.PodLister { return c.scheduledPodLister } -func (c *ConfigFactory) addPodToCache(obj interface{}) { +func (c *configFactory) addPodToCache(obj interface{}) { pod, ok := obj.(*v1.Pod) if !ok { glog.Errorf("cannot convert to *v1.Pod: %v", obj) @@ -400,7 +399,7 @@ func (c *ConfigFactory) addPodToCache(obj interface{}) { // handled optimistically in InvalidateCachedPredicateItemForPodAdd. } -func (c *ConfigFactory) updatePodInCache(oldObj, newObj interface{}) { +func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) { oldPod, ok := oldObj.(*v1.Pod) if !ok { glog.Errorf("cannot convert oldObj to *v1.Pod: %v", oldObj) @@ -419,7 +418,7 @@ func (c *ConfigFactory) updatePodInCache(oldObj, newObj interface{}) { c.invalidateCachedPredicatesOnUpdatePod(newPod, oldPod) } -func (c *ConfigFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) { +func (c *configFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) { if c.enableEquivalenceClassCache { // if the pod does not have binded node, updating equivalence cache is meaningless; // if pod's binded node has been changed, that case should be handled by pod add & delete. @@ -440,7 +439,7 @@ func (c *ConfigFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, ol } } -func (c *ConfigFactory) deletePodFromCache(obj interface{}) { +func (c *configFactory) deletePodFromCache(obj interface{}) { var pod *v1.Pod switch t := obj.(type) { case *v1.Pod: @@ -463,7 +462,7 @@ func (c *ConfigFactory) deletePodFromCache(obj interface{}) { c.invalidateCachedPredicatesOnDeletePod(pod) } -func (c *ConfigFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) { +func (c *configFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) { if c.enableEquivalenceClassCache { // part of this case is the same as pod add. c.equivalencePodCache.InvalidateCachedPredicateItemForPodAdd(pod, pod.Spec.NodeName) @@ -484,7 +483,7 @@ func (c *ConfigFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) { } } -func (c *ConfigFactory) addNodeToCache(obj interface{}) { +func (c *configFactory) addNodeToCache(obj interface{}) { node, ok := obj.(*v1.Node) if !ok { glog.Errorf("cannot convert to *v1.Node: %v", obj) @@ -498,7 +497,7 @@ func (c *ConfigFactory) addNodeToCache(obj interface{}) { // NOTE: add a new node does not affect existing predicates in equivalence cache } -func (c *ConfigFactory) updateNodeInCache(oldObj, newObj interface{}) { +func (c *configFactory) updateNodeInCache(oldObj, newObj interface{}) { oldNode, ok := oldObj.(*v1.Node) if !ok { glog.Errorf("cannot convert oldObj to *v1.Node: %v", oldObj) @@ -517,7 +516,7 @@ func (c *ConfigFactory) updateNodeInCache(oldObj, newObj interface{}) { c.invalidateCachedPredicatesOnNodeUpdate(newNode, oldNode) } -func (c *ConfigFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) { +func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) { if c.enableEquivalenceClassCache { // Begin to update equivalence cache based on node update // TODO(resouer): think about lazily initialize this set @@ -581,7 +580,7 @@ func (c *ConfigFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, } } -func (c *ConfigFactory) deleteNodeFromCache(obj interface{}) { +func (c *configFactory) deleteNodeFromCache(obj interface{}) { var node *v1.Node switch t := obj.(type) { case *v1.Node: @@ -606,12 +605,12 @@ func (c *ConfigFactory) deleteNodeFromCache(obj interface{}) { } // Create creates a scheduler with the default algorithm provider. -func (f *ConfigFactory) Create() (*scheduler.Config, error) { +func (f *configFactory) Create() (*scheduler.Config, error) { return f.CreateFromProvider(DefaultProvider) } // Creates a scheduler from the name of a registered algorithm provider. -func (f *ConfigFactory) CreateFromProvider(providerName string) (*scheduler.Config, error) { +func (f *configFactory) CreateFromProvider(providerName string) (*scheduler.Config, error) { glog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName) provider, err := GetAlgorithmProvider(providerName) if err != nil { @@ -622,7 +621,7 @@ func (f *ConfigFactory) CreateFromProvider(providerName string) (*scheduler.Conf } // Creates a scheduler from the configuration file -func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler.Config, error) { +func (f *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler.Config, error) { glog.V(2).Infof("Creating scheduler from configuration: %v", policy) // validate the policy configuration @@ -662,7 +661,7 @@ func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler } // getBinder returns an extender that supports bind or a default binder. -func (f *ConfigFactory) getBinder(extenders []algorithm.SchedulerExtender) scheduler.Binder { +func (f *configFactory) getBinder(extenders []algorithm.SchedulerExtender) scheduler.Binder { for i := range extenders { if extenders[i].IsBinder() { return extenders[i] @@ -672,7 +671,7 @@ func (f *ConfigFactory) getBinder(extenders []algorithm.SchedulerExtender) sched } // Creates a scheduler from a set of registered fit predicate keys and priority keys. -func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) { +func (f *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) { glog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys) if f.GetHardPodAffinitySymmetricWeight() < 1 || f.GetHardPodAffinitySymmetricWeight() > 100 { @@ -735,7 +734,7 @@ func (n *nodeLister) List() ([]*v1.Node, error) { return n.NodeLister.List(labels.Everything()) } -func (f *ConfigFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]algorithm.PriorityConfig, error) { +func (f *configFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]algorithm.PriorityConfig, error) { pluginArgs, err := f.getPluginArgs() if err != nil { return nil, err @@ -744,7 +743,7 @@ func (f *ConfigFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([] return getPriorityFunctionConfigs(priorityKeys, *pluginArgs) } -func (f *ConfigFactory) GetPriorityMetadataProducer() (algorithm.MetadataProducer, error) { +func (f *configFactory) GetPriorityMetadataProducer() (algorithm.MetadataProducer, error) { pluginArgs, err := f.getPluginArgs() if err != nil { return nil, err @@ -753,7 +752,7 @@ func (f *ConfigFactory) GetPriorityMetadataProducer() (algorithm.MetadataProduce return getPriorityMetadataProducer(*pluginArgs) } -func (f *ConfigFactory) GetPredicateMetadataProducer() (algorithm.PredicateMetadataProducer, error) { +func (f *configFactory) GetPredicateMetadataProducer() (algorithm.PredicateMetadataProducer, error) { pluginArgs, err := f.getPluginArgs() if err != nil { return nil, err @@ -761,7 +760,7 @@ func (f *ConfigFactory) GetPredicateMetadataProducer() (algorithm.PredicateMetad return getPredicateMetadataProducer(*pluginArgs) } -func (f *ConfigFactory) GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error) { +func (f *configFactory) GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error) { pluginArgs, err := f.getPluginArgs() if err != nil { return nil, err @@ -770,7 +769,7 @@ func (f *ConfigFactory) GetPredicates(predicateKeys sets.String) (map[string]alg return getFitPredicateFunctions(predicateKeys, *pluginArgs) } -func (f *ConfigFactory) getPluginArgs() (*PluginFactoryArgs, error) { +func (f *configFactory) getPluginArgs() (*PluginFactoryArgs, error) { return &PluginFactoryArgs{ PodLister: f.podLister, ServiceLister: f.serviceLister, @@ -785,7 +784,7 @@ func (f *ConfigFactory) getPluginArgs() (*PluginFactoryArgs, error) { }, nil } -func (f *ConfigFactory) getNextPod() *v1.Pod { +func (f *configFactory) getNextPod() *v1.Pod { for { pod := cache.Pop(f.podQueue).(*v1.Pod) if f.ResponsibleForPod(pod) { @@ -795,7 +794,7 @@ func (f *ConfigFactory) getNextPod() *v1.Pod { } } -func (f *ConfigFactory) ResponsibleForPod(pod *v1.Pod) bool { +func (f *configFactory) ResponsibleForPod(pod *v1.Pod) bool { return f.schedulerName == pod.Spec.SchedulerName } @@ -901,7 +900,7 @@ func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) core } } -func (factory *ConfigFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) { +func (factory *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) { return func(pod *v1.Pod, err error) { if err == core.ErrNoNodesAvailable { glog.V(4).Infof("Unable to schedule %v %v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name)