diff --git a/plugin/pkg/scheduler/algorithm/listers.go b/plugin/pkg/scheduler/algorithm/listers.go
index 690b0a3f793..343a27c7946 100644
--- a/plugin/pkg/scheduler/algorithm/listers.go
+++ b/plugin/pkg/scheduler/algorithm/listers.go
@@ -76,7 +76,7 @@ func (f FakeServiceLister) List(labels.Selector) ([]*api.Service, error) {
 	return f, nil
 }

-// GetPodServices gets the services that have the selector that match the labels on the given pod
+// GetPodServices gets the services that have a selector matching the labels of the given pod.
 func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []*api.Service, err error) {
 	var selector labels.Selector

@@ -91,10 +91,6 @@ func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []*api.Service
 			services = append(services, service)
 		}
 	}
-	if len(services) == 0 {
-		err = fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
-	}
-
 	return
 }
diff --git a/plugin/pkg/scheduler/algorithm/predicates/metadata.go b/plugin/pkg/scheduler/algorithm/predicates/metadata.go
new file mode 100644
index 00000000000..7c04d80a4fc
--- /dev/null
+++ b/plugin/pkg/scheduler/algorithm/predicates/metadata.go
@@ -0,0 +1,59 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package predicates
+
+import (
+	"github.com/golang/glog"
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
+	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
+)
+
+type PredicateMetadataFactory struct {
+	podLister algorithm.PodLister
+}
+
+func NewPredicateMetadataFactory(podLister algorithm.PodLister) algorithm.MetadataProducer {
+	factory := &PredicateMetadataFactory{
+		podLister,
+	}
+	return factory.GetMetadata
+}
+
+// GetMetadata returns the predicateMetadata which will be used by various predicates.
+func (pfactory *PredicateMetadataFactory) GetMetadata(pod *api.Pod, nodeNameToInfoMap map[string]*schedulercache.NodeInfo) interface{} {
+	// If we cannot compute metadata, just return nil
+	if pod == nil {
+		return nil
+	}
+	matchingTerms, err := getMatchingAntiAffinityTerms(pod, nodeNameToInfoMap)
+	if err != nil {
+		return nil
+	}
+	predicateMetadata := &predicateMetadata{
+		pod:                       pod,
+		podBestEffort:             isPodBestEffort(pod),
+		podRequest:                GetResourceRequest(pod),
+		podPorts:                  GetUsedPorts(pod),
+		matchingAntiAffinityTerms: matchingTerms,
+	}
+	for predicateName, precomputeFunc := range predicatePrecomputations {
+		glog.V(4).Infof("Precompute: %v", predicateName)
+		precomputeFunc(predicateMetadata)
+	}
+	return predicateMetadata
+}
diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go
index 53ed83e1d34..086c6665b5b 100644
--- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go
+++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go
@@ -36,6 +36,19 @@ import (
 	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )

+// predicatePrecomputations: Helper types and variables for registering precomputations that run when predicate metadata is produced.
+type PredicateMetadataModifier func(pm *predicateMetadata)
+
+var predicatePrecomputeRegisterLock sync.Mutex
+var predicatePrecomputations map[string]PredicateMetadataModifier = make(map[string]PredicateMetadataModifier)
+
+func RegisterPredicatePrecomputation(predicateName string, precomp PredicateMetadataModifier) {
+	predicatePrecomputeRegisterLock.Lock()
+	defer predicatePrecomputeRegisterLock.Unlock()
+	predicatePrecomputations[predicateName] = precomp
+}
+
+// Other types for predicate functions...
 type NodeInfo interface {
 	GetNodeInfo(nodeID string) (*api.Node, error)
 }
@@ -67,34 +80,21 @@ func (c *CachedNodeInfo) GetNodeInfo(id string) (*api.Node, error) {
 	return node.(*api.Node), nil
 }

-// predicateMetadata is a type that is passed as metadata for predicate functions
-type predicateMetadata struct {
-	podBestEffort             bool
-	podRequest                *schedulercache.Resource
-	podPorts                  map[int]bool
-	matchingAntiAffinityTerms []matchingPodAntiAffinityTerm
-}
-
+// Note that predicateMetadata and matchingPodAntiAffinityTerm need to be declared in the same file
+// due to the way declarations are processed in the predicate declaration unit tests.
 type matchingPodAntiAffinityTerm struct {
 	term *api.PodAffinityTerm
 	node *api.Node
 }

-func PredicateMetadata(pod *api.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) interface{} {
-	// If we cannot compute metadata, just return nil
-	if pod == nil {
-		return nil
-	}
-	matchingTerms, err := getMatchingAntiAffinityTerms(pod, nodeInfoMap)
-	if err != nil {
-		return nil
-	}
-	return &predicateMetadata{
-		podBestEffort:             isPodBestEffort(pod),
-		podRequest:                GetResourceRequest(pod),
-		podPorts:                  GetUsedPorts(pod),
-		matchingAntiAffinityTerms: matchingTerms,
-	}
+type predicateMetadata struct {
+	pod                                *api.Pod
+	podBestEffort                      bool
+	podRequest                         *schedulercache.Resource
+	podPorts                           map[int]bool
+	matchingAntiAffinityTerms          []matchingPodAntiAffinityTerm
+	serviceAffinityMatchingPodList     []*api.Pod
+	serviceAffinityMatchingPodServices []*api.Service
 }

 func isVolumeConflict(volume api.Volume, pod *api.Pod) bool {
@@ -627,20 +627,42 @@ type ServiceAffinity struct {
 	labels []string
 }

-func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, nodeInfo NodeInfo, labels []string) algorithm.FitPredicate {
+// serviceAffinityPrecomputation should be run once by the scheduler before looping through the predicates.
+// It is a helper function that should only be referenced by NewServiceAffinityPredicate.
+func (s *ServiceAffinity) serviceAffinityPrecomputation(pm *predicateMetadata) {
+	if pm.pod == nil {
+		glog.Errorf("Cannot precompute service affinity, a pod is required to calculate service affinity.")
+		return
+	}
+
+	var errSvc, errList error
+	// Store services which match the pod.
+	pm.serviceAffinityMatchingPodServices, errSvc = s.serviceLister.GetPodServices(pm.pod)
+	selector := CreateSelectorFromLabels(pm.pod.Labels)
+	// List all pods matching the selector; they are narrowed to the pod's namespace below.
+	allMatches, errList := s.podLister.List(selector)
+
+	// In the future maybe we will return these errors as part of the function result.
+	if errSvc != nil || errList != nil {
+		glog.Errorf("Errors were found while precomputing service affinity: \nservices: %v, \npods: %v", errSvc, errList)
+	}
+	pm.serviceAffinityMatchingPodList = FilterPodsByNamespace(allMatches, pm.pod.Namespace)
+}
+
+func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, nodeInfo NodeInfo, labels []string) (algorithm.FitPredicate, PredicateMetadataModifier) {
 	affinity := &ServiceAffinity{
 		podLister:     podLister,
 		serviceLister: serviceLister,
 		nodeInfo:      nodeInfo,
 		labels:        labels,
 	}
-	return affinity.CheckServiceAffinity
+	return affinity.checkServiceAffinity, affinity.serviceAffinityPrecomputation
 }

-// The checkServiceAffinity predicate matches nodes in such a way to force that
-// ServiceAffinity.labels are homogenous for pods added to a node.
-// (i.e. it returns true IFF this pod can be added to this node, such
-// that all other pods in the same service are running on nodes w/
+// checkServiceAffinity is a predicate which matches nodes in such a way to force that
+// ServiceAffinity.labels are homogeneous for pods that are scheduled to a node.
+// (i.e. it returns true IFF this pod can be added to this node such that all other pods in
+// the same service are running on nodes with
 // the exact same ServiceAffinity.label values).
 //
 // Details:
@@ -650,46 +672,47 @@ func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister al
 // the match.
 // Otherwise:
 // Create an "implicit selector" which guarantees pods will land on nodes with similar values
-// for the affinity labels.
+// for the affinity labels.
+//
 // To do this, we "reverse engineer" a selector by introspecting existing pods running under the same service+namespace.
 // These backfilled labels in the selector "L" are defined like so:
 // - L is a label that the ServiceAffinity object needs as a matching constraint.
 // - L is not defined in the pod itself already.
 // - and SOME pod, from a service, in the same namespace, ALREADY scheduled onto a node, has a matching value.
-func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
+//
+// WARNING: This predicate is NOT guaranteed to work if some of the predicateMetadata data isn't precomputed.
+// For that reason it is not exported, i.e. it is highly coupled to the implementation of the FitPredicate construction.
+func (s *ServiceAffinity) checkServiceAffinity(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
+	var services []*api.Service
+	var pods []*api.Pod
+	if pm, ok := meta.(*predicateMetadata); ok && (pm.serviceAffinityMatchingPodList != nil || pm.serviceAffinityMatchingPodServices != nil) {
+		services = pm.serviceAffinityMatchingPodServices
+		pods = pm.serviceAffinityMatchingPodList
+	} else {
+		// Make the predicate resilient in case metadata is missing.
+		pm = &predicateMetadata{pod: pod}
+		s.serviceAffinityPrecomputation(pm)
+		pods, services = pm.serviceAffinityMatchingPodList, pm.serviceAffinityMatchingPodServices
+	}
 	node := nodeInfo.Node()
 	if node == nil {
 		return false, nil, fmt.Errorf("node not found")
 	}
-	// check if the pod being scheduled has the affinity labels specified in its NodeSelector
 	affinityLabels := FindLabelsInSet(s.labels, labels.Set(pod.Spec.NodeSelector))
-
-	// Introspect services IFF we didn't predefine all the affinity labels in the pod itself.
+	// Step 1: If we don't have all constraints, introspect nodes to find the missing constraints.
 	if len(s.labels) > len(affinityLabels) {
-		services, err := s.serviceLister.GetPodServices(pod)
-		if err == nil && len(services) > 0 {
-			// just use the first service and get the other pods within the service
-			// TODO: a separate predicate can be created that tries to handle all services for the pod
-			selector := labels.SelectorFromSet(services[0].Spec.Selector)
-			servicePods, err := s.podLister.List(selector)
-			if err != nil {
-				return false, nil, err
-			}
-			// consider only the pods that belong to the same namespace
-			nsServicePods := FilterPodsByNamespace(servicePods, pod.Namespace)
-			if len(nsServicePods) > 0 {
-				// consider any service pod and fetch the node its hosted on
-				otherNode, err := s.nodeInfo.GetNodeInfo(nsServicePods[0].Spec.NodeName)
+		if len(services) > 0 {
+			if len(pods) > 0 {
+				nodeWithAffinityLabels, err := s.nodeInfo.GetNodeInfo(pods[0].Spec.NodeName)
 				if err != nil {
 					return false, nil, err
 				}
-				AddUnsetLabelsToMap(affinityLabels, s.labels, labels.Set(otherNode.Labels))
+				AddUnsetLabelsToMap(affinityLabels, s.labels, labels.Set(nodeWithAffinityLabels.Labels))
 			}
 		}
 	}
-
-	// check if the node matches the selector
+	// Step 2: Finally complete the affinity predicate based on whatever set of affinity labels we were able to find.
 	if CreateSelectorFromLabels(affinityLabels).Matches(labels.Set(node.Labels)) {
 		return true, nil, nil
 	}
diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go
index 7c308e7ae77..9c394ea34ce 100755
--- a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go
+++ b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go
@@ -119,6 +119,11 @@ func newResourceInitPod(pod *api.Pod, usage ...schedulercache.Resource) *api.Pod
 	return pod
 }

+func PredicateMetadata(p *api.Pod, nodeInfo map[string]*schedulercache.NodeInfo) interface{} {
+	pm := PredicateMetadataFactory{algorithm.FakePodLister{p}}
+	return pm.GetMetadata(p, nodeInfo)
+}
+
 func TestPodFitsResources(t *testing.T) {
 	enoughPodsTests := []struct {
 		pod *api.Pod
@@ -233,7 +238,6 @@ func TestPodFitsResources(t *testing.T) {
 	for _, test := range enoughPodsTests {
 		node := api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32)}}
 		test.nodeInfo.SetNode(&node)
-
 		fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
 		if err != nil {
 			t.Errorf("%s: unexpected error: %v", test.test, err)
@@ -289,7 +293,6 @@ func TestPodFitsResources(t *testing.T) {
 	for _, test := range notEnoughPodsTests {
 		node := api.Node{Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 0, 1)}}
 		test.nodeInfo.SetNode(&node)
-
 		fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
 		if err != nil {
 			t.Errorf("%s: unexpected error: %v", test.test, err)
@@ -1310,22 +1313,38 @@ func TestServiceAffinity(t *testing.T) {
 		},
 	}
 	expectedFailureReasons := []algorithm.PredicateFailureReason{ErrServiceAffinityViolated}
-
 	for _, test := range tests {
-		nodes := []api.Node{node1, node2, node3, node4, node5}
-		serviceAffinity := ServiceAffinity{algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister(test.services), FakeNodeListInfo(nodes), test.labels}
-		nodeInfo := schedulercache.NewNodeInfo()
-		nodeInfo.SetNode(test.node)
-		fits, reasons, err := serviceAffinity.CheckServiceAffinity(test.pod, PredicateMetadata(test.pod, nil), nodeInfo)
-		if err != nil {
-			t.Errorf("%s: unexpected error: %v", test.test, err)
-		}
-		if !fits && !reflect.DeepEqual(reasons, expectedFailureReasons) {
-			t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, expectedFailureReasons)
-		}
-		if fits != test.fits {
-			t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits)
+		testIt := func(skipPrecompute bool) {
+			nodes := []api.Node{node1, node2, node3, node4, node5}
+			nodeInfo := schedulercache.NewNodeInfo()
+			nodeInfo.SetNode(test.node)
+			nodeInfoMap := map[string]*schedulercache.NodeInfo{test.node.Name: nodeInfo}
+			// Reimplement the scheduler's logic: any time it creates a predicate, it registers any associated precomputations.
+			predicate, precompute := NewServiceAffinityPredicate(algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister(test.services), FakeNodeListInfo(nodes), test.labels)
+			// Register the precomputation, or rewrite it to a no-op, depending on the state we want to test.
+			RegisterPredicatePrecomputation("checkServiceAffinity-unitTestPredicate", func(pm *predicateMetadata) {
+				if !skipPrecompute {
+					precompute(pm)
+				}
+			})
+			if pmeta, ok := (PredicateMetadata(test.pod, nodeInfoMap)).(*predicateMetadata); ok {
+				fits, reasons, err := predicate(test.pod, pmeta, nodeInfo)
+				if err != nil {
+					t.Errorf("%s: unexpected error: %v", test.test, err)
+				}
+				if !fits && !reflect.DeepEqual(reasons, expectedFailureReasons) {
+					t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, expectedFailureReasons)
+				}
+				if fits != test.fits {
+					t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits)
+				}
+			} else {
+				t.Errorf("Error casting to *predicateMetadata.")
+			}
 		}
+
+		testIt(false) // Confirm that the predicate works with the precomputed data (better performance)
+		testIt(true)  // Confirm that the predicate works without precomputed data (resilience)
 	}
 }
@@ -1586,7 +1605,6 @@ func TestEBSVolumeCountConflicts(t *testing.T) {
 			}
 			return "", false
 		},
-
 		FilterPersistentVolume: func(pv *api.PersistentVolume) (string, bool) {
 			if pv.Spec.AWSElasticBlockStore != nil {
 				return pv.Spec.AWSElasticBlockStore.VolumeID, true
@@ -1652,7 +1670,7 @@ func TestPredicatesRegistered(t *testing.T) {
 		if err == nil {
 			functions = append(functions, fileFunctions...)
 		} else {
-			t.Errorf("unexpected error when parsing %s", filePath)
+			t.Errorf("unexpected error %s when parsing %s", err, filePath)
 		}
 	}
diff --git a/plugin/pkg/scheduler/algorithm/predicates/utils_test.go b/plugin/pkg/scheduler/algorithm/predicates/utils_test.go
index d3d32e12bfb..32729304cfd 100644
--- a/plugin/pkg/scheduler/algorithm/predicates/utils_test.go
+++ b/plugin/pkg/scheduler/algorithm/predicates/utils_test.go
@@ -49,7 +49,11 @@ func ExampleFindLabelsInSet() {
 			},
 		},
-		{}, // a third pod which will have no effect on anything.
+		{
+			ObjectMeta: api.ObjectMeta{
+				Name: "pod3ThatWeWontSee",
+			},
+		},
 	}
 	fmt.Println(FindLabelsInSet([]string{"label1", "label2", "label3"}, nsPods[0].ObjectMeta.Labels)["label3"])
 	AddUnsetLabelsToMap(labelSubset, []string{"label1", "label2", "label3"}, nsPods[0].ObjectMeta.Labels)
diff --git a/plugin/pkg/scheduler/algorithm/types.go b/plugin/pkg/scheduler/algorithm/types.go
index f0983774692..8034df6b866 100644
--- a/plugin/pkg/scheduler/algorithm/types.go
+++ b/plugin/pkg/scheduler/algorithm/types.go
@@ -54,6 +54,7 @@ type PriorityConfig struct {
 	Weight int
 }

+// EmptyMetadataProducer is a no-op MetadataProducer that returns nil metadata.
 func EmptyMetadataProducer(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{} {
 	return nil
 }
diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
index 3c9fcf1517e..cc73967e51a 100644
--- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
+++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
@@ -66,6 +66,11 @@ func init() {
 			return priorities.PriorityMetadata
 		})

+	factory.RegisterPredicateMetadataProducerFactory(
+		func(args factory.PluginFactoryArgs) algorithm.MetadataProducer {
+			return predicates.NewPredicateMetadataFactory(args.PodLister)
+		})
+
 	// EqualPriority is a prioritizer function that gives an equal weight of one to all nodes
 	// Register the priority function so that its available
 	// but do not include it as part of the default priorities
diff --git a/plugin/pkg/scheduler/extender_test.go b/plugin/pkg/scheduler/extender_test.go
index 88b10249674..db82927c9f2 100644
--- a/plugin/pkg/scheduler/extender_test.go
+++ b/plugin/pkg/scheduler/extender_test.go
@@ -291,8 +291,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
 			cache.AddNode(&api.Node{ObjectMeta: api.ObjectMeta{Name: name}})
 		}
 		scheduler := NewGenericScheduler(
-			cache, test.predicates, algorithm.EmptyMetadataProducer,
-			test.prioritizers, extenders)
+			cache, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders)
 		machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes)))
 		if test.expectsErr {
 			if err == nil {
diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go
index 3cc793a054e..1ac0e2a73f3 100644
--- a/plugin/pkg/scheduler/factory/factory.go
+++ b/plugin/pkg/scheduler/factory/factory.go
@@ -363,10 +363,13 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
 		return nil, err
 	}

+	predicateMetaProducer, err := f.GetPredicateMetadataProducer()
+	if err != nil {
+		return nil, err
+	}
+
 	f.Run()
-
-	algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, priorityMetaProducer, priorityConfigs, extenders)
-
+	algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)
 	podBackoff := podBackoff{
 		perPodBackoff: map[types.NamespacedName]*backoffEntry{},
 		clock:         realClock{},
@@ -408,6 +411,14 @@ func (f *ConfigFactory) GetPriorityMetadataProducer() (algorithm.MetadataProduce
 	return getPriorityMetadataProducer(*pluginArgs)
 }

+func (f *ConfigFactory) GetPredicateMetadataProducer() (algorithm.MetadataProducer, error) {
+	pluginArgs, err := f.getPluginArgs()
+	if err != nil {
+		return nil, err
+	}
+	return getPredicateMetadataProducer(*pluginArgs)
+}
+
 func (f *ConfigFactory) GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error) {
 	pluginArgs, err := f.getPluginArgs()
 	if err != nil {
diff --git a/plugin/pkg/scheduler/factory/plugins.go b/plugin/pkg/scheduler/factory/plugins.go
index 9d5952764c7..aec6d93baa9 100644
--- a/plugin/pkg/scheduler/factory/plugins.go
+++ b/plugin/pkg/scheduler/factory/plugins.go
@@ -78,7 +78,8 @@ var (
 	algorithmProviderMap = make(map[string]AlgorithmProviderConfig)

 	// Registered metadata producers
-	priorityMetadataProducer MetadataProducerFactory
+	priorityMetadataProducer  MetadataProducerFactory
+	predicateMetadataProducer MetadataProducerFactory

 	// get equivalence pod function
 	getEquivalencePodFunc algorithm.GetEquivalencePodFunc = nil
@@ -121,12 +122,16 @@ func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy) string {
 	if policy.Argument != nil {
 		if policy.Argument.ServiceAffinity != nil {
 			predicateFactory = func(args PluginFactoryArgs) algorithm.FitPredicate {
-				return predicates.NewServiceAffinityPredicate(
+				predicate, precomputationFunction := predicates.NewServiceAffinityPredicate(
 					args.PodLister,
 					args.ServiceLister,
 					args.NodeInfo,
 					policy.Argument.ServiceAffinity.Labels,
 				)
+
+				// Once we generate the predicate, we should also register the precomputation.
+				predicates.RegisterPredicatePrecomputation(policy.Name, precomputationFunction)
+				return predicate
 			}
 		} else if policy.Argument.LabelsPresence != nil {
 			predicateFactory = func(args PluginFactoryArgs) algorithm.FitPredicate {
@@ -163,6 +168,12 @@ func RegisterPriorityMetadataProducerFactory(factory MetadataProducerFactory) {
 	priorityMetadataProducer = factory
 }

+func RegisterPredicateMetadataProducerFactory(factory MetadataProducerFactory) {
+	schedulerFactoryMutex.Lock()
+	defer schedulerFactoryMutex.Unlock()
+	predicateMetadataProducer = factory
+}
+
 // DEPRECATED
 // Use Map-Reduce pattern for priority functions.
 // Registers a priority function with the algorithm registry. Returns the name,
@@ -312,6 +323,16 @@ func getPriorityMetadataProducer(args PluginFactoryArgs) (algorithm.MetadataProd
 	return priorityMetadataProducer(args), nil
 }

+func getPredicateMetadataProducer(args PluginFactoryArgs) (algorithm.MetadataProducer, error) {
+	schedulerFactoryMutex.Lock()
+	defer schedulerFactoryMutex.Unlock()
+
+	if predicateMetadataProducer == nil {
+		return algorithm.EmptyMetadataProducer, nil
+	}
+	return predicateMetadataProducer(args), nil
+}
+
 func getPriorityFunctionConfigs(names sets.String, args PluginFactoryArgs) ([]algorithm.PriorityConfig, error) {
 	schedulerFactoryMutex.Lock()
 	defer schedulerFactoryMutex.Unlock()
diff --git a/plugin/pkg/scheduler/generic_scheduler.go b/plugin/pkg/scheduler/generic_scheduler.go
index 3a315687112..649d88b7f70 100644
--- a/plugin/pkg/scheduler/generic_scheduler.go
+++ b/plugin/pkg/scheduler/generic_scheduler.go
@@ -61,14 +61,15 @@ func (f *FitError) Error() string {
 }

 type genericScheduler struct {
-	cache                schedulercache.Cache
-	predicates           map[string]algorithm.FitPredicate
-	priorityMetaProducer algorithm.MetadataProducer
-	prioritizers         []algorithm.PriorityConfig
-	extenders            []algorithm.SchedulerExtender
-	pods                 algorithm.PodLister
-	lastNodeIndexLock    sync.Mutex
-	lastNodeIndex        uint64
+	cache                 schedulercache.Cache
+	predicates            map[string]algorithm.FitPredicate
+	priorityMetaProducer  algorithm.MetadataProducer
+	predicateMetaProducer algorithm.MetadataProducer
+	prioritizers          []algorithm.PriorityConfig
+	extenders             []algorithm.SchedulerExtender
+	pods                  algorithm.PodLister
+	lastNodeIndexLock     sync.Mutex
+	lastNodeIndex         uint64

 	cachedNodeInfoMap map[string]*schedulercache.NodeInfo
@@ -104,7 +105,7 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
 	// TODO(harryz) Check if equivalenceCache is enabled and call scheduleWithEquivalenceClass here
 	trace.Step("Computing predicates")
-	filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders)
+	filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer)
 	if err != nil {
 		return "", err
 	}
@@ -153,7 +154,9 @@ func findNodesThatFit(
 	nodeNameToInfo map[string]*schedulercache.NodeInfo,
 	nodes []*api.Node,
 	predicateFuncs map[string]algorithm.FitPredicate,
-	extenders []algorithm.SchedulerExtender) ([]*api.Node, FailedPredicateMap, error) {
+	extenders []algorithm.SchedulerExtender,
+	metadataProducer algorithm.MetadataProducer,
+) ([]*api.Node, FailedPredicateMap, error) {
 	var filtered []*api.Node
 	failedPredicateMap := FailedPredicateMap{}
@@ -163,11 +166,12 @@ func findNodesThatFit(
 		// Create filtered list with enough space to avoid growing it
 		// and allow assigning.
 		filtered = make([]*api.Node, len(nodes))
-		meta := predicates.PredicateMetadata(pod, nodeNameToInfo)
 		errs := []error{}
-
 		var predicateResultLock sync.Mutex
 		var filteredLen int32
+
+		// We can use the same metadata producer for all nodes.
+		meta := metadataProducer(pod, nodeNameToInfo)
 		checkNode := func(i int) {
 			nodeName := nodes[i].Name
 			fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs)
@@ -381,15 +385,17 @@ func EqualPriorityMap(_ *api.Pod, _ interface{}, nodeInfo *schedulercache.NodeIn
 func NewGenericScheduler(
 	cache schedulercache.Cache,
 	predicates map[string]algorithm.FitPredicate,
-	priorityMetaProducer algorithm.MetadataProducer,
+	predicateMetaProducer algorithm.MetadataProducer,
 	prioritizers []algorithm.PriorityConfig,
+	priorityMetaProducer algorithm.MetadataProducer,
 	extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm {
 	return &genericScheduler{
-		cache:                cache,
-		predicates:           predicates,
-		priorityMetaProducer: priorityMetaProducer,
-		prioritizers:         prioritizers,
-		extenders:            extenders,
-		cachedNodeInfoMap:    make(map[string]*schedulercache.NodeInfo),
+		cache:                 cache,
+		predicates:            predicates,
+		predicateMetaProducer: predicateMetaProducer,
+		prioritizers:          prioritizers,
+		priorityMetaProducer:  priorityMetaProducer,
+		extenders:             extenders,
+		cachedNodeInfoMap:     make(map[string]*schedulercache.NodeInfo),
 	}
 }
diff --git a/plugin/pkg/scheduler/generic_scheduler_test.go b/plugin/pkg/scheduler/generic_scheduler_test.go
index 9c651f47510..52b4aa95b1c 100644
--- a/plugin/pkg/scheduler/generic_scheduler_test.go
+++ b/plugin/pkg/scheduler/generic_scheduler_test.go
@@ -277,8 +277,7 @@ func TestGenericScheduler(t *testing.T) {
 				},
 			},
 		},
-		pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}},
-
+		pod:          &api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}},
 		prioritizers: []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}},
 		nodes:        []string{"1", "2"},
 		expectsErr:   true,
@@ -302,8 +301,8 @@ func TestGenericScheduler(t *testing.T) {
 		}

 		scheduler := NewGenericScheduler(
-			cache, test.predicates, algorithm.EmptyMetadataProducer,
-			test.prioritizers, []algorithm.SchedulerExtender{})
+			cache, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer,
+			[]algorithm.SchedulerExtender{})
 		machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes)))

 		if !reflect.DeepEqual(err, test.wErr) {
@@ -323,7 +322,7 @@ func TestFindFitAllError(t *testing.T) {
 		"2": schedulercache.NewNodeInfo(),
 		"1": schedulercache.NewNodeInfo(),
 	}
-	_, predicateMap, err := findNodesThatFit(&api.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil)
+	_, predicateMap, err := findNodesThatFit(&api.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer)

 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
@@ -357,7 +356,7 @@ func TestFindFitSomeError(t *testing.T) {
 		nodeNameToInfo[name].SetNode(&api.Node{ObjectMeta: api.ObjectMeta{Name: name}})
 	}

-	_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil)
+	_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer)
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go
index 86e5eb3a36e..6ba817cca74 100644
--- a/plugin/pkg/scheduler/scheduler_test.go
+++ b/plugin/pkg/scheduler/scheduler_test.go
@@ -388,6 +388,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
 		predicateMap,
 		algorithm.EmptyMetadataProducer,
 		[]algorithm.PriorityConfig{},
+		algorithm.EmptyMetadataProducer,
 		[]algorithm.SchedulerExtender{})
 	bindingChan := make(chan *api.Binding, 1)
 	errChan := make(chan error, 1)
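
The heart of this patch is the predicate-metadata precomputation registry: PredicateMetadataFactory.GetMetadata builds one predicateMetadata per scheduling cycle and runs every registered PredicateMetadataModifier over it, so per-pod work such as service lookups happens once rather than once per node. The following standalone sketch illustrates that registry shape; Pod, predicateMeta, and RegisterPrecomputation are simplified stand-ins for the scheduler's real types, not the actual implementation.

package main

import (
	"fmt"
	"sync"
)

type Pod struct{ Name, Namespace string }

// predicateMeta mirrors the role of predicateMetadata: one bag of data,
// computed once per scheduling cycle, shared by all predicates.
type predicateMeta struct {
	pod          *Pod
	matchingPods []*Pod // filled in by a registered precomputation
}

type MetadataModifier func(pm *predicateMeta)

var (
	registerLock    sync.Mutex
	precomputations = map[string]MetadataModifier{}
)

// RegisterPrecomputation mirrors RegisterPredicatePrecomputation: predicates
// that need expensive per-pod data register a modifier under their name.
func RegisterPrecomputation(name string, m MetadataModifier) {
	registerLock.Lock()
	defer registerLock.Unlock()
	precomputations[name] = m
}

// GetMetadata mirrors PredicateMetadataFactory.GetMetadata: build the shared
// metadata once, then let every registered precomputation enrich it.
func GetMetadata(pod *Pod) interface{} {
	if pod == nil {
		return nil
	}
	pm := &predicateMeta{pod: pod}
	for name, precompute := range precomputations {
		fmt.Printf("Precompute: %v\n", name)
		precompute(pm)
	}
	return pm
}

func main() {
	RegisterPrecomputation("serviceAffinity", func(pm *predicateMeta) {
		// In the real patch this lists services and pods via the listers;
		// here we just record a canned result.
		pm.matchingPods = []*Pod{{Name: "peer", Namespace: pm.pod.Namespace}}
	})
	meta := GetMetadata(&Pod{Name: "web-1", Namespace: "default"})
	fmt.Printf("%+v\n", meta.(*predicateMeta).matchingPods[0])
}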
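checkServiceAffinity is written to survive missing metadata: it uses the precomputed lists when the type assertion succeeds and recomputes them locally otherwise, which is exactly what the testIt(false)/testIt(true) pair in the test exercises. Here is a sketch of that fallback shape, again with hypothetical stand-in types (Pod, predicateMeta, precompute, checkAffinity) rather than the scheduler's own.

package main

import "fmt"

type Pod struct{ Name, Namespace string }

// predicateMeta stands in for the scheduler's predicateMetadata.
type predicateMeta struct {
	pod          *Pod
	matchingPods []*Pod
}

// precompute stands in for ServiceAffinity.serviceAffinityPrecomputation:
// it fills in the expensive-to-compute fields.
func precompute(pm *predicateMeta) {
	pm.matchingPods = []*Pod{{Name: "peer", Namespace: pm.pod.Namespace}}
}

// checkAffinity mirrors the shape of checkServiceAffinity: use the
// precomputed metadata when present, otherwise rebuild it locally so the
// predicate still works when meta is nil or of an unexpected type.
func checkAffinity(pod *Pod, meta interface{}) bool {
	var matching []*Pod
	if pm, ok := meta.(*predicateMeta); ok && pm.matchingPods != nil {
		// Fast path: data precomputed once per scheduling cycle.
		matching = pm.matchingPods
	} else {
		// Resilient path: recompute for this one pod.
		pm = &predicateMeta{pod: pod}
		precompute(pm)
		matching = pm.matchingPods
	}
	return len(matching) > 0
}

func main() {
	pod := &Pod{Name: "web-1", Namespace: "default"}
	pm := &predicateMeta{pod: pod}
	precompute(pm)
	fmt.Println(checkAffinity(pod, pm))  // fast path with precomputed data
	fmt.Println(checkAffinity(pod, nil)) // fallback path
}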
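Finally, plugins.go and generic_scheduler.go wire the pieces together: the constructor now returns a (predicate, precomputation) pair, the factory registers the precomputation under the policy name, and the generic scheduler invokes the metadata producer once before fanning out across nodes. A compressed sketch of that flow, with hypothetical names (NewAffinityPredicate, myAffinityPolicy) standing in for the real ones:

package main

import "fmt"

type Pod struct{ Name string }

type predicateMeta struct {
	pod      *Pod
	affinity []string
}

type FitPredicate func(pod *Pod, meta interface{}) bool
type MetadataModifier func(pm *predicateMeta)

var precomputations = map[string]MetadataModifier{}

// NewAffinityPredicate mirrors the new NewServiceAffinityPredicate signature:
// it returns the predicate together with its precomputation, and the caller
// (RegisterCustomFitPredicate in plugins.go) is responsible for registering
// the precomputation under the policy name.
func NewAffinityPredicate(labels []string) (FitPredicate, MetadataModifier) {
	precompute := func(pm *predicateMeta) { pm.affinity = labels }
	predicate := func(pod *Pod, meta interface{}) bool {
		pm, ok := meta.(*predicateMeta)
		return ok && len(pm.affinity) > 0
	}
	return predicate, precompute
}

func main() {
	predicate, precompute := NewAffinityPredicate([]string{"zone"})
	precomputations["myAffinityPolicy"] = precompute // as plugins.go now does

	// The scheduler side: build metadata once, run all precomputations,
	// then evaluate the predicate per node.
	pm := &predicateMeta{pod: &Pod{Name: "web-1"}}
	for _, p := range precomputations {
		p(pm)
	}
	fmt.Println(predicate(pm.pod, pm))
}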