mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
Merge pull request #52903 from wackxu/mpi
Automatic merge from submit-queue (batch tested with PRs 53507, 53772, 52903, 53543). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Make configFactory private **What this PR does / why we need it**: Fix TODO make this private if possible, so that only its interface is externally used. I have check the use of configFactory and it is safe to make it private. **Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes # **Release note**: ```release-note NONE ```
This commit is contained in:
commit
46ae7c4a30
@ -70,9 +70,8 @@ var (
|
||||
noDiskConflictSet = sets.NewString("NoDiskConflict")
|
||||
)
|
||||
|
||||
// ConfigFactory is the default implementation of the scheduler.Configurator interface.
|
||||
// TODO make this private if possible, so that only its interface is externally used.
|
||||
type ConfigFactory struct {
|
||||
// configFactory is the default implementation of the scheduler.Configurator interface.
|
||||
type configFactory struct {
|
||||
client clientset.Interface
|
||||
// queue for pods that need scheduling
|
||||
podQueue *cache.FIFO
|
||||
@ -137,7 +136,7 @@ func NewConfigFactory(
|
||||
stopEverything := make(chan struct{})
|
||||
schedulerCache := schedulercache.New(30*time.Second, stopEverything)
|
||||
|
||||
c := &ConfigFactory{
|
||||
c := &configFactory{
|
||||
client: client,
|
||||
podLister: schedulerCache,
|
||||
podQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
|
||||
@ -256,7 +255,7 @@ func NewConfigFactory(
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) onPvAdd(obj interface{}) {
|
||||
func (c *configFactory) onPvAdd(obj interface{}) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
pv, ok := obj.(*v1.PersistentVolume)
|
||||
if !ok {
|
||||
@ -267,7 +266,7 @@ func (c *ConfigFactory) onPvAdd(obj interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) onPvDelete(obj interface{}) {
|
||||
func (c *configFactory) onPvDelete(obj interface{}) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
var pv *v1.PersistentVolume
|
||||
switch t := obj.(type) {
|
||||
@ -288,7 +287,7 @@ func (c *ConfigFactory) onPvDelete(obj interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) {
|
||||
func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) {
|
||||
invalidPredicates := sets.NewString("MaxPDVolumeCountPredicate")
|
||||
if pv.Spec.AWSElasticBlockStore != nil {
|
||||
invalidPredicates.Insert("MaxEBSVolumeCount")
|
||||
@ -302,7 +301,7 @@ func (c *ConfigFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) {
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) onPvcAdd(obj interface{}) {
|
||||
func (c *configFactory) onPvcAdd(obj interface{}) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
pvc, ok := obj.(*v1.PersistentVolumeClaim)
|
||||
if !ok {
|
||||
@ -313,7 +312,7 @@ func (c *ConfigFactory) onPvcAdd(obj interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) onPvcDelete(obj interface{}) {
|
||||
func (c *configFactory) onPvcDelete(obj interface{}) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
var pvc *v1.PersistentVolumeClaim
|
||||
switch t := obj.(type) {
|
||||
@ -334,19 +333,19 @@ func (c *ConfigFactory) onPvcDelete(obj interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) {
|
||||
func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) {
|
||||
if pvc.Spec.VolumeName != "" {
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(maxPDVolumeCountPredicateSet)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) onServiceAdd(obj interface{}) {
|
||||
func (c *configFactory) onServiceAdd(obj interface{}) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) {
|
||||
func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
// TODO(resouer) We may need to invalidate this for specified group of pods only
|
||||
oldService := oldObj.(*v1.Service)
|
||||
@ -357,36 +356,36 @@ func (c *ConfigFactory) onServiceUpdate(oldObj interface{}, newObj interface{})
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) onServiceDelete(obj interface{}) {
|
||||
func (c *configFactory) onServiceDelete(obj interface{}) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
|
||||
}
|
||||
}
|
||||
|
||||
// GetNodeStore provides the cache to the nodes, mostly internal use, but may also be called by mock-tests.
|
||||
func (c *ConfigFactory) GetNodeLister() corelisters.NodeLister {
|
||||
func (c *configFactory) GetNodeLister() corelisters.NodeLister {
|
||||
return c.nodeLister
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) GetHardPodAffinitySymmetricWeight() int {
|
||||
func (c *configFactory) GetHardPodAffinitySymmetricWeight() int {
|
||||
return c.hardPodAffinitySymmetricWeight
|
||||
}
|
||||
|
||||
func (f *ConfigFactory) GetSchedulerName() string {
|
||||
func (f *configFactory) GetSchedulerName() string {
|
||||
return f.schedulerName
|
||||
}
|
||||
|
||||
// GetClient provides a kubernetes client, mostly internal use, but may also be called by mock-tests.
|
||||
func (f *ConfigFactory) GetClient() clientset.Interface {
|
||||
func (f *configFactory) GetClient() clientset.Interface {
|
||||
return f.client
|
||||
}
|
||||
|
||||
// GetScheduledPodListerIndexer provides a pod lister, mostly internal use, but may also be called by mock-tests.
|
||||
func (c *ConfigFactory) GetScheduledPodLister() corelisters.PodLister {
|
||||
func (c *configFactory) GetScheduledPodLister() corelisters.PodLister {
|
||||
return c.scheduledPodLister
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) addPodToCache(obj interface{}) {
|
||||
func (c *configFactory) addPodToCache(obj interface{}) {
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
if !ok {
|
||||
glog.Errorf("cannot convert to *v1.Pod: %v", obj)
|
||||
@ -400,7 +399,7 @@ func (c *ConfigFactory) addPodToCache(obj interface{}) {
|
||||
// handled optimistically in InvalidateCachedPredicateItemForPodAdd.
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) updatePodInCache(oldObj, newObj interface{}) {
|
||||
func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) {
|
||||
oldPod, ok := oldObj.(*v1.Pod)
|
||||
if !ok {
|
||||
glog.Errorf("cannot convert oldObj to *v1.Pod: %v", oldObj)
|
||||
@ -419,7 +418,7 @@ func (c *ConfigFactory) updatePodInCache(oldObj, newObj interface{}) {
|
||||
c.invalidateCachedPredicatesOnUpdatePod(newPod, oldPod)
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) {
|
||||
func (c *configFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
// if the pod does not have binded node, updating equivalence cache is meaningless;
|
||||
// if pod's binded node has been changed, that case should be handled by pod add & delete.
|
||||
@ -440,7 +439,7 @@ func (c *ConfigFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, ol
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) deletePodFromCache(obj interface{}) {
|
||||
func (c *configFactory) deletePodFromCache(obj interface{}) {
|
||||
var pod *v1.Pod
|
||||
switch t := obj.(type) {
|
||||
case *v1.Pod:
|
||||
@ -463,7 +462,7 @@ func (c *ConfigFactory) deletePodFromCache(obj interface{}) {
|
||||
c.invalidateCachedPredicatesOnDeletePod(pod)
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) {
|
||||
func (c *configFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
// part of this case is the same as pod add.
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItemForPodAdd(pod, pod.Spec.NodeName)
|
||||
@ -484,7 +483,7 @@ func (c *ConfigFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) addNodeToCache(obj interface{}) {
|
||||
func (c *configFactory) addNodeToCache(obj interface{}) {
|
||||
node, ok := obj.(*v1.Node)
|
||||
if !ok {
|
||||
glog.Errorf("cannot convert to *v1.Node: %v", obj)
|
||||
@ -498,7 +497,7 @@ func (c *ConfigFactory) addNodeToCache(obj interface{}) {
|
||||
// NOTE: add a new node does not affect existing predicates in equivalence cache
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) updateNodeInCache(oldObj, newObj interface{}) {
|
||||
func (c *configFactory) updateNodeInCache(oldObj, newObj interface{}) {
|
||||
oldNode, ok := oldObj.(*v1.Node)
|
||||
if !ok {
|
||||
glog.Errorf("cannot convert oldObj to *v1.Node: %v", oldObj)
|
||||
@ -517,7 +516,7 @@ func (c *ConfigFactory) updateNodeInCache(oldObj, newObj interface{}) {
|
||||
c.invalidateCachedPredicatesOnNodeUpdate(newNode, oldNode)
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) {
|
||||
func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
// Begin to update equivalence cache based on node update
|
||||
// TODO(resouer): think about lazily initialize this set
|
||||
@ -581,7 +580,7 @@ func (c *ConfigFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigFactory) deleteNodeFromCache(obj interface{}) {
|
||||
func (c *configFactory) deleteNodeFromCache(obj interface{}) {
|
||||
var node *v1.Node
|
||||
switch t := obj.(type) {
|
||||
case *v1.Node:
|
||||
@ -606,12 +605,12 @@ func (c *ConfigFactory) deleteNodeFromCache(obj interface{}) {
|
||||
}
|
||||
|
||||
// Create creates a scheduler with the default algorithm provider.
|
||||
func (f *ConfigFactory) Create() (*scheduler.Config, error) {
|
||||
func (f *configFactory) Create() (*scheduler.Config, error) {
|
||||
return f.CreateFromProvider(DefaultProvider)
|
||||
}
|
||||
|
||||
// Creates a scheduler from the name of a registered algorithm provider.
|
||||
func (f *ConfigFactory) CreateFromProvider(providerName string) (*scheduler.Config, error) {
|
||||
func (f *configFactory) CreateFromProvider(providerName string) (*scheduler.Config, error) {
|
||||
glog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
|
||||
provider, err := GetAlgorithmProvider(providerName)
|
||||
if err != nil {
|
||||
@ -622,7 +621,7 @@ func (f *ConfigFactory) CreateFromProvider(providerName string) (*scheduler.Conf
|
||||
}
|
||||
|
||||
// Creates a scheduler from the configuration file
|
||||
func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler.Config, error) {
|
||||
func (f *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler.Config, error) {
|
||||
glog.V(2).Infof("Creating scheduler from configuration: %v", policy)
|
||||
|
||||
// validate the policy configuration
|
||||
@ -662,7 +661,7 @@ func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler
|
||||
}
|
||||
|
||||
// getBinder returns an extender that supports bind or a default binder.
|
||||
func (f *ConfigFactory) getBinder(extenders []algorithm.SchedulerExtender) scheduler.Binder {
|
||||
func (f *configFactory) getBinder(extenders []algorithm.SchedulerExtender) scheduler.Binder {
|
||||
for i := range extenders {
|
||||
if extenders[i].IsBinder() {
|
||||
return extenders[i]
|
||||
@ -672,7 +671,7 @@ func (f *ConfigFactory) getBinder(extenders []algorithm.SchedulerExtender) sched
|
||||
}
|
||||
|
||||
// Creates a scheduler from a set of registered fit predicate keys and priority keys.
|
||||
func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) {
|
||||
func (f *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) {
|
||||
glog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)
|
||||
|
||||
if f.GetHardPodAffinitySymmetricWeight() < 1 || f.GetHardPodAffinitySymmetricWeight() > 100 {
|
||||
@ -735,7 +734,7 @@ func (n *nodeLister) List() ([]*v1.Node, error) {
|
||||
return n.NodeLister.List(labels.Everything())
|
||||
}
|
||||
|
||||
func (f *ConfigFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]algorithm.PriorityConfig, error) {
|
||||
func (f *configFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]algorithm.PriorityConfig, error) {
|
||||
pluginArgs, err := f.getPluginArgs()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -744,7 +743,7 @@ func (f *ConfigFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]
|
||||
return getPriorityFunctionConfigs(priorityKeys, *pluginArgs)
|
||||
}
|
||||
|
||||
func (f *ConfigFactory) GetPriorityMetadataProducer() (algorithm.MetadataProducer, error) {
|
||||
func (f *configFactory) GetPriorityMetadataProducer() (algorithm.MetadataProducer, error) {
|
||||
pluginArgs, err := f.getPluginArgs()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -753,7 +752,7 @@ func (f *ConfigFactory) GetPriorityMetadataProducer() (algorithm.MetadataProduce
|
||||
return getPriorityMetadataProducer(*pluginArgs)
|
||||
}
|
||||
|
||||
func (f *ConfigFactory) GetPredicateMetadataProducer() (algorithm.PredicateMetadataProducer, error) {
|
||||
func (f *configFactory) GetPredicateMetadataProducer() (algorithm.PredicateMetadataProducer, error) {
|
||||
pluginArgs, err := f.getPluginArgs()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -761,7 +760,7 @@ func (f *ConfigFactory) GetPredicateMetadataProducer() (algorithm.PredicateMetad
|
||||
return getPredicateMetadataProducer(*pluginArgs)
|
||||
}
|
||||
|
||||
func (f *ConfigFactory) GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error) {
|
||||
func (f *configFactory) GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error) {
|
||||
pluginArgs, err := f.getPluginArgs()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -770,7 +769,7 @@ func (f *ConfigFactory) GetPredicates(predicateKeys sets.String) (map[string]alg
|
||||
return getFitPredicateFunctions(predicateKeys, *pluginArgs)
|
||||
}
|
||||
|
||||
func (f *ConfigFactory) getPluginArgs() (*PluginFactoryArgs, error) {
|
||||
func (f *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
|
||||
return &PluginFactoryArgs{
|
||||
PodLister: f.podLister,
|
||||
ServiceLister: f.serviceLister,
|
||||
@ -785,7 +784,7 @@ func (f *ConfigFactory) getPluginArgs() (*PluginFactoryArgs, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (f *ConfigFactory) getNextPod() *v1.Pod {
|
||||
func (f *configFactory) getNextPod() *v1.Pod {
|
||||
for {
|
||||
pod := cache.Pop(f.podQueue).(*v1.Pod)
|
||||
if f.ResponsibleForPod(pod) {
|
||||
@ -795,7 +794,7 @@ func (f *ConfigFactory) getNextPod() *v1.Pod {
|
||||
}
|
||||
}
|
||||
|
||||
func (f *ConfigFactory) ResponsibleForPod(pod *v1.Pod) bool {
|
||||
func (f *configFactory) ResponsibleForPod(pod *v1.Pod) bool {
|
||||
return f.schedulerName == pod.Spec.SchedulerName
|
||||
}
|
||||
|
||||
@ -901,7 +900,7 @@ func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) core
|
||||
}
|
||||
}
|
||||
|
||||
func (factory *ConfigFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) {
|
||||
func (factory *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) {
|
||||
return func(pod *v1.Pod, err error) {
|
||||
if err == core.ErrNoNodesAvailable {
|
||||
glog.V(4).Infof("Unable to schedule %v %v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name)
|
||||
|
Loading…
Reference in New Issue
Block a user