PredicateMetadata factory and optimization, Cleaned up some comments,

Comments addressed, Make emptyMetadataProducer a func to avoid casting,
FakeSvcLister: remove error return for len(svc)=0.  New test for
predicatePrecomp to make method semantics explictly enforced when meta
is missing. Precompute wrapper.
This commit is contained in:
jayunit100 2016-10-12 12:03:01 -04:00
parent 66a0aa64c2
commit 08cff0157d
13 changed files with 250 additions and 107 deletions

View File

@ -76,7 +76,7 @@ func (f FakeServiceLister) List(labels.Selector) ([]*api.Service, error) {
return f, nil return f, nil
} }
// GetPodServices gets the services that have the selector that match the labels on the given pod // GetPodServices gets the services that have the selector that match the labels on the given pod.
func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []*api.Service, err error) { func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []*api.Service, err error) {
var selector labels.Selector var selector labels.Selector
@ -91,10 +91,6 @@ func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []*api.Service
services = append(services, service) services = append(services, service)
} }
} }
if len(services) == 0 {
err = fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return return
} }

View File

@ -0,0 +1,59 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package predicates
import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
type PredicateMetadataFactory struct {
podLister algorithm.PodLister
}
func NewPredicateMetadataFactory(podLister algorithm.PodLister) algorithm.MetadataProducer {
factory := &PredicateMetadataFactory{
podLister,
}
return factory.GetMetadata
}
// GetMetadata returns the predicateMetadata used which will be used by various predicates.
func (pfactory *PredicateMetadataFactory) GetMetadata(pod *api.Pod, nodeNameToInfoMap map[string]*schedulercache.NodeInfo) interface{} {
// If we cannot compute metadata, just return nil
if pod == nil {
return nil
}
matchingTerms, err := getMatchingAntiAffinityTerms(pod, nodeNameToInfoMap)
if err != nil {
return nil
}
predicateMetadata := &predicateMetadata{
pod: pod,
podBestEffort: isPodBestEffort(pod),
podRequest: GetResourceRequest(pod),
podPorts: GetUsedPorts(pod),
matchingAntiAffinityTerms: matchingTerms,
}
for predicateName, precomputeFunc := range predicatePrecomputations {
glog.V(4).Info("Precompute: %v", predicateName)
precomputeFunc(predicateMetadata)
}
return predicateMetadata
}

View File

@ -36,6 +36,19 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
) )
// predicatePrecomputations: Helper types/variables...
type PredicateMetadataModifier func(pm *predicateMetadata)
var predicatePrecomputeRegisterLock sync.Mutex
var predicatePrecomputations map[string]PredicateMetadataModifier = make(map[string]PredicateMetadataModifier)
func RegisterPredicatePrecomputation(predicateName string, precomp PredicateMetadataModifier) {
predicatePrecomputeRegisterLock.Lock()
defer predicatePrecomputeRegisterLock.Unlock()
predicatePrecomputations[predicateName] = precomp
}
// Other types for predicate functions...
type NodeInfo interface { type NodeInfo interface {
GetNodeInfo(nodeID string) (*api.Node, error) GetNodeInfo(nodeID string) (*api.Node, error)
} }
@ -67,34 +80,21 @@ func (c *CachedNodeInfo) GetNodeInfo(id string) (*api.Node, error) {
return node.(*api.Node), nil return node.(*api.Node), nil
} }
// predicateMetadata is a type that is passed as metadata for predicate functions // Note that predicateMetdata and matchingPodAntiAffinityTerm need to be declared in the same file
type predicateMetadata struct { // due to the way declarations are processed in predicate declaration unit tests.
podBestEffort bool
podRequest *schedulercache.Resource
podPorts map[int]bool
matchingAntiAffinityTerms []matchingPodAntiAffinityTerm
}
type matchingPodAntiAffinityTerm struct { type matchingPodAntiAffinityTerm struct {
term *api.PodAffinityTerm term *api.PodAffinityTerm
node *api.Node node *api.Node
} }
func PredicateMetadata(pod *api.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) interface{} { type predicateMetadata struct {
// If we cannot compute metadata, just return nil pod *api.Pod
if pod == nil { podBestEffort bool
return nil podRequest *schedulercache.Resource
} podPorts map[int]bool
matchingTerms, err := getMatchingAntiAffinityTerms(pod, nodeInfoMap) matchingAntiAffinityTerms []matchingPodAntiAffinityTerm
if err != nil { serviceAffinityMatchingPodList []*api.Pod
return nil serviceAffinityMatchingPodServices []*api.Service
}
return &predicateMetadata{
podBestEffort: isPodBestEffort(pod),
podRequest: GetResourceRequest(pod),
podPorts: GetUsedPorts(pod),
matchingAntiAffinityTerms: matchingTerms,
}
} }
func isVolumeConflict(volume api.Volume, pod *api.Pod) bool { func isVolumeConflict(volume api.Volume, pod *api.Pod) bool {
@ -627,20 +627,42 @@ type ServiceAffinity struct {
labels []string labels []string
} }
func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, nodeInfo NodeInfo, labels []string) algorithm.FitPredicate { // serviceAffinityPrecomputation should be run once by the scheduler before looping through the Predicate. It is a helper function that
// only should be referenced by NewServiceAffinityPredicate.
func (s *ServiceAffinity) serviceAffinityPrecomputation(pm *predicateMetadata) {
if pm.pod == nil {
glog.Errorf("Cannot precompute service affinity, a pod is required to caluculate service affinity.")
return
}
var errSvc, errList error
// Store services which match the pod.
pm.serviceAffinityMatchingPodServices, errSvc = s.serviceLister.GetPodServices(pm.pod)
selector := CreateSelectorFromLabels(pm.pod.Labels)
// consider only the pods that belong to the same namespace
allMatches, errList := s.podLister.List(selector)
// In the future maybe we will return them as part of the function.
if errSvc != nil || errList != nil {
glog.Errorf("Some Error were found while precomputing svc affinity: \nservices:%v , \npods:%v", errSvc, errList)
}
pm.serviceAffinityMatchingPodList = FilterPodsByNamespace(allMatches, pm.pod.Namespace)
}
func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, nodeInfo NodeInfo, labels []string) (algorithm.FitPredicate, PredicateMetadataModifier) {
affinity := &ServiceAffinity{ affinity := &ServiceAffinity{
podLister: podLister, podLister: podLister,
serviceLister: serviceLister, serviceLister: serviceLister,
nodeInfo: nodeInfo, nodeInfo: nodeInfo,
labels: labels, labels: labels,
} }
return affinity.CheckServiceAffinity return affinity.checkServiceAffinity, affinity.serviceAffinityPrecomputation
} }
// The checkServiceAffinity predicate matches nodes in such a way to force that // checkServiceAffinity is a predicate which matches nodes in such a way to force that
// ServiceAffinity.labels are homogenous for pods added to a node. // ServiceAffinity.labels are homogenous for pods that are scheduled to a node.
// (i.e. it returns true IFF this pod can be added to this node, such // (i.e. it returns true IFF this pod can be added to this node such that all other pods in
// that all other pods in the same service are running on nodes w/ // the same service are running on nodes with
// the exact same ServiceAffinity.label values). // the exact same ServiceAffinity.label values).
// //
// Details: // Details:
@ -650,46 +672,47 @@ func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister al
// the match. // the match.
// Otherwise: // Otherwise:
// Create an "implicit selector" which gaurantees pods will land on nodes with similar values // Create an "implicit selector" which gaurantees pods will land on nodes with similar values
// for the affinity labels. // for the affinity labels.
//
// To do this, we "reverse engineer" a selector by introspecting existing pods running under the same service+namespace. // To do this, we "reverse engineer" a selector by introspecting existing pods running under the same service+namespace.
// These backfilled labels in the selector "L" are defined like so: // These backfilled labels in the selector "L" are defined like so:
// - L is a label that the ServiceAffinity object needs as a matching constraints. // - L is a label that the ServiceAffinity object needs as a matching constraints.
// - L is not defined in the pod itself already. // - L is not defined in the pod itself already.
// - and SOME pod, from a service, in the same namespace, ALREADY scheduled onto a node, has a matching value. // - and SOME pod, from a service, in the same namespace, ALREADY scheduled onto a node, has a matching value.
func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { //
// WARNING: This Predicate is NOT gauranteed to work if some of the predicateMetadata data isn't precomputed...
// For that reason it is not exported, i.e. it is highlhy coupled to the implementation of the FitPredicate construction.
func (s *ServiceAffinity) checkServiceAffinity(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
var services []*api.Service
var pods []*api.Pod
if pm, ok := meta.(*predicateMetadata); ok && (pm.serviceAffinityMatchingPodList != nil || pm.serviceAffinityMatchingPodServices != nil) {
services = pm.serviceAffinityMatchingPodServices
pods = pm.serviceAffinityMatchingPodList
} else {
// Make the predicate resilient in case metadata is missing.
pm = &predicateMetadata{pod: pod}
s.serviceAffinityPrecomputation(pm)
pods, services = pm.serviceAffinityMatchingPodList, pm.serviceAffinityMatchingPodServices
}
node := nodeInfo.Node() node := nodeInfo.Node()
if node == nil { if node == nil {
return false, nil, fmt.Errorf("node not found") return false, nil, fmt.Errorf("node not found")
} }
// check if the pod being scheduled has the affinity labels specified in its NodeSelector // check if the pod being scheduled has the affinity labels specified in its NodeSelector
affinityLabels := FindLabelsInSet(s.labels, labels.Set(pod.Spec.NodeSelector)) affinityLabels := FindLabelsInSet(s.labels, labels.Set(pod.Spec.NodeSelector))
// Step 1: If we don't have all constraints, introspect nodes to find the missing constraints.
// Introspect services IFF we didn't predefine all the affinity labels in the pod itself.
if len(s.labels) > len(affinityLabels) { if len(s.labels) > len(affinityLabels) {
services, err := s.serviceLister.GetPodServices(pod) if len(services) > 0 {
if err == nil && len(services) > 0 { if len(pods) > 0 {
// just use the first service and get the other pods within the service nodeWithAffinityLabels, err := s.nodeInfo.GetNodeInfo(pods[0].Spec.NodeName)
// TODO: a separate predicate can be created that tries to handle all services for the pod
selector := labels.SelectorFromSet(services[0].Spec.Selector)
servicePods, err := s.podLister.List(selector)
if err != nil {
return false, nil, err
}
// consider only the pods that belong to the same namespace
nsServicePods := FilterPodsByNamespace(servicePods, pod.Namespace)
if len(nsServicePods) > 0 {
// consider any service pod and fetch the node its hosted on
otherNode, err := s.nodeInfo.GetNodeInfo(nsServicePods[0].Spec.NodeName)
if err != nil { if err != nil {
return false, nil, err return false, nil, err
} }
AddUnsetLabelsToMap(affinityLabels, s.labels, labels.Set(otherNode.Labels)) AddUnsetLabelsToMap(affinityLabels, s.labels, labels.Set(nodeWithAffinityLabels.Labels))
} }
} }
} }
// Step 2: Finally complete the affinity predicate based on whatever set of predicates we were able to find.
// check if the node matches the selector
if CreateSelectorFromLabels(affinityLabels).Matches(labels.Set(node.Labels)) { if CreateSelectorFromLabels(affinityLabels).Matches(labels.Set(node.Labels)) {
return true, nil, nil return true, nil, nil
} }

View File

@ -119,6 +119,11 @@ func newResourceInitPod(pod *api.Pod, usage ...schedulercache.Resource) *api.Pod
return pod return pod
} }
func PredicateMetadata(p *api.Pod, nodeInfo map[string]*schedulercache.NodeInfo) interface{} {
pm := PredicateMetadataFactory{algorithm.FakePodLister{p}}
return pm.GetMetadata(p, nodeInfo)
}
func TestPodFitsResources(t *testing.T) { func TestPodFitsResources(t *testing.T) {
enoughPodsTests := []struct { enoughPodsTests := []struct {
pod *api.Pod pod *api.Pod
@ -233,7 +238,6 @@ func TestPodFitsResources(t *testing.T) {
for _, test := range enoughPodsTests { for _, test := range enoughPodsTests {
node := api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32)}} node := api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32)}}
test.nodeInfo.SetNode(&node) test.nodeInfo.SetNode(&node)
fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo) fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
if err != nil { if err != nil {
t.Errorf("%s: unexpected error: %v", test.test, err) t.Errorf("%s: unexpected error: %v", test.test, err)
@ -289,7 +293,6 @@ func TestPodFitsResources(t *testing.T) {
for _, test := range notEnoughPodsTests { for _, test := range notEnoughPodsTests {
node := api.Node{Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 0, 1)}} node := api.Node{Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 0, 1)}}
test.nodeInfo.SetNode(&node) test.nodeInfo.SetNode(&node)
fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo) fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
if err != nil { if err != nil {
t.Errorf("%s: unexpected error: %v", test.test, err) t.Errorf("%s: unexpected error: %v", test.test, err)
@ -1310,22 +1313,38 @@ func TestServiceAffinity(t *testing.T) {
}, },
} }
expectedFailureReasons := []algorithm.PredicateFailureReason{ErrServiceAffinityViolated} expectedFailureReasons := []algorithm.PredicateFailureReason{ErrServiceAffinityViolated}
for _, test := range tests { for _, test := range tests {
nodes := []api.Node{node1, node2, node3, node4, node5} testIt := func(skipPrecompute bool) {
serviceAffinity := ServiceAffinity{algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister(test.services), FakeNodeListInfo(nodes), test.labels} nodes := []api.Node{node1, node2, node3, node4, node5}
nodeInfo := schedulercache.NewNodeInfo() nodeInfo := schedulercache.NewNodeInfo()
nodeInfo.SetNode(test.node) nodeInfo.SetNode(test.node)
fits, reasons, err := serviceAffinity.CheckServiceAffinity(test.pod, PredicateMetadata(test.pod, nil), nodeInfo) nodeInfoMap := map[string]*schedulercache.NodeInfo{test.node.Name: nodeInfo}
if err != nil { // Reimplementing the logic that the scheduler implements: Any time it makes a predicate, it registers any precomputations.
t.Errorf("%s: unexpected error: %v", test.test, err) predicate, precompute := NewServiceAffinityPredicate(algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister(test.services), FakeNodeListInfo(nodes), test.labels)
} // Register a precomputation or Rewrite the precomputation to a no-op, depending on the state we want to test.
if !fits && !reflect.DeepEqual(reasons, expectedFailureReasons) { RegisterPredicatePrecomputation("checkServiceAffinity-unitTestPredicate", func(pm *predicateMetadata) {
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, expectedFailureReasons) if !skipPrecompute {
} precompute(pm)
if fits != test.fits { }
t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits) })
if pmeta, ok := (PredicateMetadata(test.pod, nodeInfoMap)).(*predicateMetadata); ok {
fits, reasons, err := predicate(test.pod, pmeta, nodeInfo)
if err != nil {
t.Errorf("%s: unexpected error: %v", test.test, err)
}
if !fits && !reflect.DeepEqual(reasons, expectedFailureReasons) {
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, expectedFailureReasons)
}
if fits != test.fits {
t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits)
}
} else {
t.Errorf("Error casting.")
}
} }
testIt(false) // Confirm that the predicate works without precomputed data (resilience)
testIt(true) // Confirm that the predicate works with the precomputed data (better performance)
} }
} }
@ -1586,7 +1605,6 @@ func TestEBSVolumeCountConflicts(t *testing.T) {
} }
return "", false return "", false
}, },
FilterPersistentVolume: func(pv *api.PersistentVolume) (string, bool) { FilterPersistentVolume: func(pv *api.PersistentVolume) (string, bool) {
if pv.Spec.AWSElasticBlockStore != nil { if pv.Spec.AWSElasticBlockStore != nil {
return pv.Spec.AWSElasticBlockStore.VolumeID, true return pv.Spec.AWSElasticBlockStore.VolumeID, true
@ -1652,7 +1670,7 @@ func TestPredicatesRegistered(t *testing.T) {
if err == nil { if err == nil {
functions = append(functions, fileFunctions...) functions = append(functions, fileFunctions...)
} else { } else {
t.Errorf("unexpected error when parsing %s", filePath) t.Errorf("unexpected error %s when parsing %s", err, filePath)
} }
} }

View File

@ -49,7 +49,11 @@ func ExampleFindLabelsInSet() {
}, },
}, },
{}, // a third pod which will have no effect on anything. {
ObjectMeta: api.ObjectMeta{
Name: "pod3ThatWeWontSee",
},
},
} }
fmt.Println(FindLabelsInSet([]string{"label1", "label2", "label3"}, nsPods[0].ObjectMeta.Labels)["label3"]) fmt.Println(FindLabelsInSet([]string{"label1", "label2", "label3"}, nsPods[0].ObjectMeta.Labels)["label3"])
AddUnsetLabelsToMap(labelSubset, []string{"label1", "label2", "label3"}, nsPods[0].ObjectMeta.Labels) AddUnsetLabelsToMap(labelSubset, []string{"label1", "label2", "label3"}, nsPods[0].ObjectMeta.Labels)

View File

@ -54,6 +54,7 @@ type PriorityConfig struct {
Weight int Weight int
} }
// EmptyMetadataProducer returns a no-op MetadataProducer type.
func EmptyMetadataProducer(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{} { func EmptyMetadataProducer(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{} {
return nil return nil
} }

View File

@ -66,6 +66,11 @@ func init() {
return priorities.PriorityMetadata return priorities.PriorityMetadata
}) })
factory.RegisterPredicateMetadataProducerFactory(
func(args factory.PluginFactoryArgs) algorithm.MetadataProducer {
return predicates.NewPredicateMetadataFactory(args.PodLister)
})
// EqualPriority is a prioritizer function that gives an equal weight of one to all nodes // EqualPriority is a prioritizer function that gives an equal weight of one to all nodes
// Register the priority function so that its available // Register the priority function so that its available
// but do not include it as part of the default priorities // but do not include it as part of the default priorities

View File

@ -291,8 +291,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
cache.AddNode(&api.Node{ObjectMeta: api.ObjectMeta{Name: name}}) cache.AddNode(&api.Node{ObjectMeta: api.ObjectMeta{Name: name}})
} }
scheduler := NewGenericScheduler( scheduler := NewGenericScheduler(
cache, test.predicates, algorithm.EmptyMetadataProducer, cache, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders)
test.prioritizers, extenders)
machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes))) machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes)))
if test.expectsErr { if test.expectsErr {
if err == nil { if err == nil {

View File

@ -363,10 +363,13 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
return nil, err return nil, err
} }
predicateMetaProducer, err := f.GetPredicateMetadataProducer()
if err != nil {
return nil, err
}
f.Run() f.Run()
algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)
algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, priorityMetaProducer, priorityConfigs, extenders)
podBackoff := podBackoff{ podBackoff := podBackoff{
perPodBackoff: map[types.NamespacedName]*backoffEntry{}, perPodBackoff: map[types.NamespacedName]*backoffEntry{},
clock: realClock{}, clock: realClock{},
@ -408,6 +411,14 @@ func (f *ConfigFactory) GetPriorityMetadataProducer() (algorithm.MetadataProduce
return getPriorityMetadataProducer(*pluginArgs) return getPriorityMetadataProducer(*pluginArgs)
} }
func (f *ConfigFactory) GetPredicateMetadataProducer() (algorithm.MetadataProducer, error) {
pluginArgs, err := f.getPluginArgs()
if err != nil {
return nil, err
}
return getPredicateMetadataProducer(*pluginArgs)
}
func (f *ConfigFactory) GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error) { func (f *ConfigFactory) GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error) {
pluginArgs, err := f.getPluginArgs() pluginArgs, err := f.getPluginArgs()
if err != nil { if err != nil {

View File

@ -78,7 +78,8 @@ var (
algorithmProviderMap = make(map[string]AlgorithmProviderConfig) algorithmProviderMap = make(map[string]AlgorithmProviderConfig)
// Registered metadata producers // Registered metadata producers
priorityMetadataProducer MetadataProducerFactory priorityMetadataProducer MetadataProducerFactory
predicateMetadataProducer MetadataProducerFactory
// get equivalence pod function // get equivalence pod function
getEquivalencePodFunc algorithm.GetEquivalencePodFunc = nil getEquivalencePodFunc algorithm.GetEquivalencePodFunc = nil
@ -121,12 +122,16 @@ func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy) string {
if policy.Argument != nil { if policy.Argument != nil {
if policy.Argument.ServiceAffinity != nil { if policy.Argument.ServiceAffinity != nil {
predicateFactory = func(args PluginFactoryArgs) algorithm.FitPredicate { predicateFactory = func(args PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewServiceAffinityPredicate( predicate, precomputationFunction := predicates.NewServiceAffinityPredicate(
args.PodLister, args.PodLister,
args.ServiceLister, args.ServiceLister,
args.NodeInfo, args.NodeInfo,
policy.Argument.ServiceAffinity.Labels, policy.Argument.ServiceAffinity.Labels,
) )
// Once we generate the predicate we should also Register the Precomputation
predicates.RegisterPredicatePrecomputation(policy.Name, precomputationFunction)
return predicate
} }
} else if policy.Argument.LabelsPresence != nil { } else if policy.Argument.LabelsPresence != nil {
predicateFactory = func(args PluginFactoryArgs) algorithm.FitPredicate { predicateFactory = func(args PluginFactoryArgs) algorithm.FitPredicate {
@ -163,6 +168,12 @@ func RegisterPriorityMetadataProducerFactory(factory MetadataProducerFactory) {
priorityMetadataProducer = factory priorityMetadataProducer = factory
} }
func RegisterPredicateMetadataProducerFactory(factory MetadataProducerFactory) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
predicateMetadataProducer = factory
}
// DEPRECATED // DEPRECATED
// Use Map-Reduce pattern for priority functions. // Use Map-Reduce pattern for priority functions.
// Registers a priority function with the algorithm registry. Returns the name, // Registers a priority function with the algorithm registry. Returns the name,
@ -312,6 +323,16 @@ func getPriorityMetadataProducer(args PluginFactoryArgs) (algorithm.MetadataProd
return priorityMetadataProducer(args), nil return priorityMetadataProducer(args), nil
} }
func getPredicateMetadataProducer(args PluginFactoryArgs) (algorithm.MetadataProducer, error) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
if predicateMetadataProducer == nil {
return algorithm.EmptyMetadataProducer, nil
}
return predicateMetadataProducer(args), nil
}
func getPriorityFunctionConfigs(names sets.String, args PluginFactoryArgs) ([]algorithm.PriorityConfig, error) { func getPriorityFunctionConfigs(names sets.String, args PluginFactoryArgs) ([]algorithm.PriorityConfig, error) {
schedulerFactoryMutex.Lock() schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock() defer schedulerFactoryMutex.Unlock()

View File

@ -61,14 +61,15 @@ func (f *FitError) Error() string {
} }
type genericScheduler struct { type genericScheduler struct {
cache schedulercache.Cache cache schedulercache.Cache
predicates map[string]algorithm.FitPredicate predicates map[string]algorithm.FitPredicate
priorityMetaProducer algorithm.MetadataProducer priorityMetaProducer algorithm.MetadataProducer
prioritizers []algorithm.PriorityConfig predicateMetaProducer algorithm.MetadataProducer
extenders []algorithm.SchedulerExtender prioritizers []algorithm.PriorityConfig
pods algorithm.PodLister extenders []algorithm.SchedulerExtender
lastNodeIndexLock sync.Mutex pods algorithm.PodLister
lastNodeIndex uint64 lastNodeIndexLock sync.Mutex
lastNodeIndex uint64
cachedNodeInfoMap map[string]*schedulercache.NodeInfo cachedNodeInfoMap map[string]*schedulercache.NodeInfo
@ -104,7 +105,7 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
// TODO(harryz) Check if equivalenceCache is enabled and call scheduleWithEquivalenceClass here // TODO(harryz) Check if equivalenceCache is enabled and call scheduleWithEquivalenceClass here
trace.Step("Computing predicates") trace.Step("Computing predicates")
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders) filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -153,7 +154,9 @@ func findNodesThatFit(
nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeNameToInfo map[string]*schedulercache.NodeInfo,
nodes []*api.Node, nodes []*api.Node,
predicateFuncs map[string]algorithm.FitPredicate, predicateFuncs map[string]algorithm.FitPredicate,
extenders []algorithm.SchedulerExtender) ([]*api.Node, FailedPredicateMap, error) { extenders []algorithm.SchedulerExtender,
metadataProducer algorithm.MetadataProducer,
) ([]*api.Node, FailedPredicateMap, error) {
var filtered []*api.Node var filtered []*api.Node
failedPredicateMap := FailedPredicateMap{} failedPredicateMap := FailedPredicateMap{}
@ -163,11 +166,12 @@ func findNodesThatFit(
// Create filtered list with enough space to avoid growing it // Create filtered list with enough space to avoid growing it
// and allow assigning. // and allow assigning.
filtered = make([]*api.Node, len(nodes)) filtered = make([]*api.Node, len(nodes))
meta := predicates.PredicateMetadata(pod, nodeNameToInfo)
errs := []error{} errs := []error{}
var predicateResultLock sync.Mutex var predicateResultLock sync.Mutex
var filteredLen int32 var filteredLen int32
// We can use the same metadata producer for all nodes.
meta := metadataProducer(pod, nodeNameToInfo)
checkNode := func(i int) { checkNode := func(i int) {
nodeName := nodes[i].Name nodeName := nodes[i].Name
fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs) fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs)
@ -381,15 +385,17 @@ func EqualPriorityMap(_ *api.Pod, _ interface{}, nodeInfo *schedulercache.NodeIn
func NewGenericScheduler( func NewGenericScheduler(
cache schedulercache.Cache, cache schedulercache.Cache,
predicates map[string]algorithm.FitPredicate, predicates map[string]algorithm.FitPredicate,
priorityMetaProducer algorithm.MetadataProducer, predicateMetaProducer algorithm.MetadataProducer,
prioritizers []algorithm.PriorityConfig, prioritizers []algorithm.PriorityConfig,
priorityMetaProducer algorithm.MetadataProducer,
extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm { extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm {
return &genericScheduler{ return &genericScheduler{
cache: cache, cache: cache,
predicates: predicates, predicates: predicates,
priorityMetaProducer: priorityMetaProducer, predicateMetaProducer: predicateMetaProducer,
prioritizers: prioritizers, prioritizers: prioritizers,
extenders: extenders, priorityMetaProducer: priorityMetaProducer,
cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo), extenders: extenders,
cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo),
} }
} }

View File

@ -277,8 +277,7 @@ func TestGenericScheduler(t *testing.T) {
}, },
}, },
}, },
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}}, pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}},
prioritizers: []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, prioritizers: []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}},
nodes: []string{"1", "2"}, nodes: []string{"1", "2"},
expectsErr: true, expectsErr: true,
@ -302,8 +301,8 @@ func TestGenericScheduler(t *testing.T) {
} }
scheduler := NewGenericScheduler( scheduler := NewGenericScheduler(
cache, test.predicates, algorithm.EmptyMetadataProducer, cache, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer,
test.prioritizers, []algorithm.SchedulerExtender{}) []algorithm.SchedulerExtender{})
machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes))) machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes)))
if !reflect.DeepEqual(err, test.wErr) { if !reflect.DeepEqual(err, test.wErr) {
@ -323,7 +322,7 @@ func TestFindFitAllError(t *testing.T) {
"2": schedulercache.NewNodeInfo(), "2": schedulercache.NewNodeInfo(),
"1": schedulercache.NewNodeInfo(), "1": schedulercache.NewNodeInfo(),
} }
_, predicateMap, err := findNodesThatFit(&api.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil) _, predicateMap, err := findNodesThatFit(&api.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
@ -357,7 +356,7 @@ func TestFindFitSomeError(t *testing.T) {
nodeNameToInfo[name].SetNode(&api.Node{ObjectMeta: api.ObjectMeta{Name: name}}) nodeNameToInfo[name].SetNode(&api.Node{ObjectMeta: api.ObjectMeta{Name: name}})
} }
_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil) _, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }

View File

@ -388,6 +388,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
predicateMap, predicateMap,
algorithm.EmptyMetadataProducer, algorithm.EmptyMetadataProducer,
[]algorithm.PriorityConfig{}, []algorithm.PriorityConfig{},
algorithm.EmptyMetadataProducer,
[]algorithm.SchedulerExtender{}) []algorithm.SchedulerExtender{})
bindingChan := make(chan *api.Binding, 1) bindingChan := make(chan *api.Binding, 1)
errChan := make(chan error, 1) errChan := make(chan error, 1)