From e4389707302ef38669191a8d1efd379ee1e3b8ec Mon Sep 17 00:00:00 2001
From: Cong Liu
Date: Wed, 6 Nov 2019 13:10:06 -0500
Subject: [PATCH] Aggregate multiple ServiceAntiAffinity custom priorities into a single score plugin.

---
 .../priorities/selector_spreading.go          |  72 +++--
 .../priorities/selector_spreading_test.go     |  25 +-
 pkg/scheduler/algorithm_factory.go            |  47 ++-
 .../apis/config/testing/compatibility_test.go |  17 +-
 pkg/scheduler/factory_test.go                 |  60 ++--
 .../framework/plugins/default_registry.go     |   6 +
 .../framework/plugins/serviceaffinity/BUILD   |   3 +
 .../serviceaffinity/service_affinity.go       |  50 +++-
 .../serviceaffinity/service_affinity_test.go  | 270 ++++++++++++++++++
 9 files changed, 433 insertions(+), 117 deletions(-)

diff --git a/pkg/scheduler/algorithm/priorities/selector_spreading.go b/pkg/scheduler/algorithm/priorities/selector_spreading.go
index 0370c5065d3..7938d55baca 100644
--- a/pkg/scheduler/algorithm/priorities/selector_spreading.go
+++ b/pkg/scheduler/algorithm/priorities/selector_spreading.go
@@ -166,34 +166,19 @@ func (s *SelectorSpread) CalculateSpreadPriorityReduce(pod *v1.Pod, meta interfa
 type ServiceAntiAffinity struct {
 	podLister     schedulerlisters.PodLister
 	serviceLister corelisters.ServiceLister
-	label         string
+	labels        []string
 }
 
 // NewServiceAntiAffinityPriority creates a ServiceAntiAffinity.
-func NewServiceAntiAffinityPriority(podLister schedulerlisters.PodLister, serviceLister corelisters.ServiceLister, label string) (PriorityMapFunction, PriorityReduceFunction) {
+func NewServiceAntiAffinityPriority(podLister schedulerlisters.PodLister, serviceLister corelisters.ServiceLister, labels []string) (PriorityMapFunction, PriorityReduceFunction) {
 	antiAffinity := &ServiceAntiAffinity{
 		podLister:     podLister,
 		serviceLister: serviceLister,
-		label:         label,
+		labels:        labels,
 	}
 	return antiAffinity.CalculateAntiAffinityPriorityMap, antiAffinity.CalculateAntiAffinityPriorityReduce
 }
 
-// Classifies nodes into ones with labels and without labels.
-func (s *ServiceAntiAffinity) getNodeClassificationByLabels(nodes []*v1.Node) (map[string]string, []string) {
-	labeledNodes := map[string]string{}
-	nonLabeledNodes := []string{}
-	for _, node := range nodes {
-		if labels.Set(node.Labels).Has(s.label) {
-			label := labels.Set(node.Labels).Get(s.label)
-			labeledNodes[node.Name] = label
-		} else {
-			nonLabeledNodes = append(nonLabeledNodes, node.Name)
-		}
-	}
-	return labeledNodes, nonLabeledNodes
-}
-
 // countMatchingPods cout pods based on namespace and matching all selectors
 func countMatchingPods(namespace string, selectors []labels.Selector, nodeInfo *schedulernodeinfo.NodeInfo) int {
 	if nodeInfo.Pods() == nil || len(nodeInfo.Pods()) == 0 || len(selectors) == 0 {
@@ -250,43 +235,68 @@ func (s *ServiceAntiAffinity) CalculateAntiAffinityPriorityMap(pod *v1.Pod, meta
 // CalculateAntiAffinityPriorityReduce computes each node score with the same value for a particular label.
 // The label to be considered is provided to the struct (ServiceAntiAffinity).
 func (s *ServiceAntiAffinity) CalculateAntiAffinityPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, result framework.NodeScoreList) error {
+	reduceResult := make([]float64, len(result))
+	for _, label := range s.labels {
+		if err := s.updateNodeScoresForLabel(sharedLister, result, reduceResult, label); err != nil {
+			return err
+		}
+	}
+
+	// Update the result after all labels have been evaluated.
+	for i, nodeScore := range reduceResult {
+		result[i].Score = int64(nodeScore)
+	}
+	return nil
+}
+
+// updateNodeScoresForLabel updates the node scores for a single label. Note that it does not update
+// the original result from the map phase directly; instead it accumulates into reduceResult, which is
+// copied back into the original result at the end. This makes sure that every call to
+// updateNodeScoresForLabel receives the same mapResult to work with.
+// Why are we doing this? It is a workaround for the migration from priorities to score plugins.
+// Historically the priority was designed to handle only one label, and multiple priorities were
+// configured to work with multiple labels. Configuring the same plugin multiple times is not allowed
+// in the new framework, so we need to modify the old priority to handle multiple labels so that it
+// can be mapped to a single plugin. This will be deprecated soon.
+func (s *ServiceAntiAffinity) updateNodeScoresForLabel(sharedLister schedulerlisters.SharedLister, mapResult framework.NodeScoreList, reduceResult []float64, label string) error {
 	var numServicePods int64
-	var label string
+	var labelValue string
 	podCounts := map[string]int64{}
 	labelNodesStatus := map[string]string{}
 	maxPriorityFloat64 := float64(framework.MaxNodeScore)
 
-	for _, hostPriority := range result {
+	for _, hostPriority := range mapResult {
 		numServicePods += hostPriority.Score
 		nodeInfo, err := sharedLister.NodeInfos().Get(hostPriority.Name)
 		if err != nil {
 			return err
 		}
-		if !labels.Set(nodeInfo.Node().Labels).Has(s.label) {
+		if !labels.Set(nodeInfo.Node().Labels).Has(label) {
 			continue
 		}
-		label = labels.Set(nodeInfo.Node().Labels).Get(s.label)
-		labelNodesStatus[hostPriority.Name] = label
-		podCounts[label] += hostPriority.Score
+		labelValue = labels.Set(nodeInfo.Node().Labels).Get(label)
+		labelNodesStatus[hostPriority.Name] = labelValue
+		podCounts[labelValue] += hostPriority.Score
 	}
 
 	//score int - scale of 0-maxPriority
 	// 0 being the lowest priority and maxPriority being the highest
-	for i, hostPriority := range result {
-		label, ok := labelNodesStatus[hostPriority.Name]
+	for i, hostPriority := range mapResult {
+		labelValue, ok := labelNodesStatus[hostPriority.Name]
 		if !ok {
-			result[i].Name = hostPriority.Name
-			result[i].Score = 0
 			continue
 		}
 		// initializing to the default/max node score of maxPriority
 		fScore := maxPriorityFloat64
 		if numServicePods > 0 {
-			fScore = maxPriorityFloat64 * (float64(numServicePods-podCounts[label]) / float64(numServicePods))
+			fScore = maxPriorityFloat64 * (float64(numServicePods-podCounts[labelValue]) / float64(numServicePods))
 		}
-		result[i].Name = hostPriority.Name
-		result[i].Score = int64(fScore)
+		// The score for the current label accounts for only 1/len(s.labels) of the total score,
+		// because the policy API definition only allows a single label per priority, each with its
+		// own weight. This is compensated by the fact that the total weight of the resulting score
+		// plugin is the sum of all the weights configured in the policy config.
+		reduceResult[i] += fScore / float64(len(s.labels))
 	}
 
 	return nil
diff --git a/pkg/scheduler/algorithm/priorities/selector_spreading_test.go b/pkg/scheduler/algorithm/priorities/selector_spreading_test.go
index 5760bc1e7c2..a43b79f03c5 100644
--- a/pkg/scheduler/algorithm/priorities/selector_spreading_test.go
+++ b/pkg/scheduler/algorithm/priorities/selector_spreading_test.go
@@ -773,7 +773,7 @@ func TestZoneSpreadPriority(t *testing.T) {
 		t.Run(test.name, func(t *testing.T) {
 			nodes := makeLabeledNodeList(labeledNodes)
 			snapshot := nodeinfosnapshot.NewSnapshot(test.pods, nodes)
-			zoneSpread := ServiceAntiAffinity{podLister: snapshot.Pods(), serviceLister: fakelisters.ServiceLister(test.services), label: "zone"}
+			zoneSpread := ServiceAntiAffinity{podLister: snapshot.Pods(), serviceLister: fakelisters.ServiceLister(test.services), labels: []string{"zone"}}
 
 			metaDataProducer := NewPriorityMetadataFactory(
 				fakelisters.ServiceLister(test.services),
@@ -799,29 +799,6 @@ func TestZoneSpreadPriority(t *testing.T) {
 	}
 }
 
-func TestGetNodeClassificationByLabels(t *testing.T) {
-	const machine01 = "machine01"
-	const machine02 = "machine02"
-	const zoneA = "zoneA"
-	zone1 := map[string]string{
-		"zone": zoneA,
-	}
-	labeledNodes := map[string]map[string]string{
-		machine01: zone1,
-	}
-	expectedNonLabeledNodes := []string{machine02}
-	serviceAffinity := ServiceAntiAffinity{label: "zone"}
-	newLabeledNodes, noNonLabeledNodes := serviceAffinity.getNodeClassificationByLabels(makeLabeledNodeList(labeledNodes))
-	noLabeledNodes, newnonLabeledNodes := serviceAffinity.getNodeClassificationByLabels(makeNodeList(expectedNonLabeledNodes))
-	label, _ := newLabeledNodes[machine01]
-	if label != zoneA && len(noNonLabeledNodes) != 0 {
-		t.Errorf("Expected only labeled node with label zoneA and no noNonLabeledNodes")
-	}
-	if len(noLabeledNodes) != 0 && newnonLabeledNodes[0] != machine02 {
-		t.Errorf("Expected only non labelled nodes")
-	}
-}
-
 func makeLabeledNodeList(nodeMap map[string]map[string]string) []*v1.Node {
 	nodes := make([]*v1.Node, 0, len(nodeMap))
 	for nodeName, labels := range nodeMap {
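For illustration only (this snippet is not part of the patch): the per-label averaging that updateNodeScoresForLabel performs can be checked with a small standalone sketch. The node names, label values and per-node matching-pod counts below are invented, and maxNodeScore stands in for framework.MaxNodeScore (assumed to be 100, consistent with the 0-100 expectations in the tests). Each label pass scores a node as maxNodeScore * (totalMatchingPods - podsSharingTheLabelValue) / totalMatchingPods and contributes 1/len(labels) of the final score; with the data below this reproduces the 25/75/25/25 expectations of the rack-label test case added later in this patch.

package main

import "fmt"

// node describes a hypothetical node with its labels and the number of pods
// on it that match the service selector (i.e. the output of the map phase).
type node struct {
	name    string
	labels  map[string]string
	matched int64
}

const maxNodeScore = 100.0

// antiAffinityScores mirrors the reduce logic sketched above: one pass per
// label key, each pass contributing 1/len(labelKeys) of the final score.
func antiAffinityScores(nodes []node, labelKeys []string) map[string]float64 {
	scores := make(map[string]float64, len(nodes))
	for _, key := range labelKeys {
		var total int64
		perValue := map[string]int64{}
		for _, n := range nodes {
			total += n.matched
			if v, ok := n.labels[key]; ok {
				perValue[v] += n.matched
			}
		}
		for _, n := range nodes {
			v, ok := n.labels[key]
			if !ok {
				continue // nodes without the label keep a zero contribution for this pass
			}
			fScore := maxNodeScore
			if total > 0 {
				fScore = maxNodeScore * float64(total-perValue[v]) / float64(total)
			}
			scores[n.name] += fScore / float64(len(labelKeys))
		}
	}
	return scores
}

func main() {
	nodes := []node{
		{"machine11", map[string]string{"zone": "zone1", "rack": "rack1"}, 1},
		{"machine12", map[string]string{"zone": "zone1", "rack": "rack2"}, 0},
		{"machine21", map[string]string{"zone": "zone2", "rack": "rack1"}, 1},
		{"machine22", map[string]string{"zone": "zone2", "rack": "rack1"}, 0},
	}
	// With two labels each pass contributes half of the final score:
	// machine12 ends up with (50+100)/2 = 75, the other nodes with 25.
	fmt.Println(antiAffinityScores(nodes, []string{"zone", "rack"}))
}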
diff --git a/pkg/scheduler/algorithm_factory.go b/pkg/scheduler/algorithm_factory.go
index debb4c90e2e..22cdf2ea876 100644
--- a/pkg/scheduler/algorithm_factory.go
+++ b/pkg/scheduler/algorithm_factory.go
@@ -284,14 +284,14 @@ func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy, pluginArgs
 			pluginArgs.ServiceAffinityArgs = &serviceaffinity.Args{}
 		}
 
-		pluginArgs.ServiceAffinityArgs.Labels = append(pluginArgs.ServiceAffinityArgs.Labels, policy.Argument.ServiceAffinity.Labels...)
+		pluginArgs.ServiceAffinityArgs.AffinityLabels = append(pluginArgs.ServiceAffinityArgs.AffinityLabels, policy.Argument.ServiceAffinity.Labels...)
 
 		predicateFactory = func(args PluginFactoryArgs) predicates.FitPredicate {
 			predicate, precomputationFunction := predicates.NewServiceAffinityPredicate(
 				args.NodeInfoLister,
 				args.PodLister,
 				args.ServiceLister,
-				pluginArgs.ServiceAffinityArgs.Labels,
+				pluginArgs.ServiceAffinityArgs.AffinityLabels,
 			)
 
 			// Once we generate the predicate we should also Register the Precomputation
@@ -394,7 +394,7 @@ func RegisterPriorityConfigFactory(name string, pcf PriorityConfigFactory) strin
 
 // RegisterCustomPriorityFunction registers a custom priority function with the algorithm registry.
 // Returns the name, with which the priority function was registered.
-func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy, args *plugins.ConfigProducerArgs) string {
+func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy, configProducerArgs *plugins.ConfigProducerArgs) string {
 	var pcf *PriorityConfigFactory
 	name := policy.Name
 
@@ -403,49 +403,68 @@ func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy, args *pl
 	// generate the priority function, if a custom priority is requested
 	if policy.Argument != nil {
 		if policy.Argument.ServiceAntiAffinity != nil {
+			// We use the ServiceAffinity plugin name for all ServiceAffinity custom priorities.
+			// It may get called multiple times but we essentially only register one instance of
+			// ServiceAffinity priority.
+			// This name is then used to find the registered plugin and run the plugin instead of the priority.
+			name = serviceaffinity.Name
+
+			if configProducerArgs.ServiceAffinityArgs == nil {
+				configProducerArgs.ServiceAffinityArgs = &serviceaffinity.Args{}
+			}
+			configProducerArgs.ServiceAffinityArgs.AntiAffinityLabelsPreference = append(configProducerArgs.ServiceAffinityArgs.AntiAffinityLabelsPreference, policy.Argument.ServiceAntiAffinity.Label)
+
+			weight := policy.Weight
+			schedulerFactoryMutex.RLock()
+			if existing, ok := priorityFunctionMap[name]; ok {
+				// If there are n ServiceAffinity priorities in the policy, the weight for the corresponding
+				// score plugin is n*(weight of each priority).
+				weight += existing.Weight
+			}
+			schedulerFactoryMutex.RUnlock()
 			pcf = &PriorityConfigFactory{
 				MapReduceFunction: func(args PluginFactoryArgs) (priorities.PriorityMapFunction, priorities.PriorityReduceFunction) {
 					return priorities.NewServiceAntiAffinityPriority(
 						args.PodLister,
 						args.ServiceLister,
-						policy.Argument.ServiceAntiAffinity.Label,
+						configProducerArgs.ServiceAffinityArgs.AntiAffinityLabelsPreference,
 					)
 				},
-				Weight: policy.Weight,
+				Weight: weight,
 			}
 		} else if policy.Argument.LabelPreference != nil {
 			// We use the NodeLabel plugin name for all NodeLabel custom priorities.
 			// It may get called multiple times but we essentially only register one instance of NodeLabel priority.
 			// This name is then used to find the registered plugin and run the plugin instead of the priority.
 			name = nodelabel.Name
-			if args.NodeLabelArgs == nil {
-				args.NodeLabelArgs = &nodelabel.Args{}
+			if configProducerArgs.NodeLabelArgs == nil {
+				configProducerArgs.NodeLabelArgs = &nodelabel.Args{}
 			}
 			if policy.Argument.LabelPreference.Presence {
-				args.NodeLabelArgs.PresentLabelsPreference = append(args.NodeLabelArgs.PresentLabelsPreference, policy.Argument.LabelPreference.Label)
+				configProducerArgs.NodeLabelArgs.PresentLabelsPreference = append(configProducerArgs.NodeLabelArgs.PresentLabelsPreference, policy.Argument.LabelPreference.Label)
 			} else {
-				args.NodeLabelArgs.AbsentLabelsPreference = append(args.NodeLabelArgs.AbsentLabelsPreference, policy.Argument.LabelPreference.Label)
+				configProducerArgs.NodeLabelArgs.AbsentLabelsPreference = append(configProducerArgs.NodeLabelArgs.AbsentLabelsPreference, policy.Argument.LabelPreference.Label)
 			}
-			schedulerFactoryMutex.RLock()
 			weight := policy.Weight
+			schedulerFactoryMutex.RLock()
 			if existing, ok := priorityFunctionMap[name]; ok {
 				// If there are n NodeLabel priority configured in the policy, the weight for the corresponding
 				// priority is n*(weight of each priority in policy).
 				weight += existing.Weight
 			}
+			schedulerFactoryMutex.RUnlock()
 			pcf = &PriorityConfigFactory{
 				MapReduceFunction: func(_ PluginFactoryArgs) (priorities.PriorityMapFunction, priorities.PriorityReduceFunction) {
 					return priorities.NewNodeLabelPriority(
-						args.NodeLabelArgs.PresentLabelsPreference,
-						args.NodeLabelArgs.AbsentLabelsPreference,
+						configProducerArgs.NodeLabelArgs.PresentLabelsPreference,
+						configProducerArgs.NodeLabelArgs.AbsentLabelsPreference,
 					)
 				},
 				Weight: weight,
 			}
-			schedulerFactoryMutex.RUnlock()
 		} else if policy.Argument.RequestedToCapacityRatioArguments != nil {
 			scoringFunctionShape, resources := buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(policy.Argument.RequestedToCapacityRatioArguments)
-			args.RequestedToCapacityRatioArgs = &requestedtocapacityratio.Args{
+			configProducerArgs.RequestedToCapacityRatioArgs = &requestedtocapacityratio.Args{
 				FunctionShape:       scoringFunctionShape,
 				ResourceToWeightMap: resources,
 			}
diff --git a/pkg/scheduler/apis/config/testing/compatibility_test.go b/pkg/scheduler/apis/config/testing/compatibility_test.go
index e6eff445530..af982242031 100644
--- a/pkg/scheduler/apis/config/testing/compatibility_test.go
+++ b/pkg/scheduler/apis/config/testing/compatibility_test.go
@@ -101,7 +101,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
 			),
 			wantPrioritizers: sets.NewString(
 				"ServiceSpreadingPriority",
-				"TestServiceAntiAffinity",
 			),
 			wantPlugins: map[string][]config.Plugin{
 				"FilterPlugin": {
@@ -116,6 +115,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
 				"ScorePlugin": {
 					{Name: "NodeResourcesLeastAllocated", Weight: 1},
 					{Name: "NodeLabel", Weight: 4},
+					{Name: "ServiceAffinity", Weight: 3},
 				},
 			},
 		},
@@ -144,10 +144,8 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
 		{"name": "TestLabelPreference", "weight": 4, "argument": {"labelPreference": {"label": "bar", "presence":true}}}
 	]
 }`,
-			wantPredicates: sets.NewString(),
-			wantPrioritizers: sets.NewString(
-				"TestServiceAntiAffinity",
-			),
+			wantPredicates:   sets.NewString(),
+			wantPrioritizers: sets.NewString(),
 			wantPlugins: map[string][]config.Plugin{
 				"FilterPlugin": {
 					{Name: "NodeUnschedulable"},
@@ -165,6 +163,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
 					{Name: "NodeResourcesLeastAllocated", Weight: 2},
 					{Name: "NodeLabel", Weight: 4},
 					{Name: "DefaultPodTopologySpread", Weight: 2},
+					{Name: "ServiceAffinity", Weight: 3},
 				},
 			},
 		},
@@ -198,10 +197,8 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
 		{"name": "TestLabelPreference", "weight": 4, "argument": {"labelPreference": {"label": "bar", "presence":true}}}
 	]
 }`,
-			wantPredicates: sets.NewString(),
-			wantPrioritizers: sets.NewString(
-				"TestServiceAntiAffinity",
-			),
+			wantPredicates:   sets.NewString(),
+			wantPrioritizers: sets.NewString(),
 			wantPlugins: map[string][]config.Plugin{
 				"FilterPlugin": {
 					{Name: "NodeUnschedulable"},
@@ -225,6 +222,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
 					{Name: "NodeAffinity", Weight: 2},
 					{Name: "NodeLabel", Weight: 4},
 					{Name: "DefaultPodTopologySpread", Weight: 2},
+					{Name: "ServiceAffinity", Weight: 3},
 				},
 			},
 		},
@@ -1241,6 +1239,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
 		"NodeResourcesMostAllocated":  "MostRequestedPriority",
 		"RequestedToCapacityRatio":    "RequestedToCapacityRatioPriority",
 		"NodeLabel":                   "TestLabelPreference",
+		"ServiceAffinity":             "TestServiceAntiAffinity",
 	}
 
 	for _, tc := range testcases {
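How the weights line up: the compatibility fixtures above expect the aggregated ServiceAffinity score plugin to keep the weight of the single TestServiceAntiAffinity priority (3), while the factory test below configures two serviceAntiAffinity priorities with weight 3 each and expects the plugin weight 6. The following toy sketch is not scheduler code; the priorityPolicy type and its fields are invented stand-ins for the policy entries, and it only mirrors the `weight += existing.Weight` accumulation registered above.

package main

import "fmt"

// priorityPolicy is a stripped-down stand-in for a policy entry that carries
// a serviceAntiAffinity argument (names and fields are illustrative only).
type priorityPolicy struct {
	label  string
	weight int32
}

func main() {
	// Two custom priorities in the policy, e.g. RackSpread and ZoneSpread,
	// each with weight 3, collapse into one ServiceAffinity score plugin.
	policies := []priorityPolicy{{"rack", 3}, {"zone", 3}}

	var labels []string
	var pluginWeight int32
	for _, p := range policies {
		labels = append(labels, p.label)
		pluginWeight += p.weight // mirrors `weight += existing.Weight`
	}
	fmt.Println(labels, pluginWeight) // [rack zone] 6
}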
diff --git a/pkg/scheduler/factory_test.go b/pkg/scheduler/factory_test.go
index 9c2ab1a7150..832161fc146 100644
--- a/pkg/scheduler/factory_test.go
+++ b/pkg/scheduler/factory_test.go
@@ -100,6 +100,7 @@ func TestCreateFromConfig(t *testing.T) {
 		],
 		"priorities" : [
 			{"name" : "RackSpread", "weight" : 3, "argument" : {"serviceAntiAffinity" : {"label" : "rack"}}},
+			{"name" : "ZoneSpread", "weight" : 3, "argument" : {"serviceAntiAffinity" : {"label" : "zone"}}},
 			{"name" : "LabelPreference1", "weight" : 3, "argument" : {"labelPreference" : {"label" : "l1", "presence": true}}},
 			{"name" : "LabelPreference2", "weight" : 3, "argument" : {"labelPreference" : {"label" : "l2", "presence": false}}},
 			{"name" : "PriorityOne", "weight" : 2},
@@ -119,40 +120,33 @@ func TestCreateFromConfig(t *testing.T) {
 	}
 
 	// Verify that node label predicate/priority are converted to framework plugins.
-	if _, ok := findPlugin(nodelabel.Name, "FilterPlugin", conf); !ok {
-		t.Fatalf("NodeLabel plugin not exist in framework.")
-	}
-	nodeLabelScorePlugin, ok := findPlugin(nodelabel.Name, "ScorePlugin", conf)
-	if !ok {
-		t.Fatalf("NodeLabel plugin not exist in framework.")
-	}
-	if nodeLabelScorePlugin.Weight != 6 {
-		t.Errorf("Wrong weight. Got: %v, want: 6", nodeLabelScorePlugin.Weight)
-	}
-	// Verify that the policy config is converted to plugin config for node label predicate/priority.
-	nodeLabelConfig := findPluginConfig(nodelabel.Name, conf)
-	encoding, err := json.Marshal(nodeLabelConfig)
-	if err != nil {
-		t.Errorf("Failed to marshal %+v: %v", nodeLabelConfig, err)
-	}
-	want := `{"Name":"NodeLabel","Args":{"presentLabels":["zone"],"absentLabels":["foo"],"presentLabelsPreference":["l1"],"absentLabelsPreference":["l2"]}}`
-	if string(encoding) != want {
-		t.Errorf("Config for NodeLabel plugin mismatch. got: %v, want: %v", string(encoding), want)
-	}
+	wantArgs := `{"Name":"NodeLabel","Args":{"presentLabels":["zone"],"absentLabels":["foo"],"presentLabelsPreference":["l1"],"absentLabelsPreference":["l2"]}}`
+	verifyPluginConversion(t, nodelabel.Name, []string{"FilterPlugin", "ScorePlugin"}, conf, 6, wantArgs)
+	// Verify that service affinity custom predicate/priority is converted to framework plugin.
+	wantArgs = `{"Name":"ServiceAffinity","Args":{"labels":["zone","foo"],"antiAffinityLabelsPreference":["rack","zone"]}}`
+	verifyPluginConversion(t, serviceaffinity.Name, []string{"FilterPlugin", "ScorePlugin"}, conf, 6, wantArgs)
+}
 
-	// Verify that service affinity predicates are converted to framework plugins.
-	if _, ok := findPlugin(serviceaffinity.Name, "FilterPlugin", conf); !ok {
-		t.Fatalf("ServiceAffinity plugin not exist in framework.")
-	}
-	// Verify that the policy config is converted to plugin config for service affinity predicate.
-	serviceAffinityConfig := findPluginConfig(serviceaffinity.Name, conf)
-	encoding, err = json.Marshal(serviceAffinityConfig)
-	if err != nil {
-		t.Errorf("Failed to marshal %+v: %v", serviceAffinityConfig, err)
-	}
-	want = `{"Name":"ServiceAffinity","Args":{"labels":["zone","foo"]}}`
-	if string(encoding) != want {
-		t.Errorf("Config for ServiceAffinity plugin mismatch. got: %v, want: %v", string(encoding), want)
+func verifyPluginConversion(t *testing.T, name string, extensionPoints []string, conf *Config, wantWeight int32, wantArgs string) {
+	for _, extensionPoint := range extensionPoints {
+		plugin, ok := findPlugin(name, extensionPoint, conf)
+		if !ok {
+			t.Fatalf("%q plugin does not exist in framework.", name)
+		}
+		if extensionPoint == "ScorePlugin" {
+			if plugin.Weight != wantWeight {
+				t.Errorf("Wrong weight. Got: %v, want: %v", plugin.Weight, wantWeight)
+			}
+		}
+		// Verify that the policy config is converted to plugin config.
+		pluginConfig := findPluginConfig(name, conf)
+		encoding, err := json.Marshal(pluginConfig)
+		if err != nil {
+			t.Errorf("Failed to marshal %+v: %v", pluginConfig, err)
+		}
+		if string(encoding) != wantArgs {
+			t.Errorf("Config for %v plugin mismatch. got: %v, want: %v", name, string(encoding), wantArgs)
+		}
 	}
 }
diff --git a/pkg/scheduler/framework/plugins/default_registry.go b/pkg/scheduler/framework/plugins/default_registry.go
index 09dc752828e..6f2cd853286 100644
--- a/pkg/scheduler/framework/plugins/default_registry.go
+++ b/pkg/scheduler/framework/plugins/default_registry.go
@@ -279,6 +279,12 @@ func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry {
 			pluginConfig = append(pluginConfig, makePluginConfig(nodelabel.Name, args.NodeLabelArgs))
 			return
 		})
+	registry.RegisterPriority(serviceaffinity.Name,
+		func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
+			plugins.Score = appendToPluginSet(plugins.Score, serviceaffinity.Name, &args.Weight)
+			pluginConfig = append(pluginConfig, makePluginConfig(serviceaffinity.Name, args.ServiceAffinityArgs))
+			return
+		})
 	return registry
 }
diff --git a/pkg/scheduler/framework/plugins/serviceaffinity/BUILD b/pkg/scheduler/framework/plugins/serviceaffinity/BUILD
index 12678f0c05d..ab27433dffb 100644
--- a/pkg/scheduler/framework/plugins/serviceaffinity/BUILD
+++ b/pkg/scheduler/framework/plugins/serviceaffinity/BUILD
@@ -7,6 +7,7 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/scheduler/algorithm/predicates:go_default_library",
+        "//pkg/scheduler/algorithm/priorities:go_default_library",
         "//pkg/scheduler/framework/plugins/migration:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/nodeinfo:go_default_library",
@@ -21,10 +22,12 @@ go_test(
     embed = [":go_default_library"],
     deps = [
         "//pkg/scheduler/algorithm/predicates:go_default_library",
+        "//pkg/scheduler/algorithm/priorities:go_default_library",
         "//pkg/scheduler/framework/plugins/migration:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/listers/fake:go_default_library",
         "//pkg/scheduler/nodeinfo/snapshot:go_default_library",
+        "//staging/src/k8s.io/api/apps/v1:go_default_library",
        "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
     ],
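The wantArgs strings checked in the factory test above are just the JSON encoding of the aggregated plugin arguments. A minimal sketch of that encoding, not part of the patch: the local args type below copies the JSON tags of the serviceaffinity.Args fields introduced in the next file's diff, so the snippet stands alone.

package main

import (
	"encoding/json"
	"fmt"
)

// args mirrors the shape of serviceaffinity.Args used in the patch; it is
// defined locally here only so this sketch compiles on its own.
type args struct {
	AffinityLabels               []string `json:"labels,omitempty"`
	AntiAffinityLabelsPreference []string `json:"antiAffinityLabelsPreference,omitempty"`
}

func main() {
	a := args{
		AffinityLabels:               []string{"zone", "foo"},
		AntiAffinityLabelsPreference: []string{"rack", "zone"},
	}
	out, err := json.Marshal(a)
	if err != nil {
		panic(err)
	}
	// {"labels":["zone","foo"],"antiAffinityLabelsPreference":["rack","zone"]}
	fmt.Println(string(out))
}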
diff --git a/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity.go b/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity.go
index 46a5317524d..4af032d2d9e 100644
--- a/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity.go
+++ b/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity.go
@@ -18,22 +18,28 @@ package serviceaffinity
 
 import (
 	"context"
+	"fmt"
 
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
+	"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
 )
 
-// Name of this plugin.
+// Name is the name of the plugin used in the plugin registry and configurations.
 const Name = "ServiceAffinity"
 
 // Args holds the args that are used to configure the plugin.
 type Args struct {
-	// Labels should be present for the node to be considered a fit for hosting the pod
-	Labels []string `json:"labels,omitempty"`
+	// AffinityLabels are the labels whose values must be homogeneous for the pods of a service
+	// scheduled to a node (i.e. the predicate returns true iff this pod can be added to this node
+	// such that all other pods in the same service are running on nodes with the exact same values
+	// for these labels).
+	AffinityLabels []string `json:"labels,omitempty"`
+	// AntiAffinityLabelsPreference are the labels to consider for service anti-affinity scoring.
+	AntiAffinityLabelsPreference []string `json:"antiAffinityLabelsPreference,omitempty"`
 }
 
 // New initializes a new plugin and returns it.
@@ -46,22 +52,31 @@ func New(plArgs *runtime.Unknown, handle framework.FrameworkHandle) (framework.P
 	nodeInfoLister := handle.SnapshotSharedLister().NodeInfos()
 	podLister := handle.SnapshotSharedLister().Pods()
 	serviceLister := informerFactory.Core().V1().Services().Lister()
-	fitPredicate, predicateMetadataProducer := predicates.NewServiceAffinityPredicate(nodeInfoLister, podLister, serviceLister, args.Labels)
+	fitPredicate, predicateMetadataProducer := predicates.NewServiceAffinityPredicate(nodeInfoLister, podLister, serviceLister, args.AffinityLabels)
 
 	// Once we generate the predicate we should also Register the Precomputation
 	predicates.RegisterPredicateMetadataProducer(predicates.CheckServiceAffinityPred, predicateMetadataProducer)
 
+	priorityMapFunction, priorityReduceFunction := priorities.NewServiceAntiAffinityPriority(podLister, serviceLister, args.AntiAffinityLabelsPreference)
+
 	return &ServiceAffinity{
-		predicate: fitPredicate,
+		handle:                 handle,
+		predicate:              fitPredicate,
+		priorityMapFunction:    priorityMapFunction,
+		priorityReduceFunction: priorityReduceFunction,
 	}, nil
 }
 
 // ServiceAffinity is a plugin that checks service affinity.
 type ServiceAffinity struct {
-	predicate predicates.FitPredicate
+	handle                 framework.FrameworkHandle
+	predicate              predicates.FitPredicate
+	priorityMapFunction    priorities.PriorityMapFunction
+	priorityReduceFunction priorities.PriorityReduceFunction
 }
 
 var _ framework.FilterPlugin = &ServiceAffinity{}
+var _ framework.ScorePlugin = &ServiceAffinity{}
 
 // Name returns name of the plugin. It is used in logs, etc.
 func (pl *ServiceAffinity) Name() string {
@@ -77,3 +92,26 @@ func (pl *ServiceAffinity) Filter(ctx context.Context, cycleState *framework.Cyc
 	_, reasons, err := pl.predicate(pod, meta, nodeInfo)
 	return migration.PredicateResultToFrameworkStatus(reasons, err)
 }
+
+// Score invoked at the Score extension point.
+func (pl *ServiceAffinity) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
+	nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
+	if err != nil {
+		return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
+	}
+	meta := migration.PriorityMetadata(state)
+	s, err := pl.priorityMapFunction(pod, meta, nodeInfo)
+	return s.Score, migration.ErrorToFrameworkStatus(err)
+}
+
+// NormalizeScore invoked after scoring all nodes.
+func (pl *ServiceAffinity) NormalizeScore(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
+	// Note that priorityReduceFunction doesn't use priority metadata, hence passing nil here.
+	err := pl.priorityReduceFunction(pod, nil, pl.handle.SnapshotSharedLister(), scores)
+	return migration.ErrorToFrameworkStatus(err)
+}
+
+// ScoreExtensions of the Score plugin.
+func (pl *ServiceAffinity) ScoreExtensions() framework.ScoreExtensions {
+	return pl
+}
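The plugin above is a thin adapter: Score runs the legacy map function once per node and returns the raw per-node value, and NormalizeScore runs the legacy reduce function once over all raw values. The following simplified, self-contained sketch is not the plugin itself; nodeScore, mapFn and reduceFn are invented stand-ins for the framework types, and the inverted 0-100 normalization only approximates what the anti-affinity reduce does.

package main

import (
	"context"
	"fmt"
)

// nodeScore is a stand-in for framework.NodeScore.
type nodeScore struct {
	Name  string
	Score int64
}

// mapFn and reduceFn play the role of the legacy PriorityMapFunction and
// PriorityReduceFunction that the plugin wraps.
type mapFn func(node string) int64
type reduceFn func(scores []nodeScore)

type scorePlugin struct {
	mapFunc    mapFn
	reduceFunc reduceFn
}

// Score is called once per node and returns the raw per-node value.
func (p scorePlugin) Score(_ context.Context, node string) int64 {
	return p.mapFunc(node)
}

// NormalizeScore is called once with all raw values and rewrites them in place.
func (p scorePlugin) NormalizeScore(_ context.Context, scores []nodeScore) {
	p.reduceFunc(scores)
}

func main() {
	// Raw value: number of matching pods per node; normalization inverts it
	// onto a 0-100 scale, roughly what the anti-affinity reduce does.
	matched := map[string]int64{"machine11": 2, "machine12": 0}
	p := scorePlugin{
		mapFunc: func(node string) int64 { return matched[node] },
		reduceFunc: func(scores []nodeScore) {
			var total int64
			for _, s := range scores {
				total += s.Score
			}
			for i := range scores {
				if total > 0 {
					scores[i].Score = 100 * (total - scores[i].Score) / total
				} else {
					scores[i].Score = 100
				}
			}
		},
	}
	var scores []nodeScore
	for _, n := range []string{"machine11", "machine12"} {
		scores = append(scores, nodeScore{n, p.Score(context.Background(), n)})
	}
	p.NormalizeScore(context.Background(), scores)
	fmt.Println(scores) // [{machine11 0} {machine12 100}]
}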
diff --git a/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity_test.go b/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity_test.go
index 90fbc961563..c3234cc9a72 100644
--- a/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity_test.go
+++ b/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity_test.go
@@ -18,11 +18,15 @@ package serviceaffinity
 
 import (
 	"context"
+	"reflect"
+	"sort"
 	"testing"
 
+	apps "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
+	"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake"
@@ -179,3 +183,269 @@ func TestServiceAffinity(t *testing.T) {
 		})
 	}
 }
+func TestServiceAffinityScore(t *testing.T) {
+	labels1 := map[string]string{
+		"foo": "bar",
+		"baz": "blah",
+	}
+	labels2 := map[string]string{
+		"bar": "foo",
+		"baz": "blah",
+	}
+	zone1 := map[string]string{
+		"zone": "zone1",
+	}
+	zone1Rack1 := map[string]string{
+		"zone": "zone1",
+		"rack": "rack1",
+	}
+	zone1Rack2 := map[string]string{
+		"zone": "zone1",
+		"rack": "rack2",
+	}
+	zone2 := map[string]string{
+		"zone": "zone2",
+	}
+	zone2Rack1 := map[string]string{
+		"zone": "zone2",
+		"rack": "rack1",
+	}
+	nozone := map[string]string{
+		"name": "value",
+	}
+	zone0Spec := v1.PodSpec{
+		NodeName: "machine01",
+	}
+	zone1Spec := v1.PodSpec{
+		NodeName: "machine11",
+	}
+	zone2Spec := v1.PodSpec{
+		NodeName: "machine21",
+	}
+	labeledNodes := map[string]map[string]string{
+		"machine01": nozone, "machine02": nozone,
+		"machine11": zone1, "machine12": zone1,
+		"machine21": zone2, "machine22": zone2,
+	}
+	nodesWithZoneAndRackLabels := map[string]map[string]string{
+		"machine01": nozone, "machine02": nozone,
+		"machine11": zone1Rack1, "machine12": zone1Rack2,
+		"machine21": zone2Rack1, "machine22": zone2Rack1,
+	}
+	tests := []struct {
+		pod          *v1.Pod
+		pods         []*v1.Pod
+		nodes        map[string]map[string]string
+		services     []*v1.Service
+		labels       []string
+		expectedList framework.NodeScoreList
+		name         string
+	}{
+		{
+			pod:    new(v1.Pod),
+			nodes:  labeledNodes,
+			labels: []string{"zone"},
+			expectedList: []framework.NodeScore{{Name: "machine11", Score: framework.MaxNodeScore}, {Name: "machine12", Score: framework.MaxNodeScore},
+				{Name: "machine21", Score: framework.MaxNodeScore}, {Name: "machine22", Score: framework.MaxNodeScore},
+				{Name: "machine01", Score: 0}, {Name: "machine02", Score: 0}},
+			name: "nothing scheduled",
+		},
+		{
+			pod:    &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+			pods:   []*v1.Pod{{Spec: zone1Spec}},
+			nodes:  labeledNodes,
+			labels: []string{"zone"},
+			expectedList: []framework.NodeScore{{Name: "machine11", Score: framework.MaxNodeScore}, {Name: "machine12", Score: framework.MaxNodeScore},
+				{Name: "machine21", Score: framework.MaxNodeScore}, {Name: "machine22", Score: framework.MaxNodeScore},
+				{Name: "machine01", Score: 0}, {Name: "machine02", Score: 0}},
+			name: "no services",
+		},
+		{
+			pod:      &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+			pods:     []*v1.Pod{{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}}},
+			nodes:    labeledNodes,
+			labels:   []string{"zone"},
+			services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
+			expectedList: []framework.NodeScore{{Name: "machine11", Score: framework.MaxNodeScore}, {Name: "machine12", Score: framework.MaxNodeScore},
+				{Name: "machine21", Score: framework.MaxNodeScore}, {Name: "machine22", Score: framework.MaxNodeScore},
+				{Name: "machine01", Score: 0}, {Name: "machine02", Score: 0}},
+			name: "different services",
+		},
+		{
+			pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+			pods: []*v1.Pod{
+				{Spec: zone0Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
+				{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
+				{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+			},
+			nodes:    labeledNodes,
+			labels:   []string{"zone"},
+			services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: labels1}}},
+			expectedList: []framework.NodeScore{{Name: "machine11", Score: framework.MaxNodeScore}, {Name: "machine12", Score: framework.MaxNodeScore},
+				{Name: "machine21", Score: 0}, {Name: "machine22", Score: 0},
+				{Name: "machine01", Score: 0}, {Name: "machine02", Score: 0}},
+			name: "three pods, one service pod",
+		},
+		{
+			pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+			pods: []*v1.Pod{
+				{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
+				{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+				{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+			},
+			nodes:    labeledNodes,
+			labels:   []string{"zone"},
+			services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: labels1}}},
+			expectedList: []framework.NodeScore{{Name: "machine11", Score: 50}, {Name: "machine12", Score: 50},
+				{Name: "machine21", Score: 50}, {Name: "machine22", Score: 50},
+				{Name: "machine01", Score: 0}, {Name: "machine02", Score: 0}},
+			name: "three pods, two service pods on different machines",
+		},
+		{
+			pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1, Namespace: metav1.NamespaceDefault}},
+			pods: []*v1.Pod{
+				{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+				{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, Namespace: metav1.NamespaceDefault}},
+				{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+				{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, Namespace: "ns1"}},
+			},
+			nodes:    labeledNodes,
+			labels:   []string{"zone"},
+			services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: labels1}, ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceDefault}}},
+			expectedList: []framework.NodeScore{{Name: "machine11", Score: 0}, {Name: "machine12", Score: 0},
+				{Name: "machine21", Score: framework.MaxNodeScore}, {Name: "machine22", Score: framework.MaxNodeScore},
+				{Name: "machine01", Score: 0}, {Name: "machine02", Score: 0}},
+			name: "three service label match pods in different namespaces",
+		},
+		{
+			pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+			pods: []*v1.Pod{
+				{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
+				{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+				{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+				{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+			},
+			nodes:    labeledNodes,
+			labels:   []string{"zone"},
+			services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: labels1}}},
+			expectedList: []framework.NodeScore{{Name: "machine11", Score: 66}, {Name: "machine12", Score: 66},
+				{Name: "machine21", Score: 33}, {Name: "machine22", Score: 33},
+				{Name: "machine01", Score: 0}, {Name: "machine02", Score: 0}},
+			name: "four pods, three service pods",
+		},
+		{
+			pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+			pods: []*v1.Pod{
+				{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
+				{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+				{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+			},
+			nodes:    labeledNodes,
+			labels:   []string{"zone"},
+			services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
+			expectedList: []framework.NodeScore{{Name: "machine11", Score: 33}, {Name: "machine12", Score: 33},
+				{Name: "machine21", Score: 66}, {Name: "machine22", Score: 66},
+				{Name: "machine01", Score: 0}, {Name: "machine02", Score: 0}},
+			name: "service with partial pod label matches",
+		},
+		{
+			pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+			pods: []*v1.Pod{
+				{Spec: zone0Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+				{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+				{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+				{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+			},
+			nodes:    labeledNodes,
+			labels:   []string{"zone"},
+			services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: labels1}}},
+			expectedList: []framework.NodeScore{{Name: "machine11", Score: 75}, {Name: "machine12", Score: 75},
+				{Name: "machine21", Score: 50}, {Name: "machine22", Score: 50},
+				{Name: "machine01", Score: 0}, {Name: "machine02", Score: 0}},
+			name: "service pod on non-zoned node",
+		},
+		{
+			pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+			pods: []*v1.Pod{
+				{Spec: zone0Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
+				{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+				{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
+			},
+			nodes:    nodesWithZoneAndRackLabels,
+			labels:   []string{"zone", "rack"},
+			services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: labels1}}},
+			expectedList: []framework.NodeScore{{Name: "machine11", Score: 25}, {Name: "machine12", Score: 75},
+				{Name: "machine21", Score: 25}, {Name: "machine22", Score: 25},
+				{Name: "machine01", Score: 0}, {Name: "machine02", Score: 0}},
+			name: "three pods, two service pods, with rack label",
+		},
+	}
+	// these local variables just make sure controllerLister/replicaSetLister/statefulSetLister are not nil
+	// when constructing the metaDataProducer
+	sss := []*apps.StatefulSet{{Spec: apps.StatefulSetSpec{Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}}
+	rcs := []*v1.ReplicationController{{Spec: v1.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}}
+	rss := []*apps.ReplicaSet{{Spec: apps.ReplicaSetSpec{Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			nodes := makeLabeledNodeList(test.nodes)
+			snapshot := nodeinfosnapshot.NewSnapshot(test.pods, nodes)
+			fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
+			serviceLister := fakelisters.ServiceLister(test.services)
+			priorityMapFunction, priorityReduceFunction := priorities.NewServiceAntiAffinityPriority(snapshot.Pods(), serviceLister, test.labels)
+
+			p := &ServiceAffinity{
+				handle:                 fh,
+				priorityMapFunction:    priorityMapFunction,
+				priorityReduceFunction: priorityReduceFunction,
+			}
+			metaDataProducer := priorities.NewPriorityMetadataFactory(
+				fakelisters.ServiceLister(test.services),
+				fakelisters.ControllerLister(rcs),
+				fakelisters.ReplicaSetLister(rss),
+				fakelisters.StatefulSetLister(sss),
+				1)
+			metaData := metaDataProducer(test.pod, nodes, snapshot)
+			state := framework.NewCycleState()
+			state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: metaData})
+
+			var gotList framework.NodeScoreList
+			for _, n := range makeLabeledNodeList(test.nodes) {
+				score, status := p.Score(context.Background(), state, test.pod, n.Name)
+				if !status.IsSuccess() {
+					t.Errorf("unexpected error: %v", status)
+				}
+				gotList = append(gotList, framework.NodeScore{Name: n.Name, Score: score})
+			}
+
+			status := p.ScoreExtensions().NormalizeScore(context.Background(), state, test.pod, gotList)
+			if !status.IsSuccess() {
+				t.Errorf("unexpected error: %v", status)
+			}
+
+			// sort the two lists to avoid failures on account of different ordering
+			sortNodeScoreList(test.expectedList)
+			sortNodeScoreList(gotList)
+			if !reflect.DeepEqual(test.expectedList, gotList) {
+				t.Errorf("expected %#v, got %#v", test.expectedList, gotList)
+			}
+		})
+	}
+}
+
+func makeLabeledNodeList(nodeMap map[string]map[string]string) []*v1.Node {
+	nodes := make([]*v1.Node, 0, len(nodeMap))
+	for nodeName, labels := range nodeMap {
+		nodes = append(nodes, &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName, Labels: labels}})
+	}
+	return nodes
+}
+
+func sortNodeScoreList(out framework.NodeScoreList) {
+	sort.Slice(out, func(i, j int) bool {
+		if out[i].Score == out[j].Score {
+			return out[i].Name < out[j].Name
+		}
+		return out[i].Score < out[j].Score
+	})
+}
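For reference, the expected scores in the test table above follow directly from the reduce formula. For example, the "four pods, three service pods" case works out as below; this is plain Go arithmetic for illustration, not scheduler code, with the max score assumed to be 100.

package main

import "fmt"

func main() {
	// Of the 3 pods matching the service selector, zone1 holds 1 and zone2 holds 2.
	maxScore, total := 100.0, 3.0
	fmt.Println(int64(maxScore * (total - 1) / total)) // 66 -> machine11, machine12
	fmt.Println(int64(maxScore * (total - 2) / total)) // 33 -> machine21, machine22
}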