diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index 292861c76de..2cd07578acc 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -23,6 +23,7 @@ go_library( "//pkg/scheduler/framework/plugins:go_default_library", "//pkg/scheduler/framework/plugins/nodelabel:go_default_library", "//pkg/scheduler/framework/plugins/requestedtocapacityratio:go_default_library", + "//pkg/scheduler/framework/plugins/serviceaffinity:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/cache/debugger:go_default_library", @@ -81,6 +82,7 @@ go_test( "//pkg/scheduler/core:go_default_library", "//pkg/scheduler/framework/plugins:go_default_library", "//pkg/scheduler/framework/plugins/nodelabel:go_default_library", + "//pkg/scheduler/framework/plugins/serviceaffinity:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/cache/fake:go_default_library", diff --git a/pkg/scheduler/algorithm_factory.go b/pkg/scheduler/algorithm_factory.go index 53444958494..debb4c90e2e 100644 --- a/pkg/scheduler/algorithm_factory.go +++ b/pkg/scheduler/algorithm_factory.go @@ -36,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/requestedtocapacityratio" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers" "k8s.io/kubernetes/pkg/scheduler/volumebinder" @@ -263,60 +264,73 @@ func RegisterFitPredicateFactory(name string, predicateFactory FitPredicateFacto // RegisterCustomFitPredicate registers a custom fit predicate with the algorithm registry. // Returns the name, with which the predicate was registered. -func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy, args *plugins.ConfigProducerArgs) string { +func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy, pluginArgs *plugins.ConfigProducerArgs) string { var predicateFactory FitPredicateFactory var ok bool - name := policy.Name + policyName := policy.Name validatePredicateOrDie(policy) // generate the predicate function, if a custom type is requested if policy.Argument != nil { if policy.Argument.ServiceAffinity != nil { + // We use the ServiceAffinity plugin name for all ServiceAffinity custom predicates. + // It may get called multiple times but we essentially only register one instance of ServiceAffinity predicate. + // This name is then used to find the registered plugin and run the plugin instead of the predicate. + policyName = predicates.CheckServiceAffinityPred + + // map ServiceAffinity policy to ConfigProducerArgs that's used to configure the ServiceAffinity plugin. + if pluginArgs.ServiceAffinityArgs == nil { + pluginArgs.ServiceAffinityArgs = &serviceaffinity.Args{} + } + + pluginArgs.ServiceAffinityArgs.Labels = append(pluginArgs.ServiceAffinityArgs.Labels, policy.Argument.ServiceAffinity.Labels...) 
+ predicateFactory = func(args PluginFactoryArgs) predicates.FitPredicate { predicate, precomputationFunction := predicates.NewServiceAffinityPredicate( args.NodeInfoLister, args.PodLister, args.ServiceLister, - policy.Argument.ServiceAffinity.Labels, + pluginArgs.ServiceAffinityArgs.Labels, ) // Once we generate the predicate we should also Register the Precomputation - predicates.RegisterPredicateMetadataProducer(policy.Name, precomputationFunction) + predicates.RegisterPredicateMetadataProducer(policyName, precomputationFunction) return predicate } } else if policy.Argument.LabelsPresence != nil { - // map LabelPresence policy to ConfigProducerArgs that's used to configure the NodeLabel plugin. - if args.NodeLabelArgs == nil { - args.NodeLabelArgs = &nodelabel.Args{} - } - if policy.Argument.LabelsPresence.Presence { - args.NodeLabelArgs.PresentLabels = append(args.NodeLabelArgs.PresentLabels, policy.Argument.LabelsPresence.Labels...) - } else { - args.NodeLabelArgs.AbsentLabels = append(args.NodeLabelArgs.AbsentLabels, policy.Argument.LabelsPresence.Labels...) - } - predicateFactory = func(_ PluginFactoryArgs) predicates.FitPredicate { - return predicates.NewNodeLabelPredicate( - args.NodeLabelArgs.PresentLabels, - args.NodeLabelArgs.AbsentLabels, - ) - } // We use the NodeLabel plugin name for all NodeLabel custom priorities. // It may get called multiple times but we essentially only register one instance of NodeLabel predicate. // This name is then used to find the registered plugin and run the plugin instead of the predicate. - name = nodelabel.Name + policyName = predicates.CheckNodeLabelPresencePred + + // map LabelPresence policy to ConfigProducerArgs that's used to configure the NodeLabel plugin. + if pluginArgs.NodeLabelArgs == nil { + pluginArgs.NodeLabelArgs = &nodelabel.Args{} + } + if policy.Argument.LabelsPresence.Presence { + pluginArgs.NodeLabelArgs.PresentLabels = append(pluginArgs.NodeLabelArgs.PresentLabels, policy.Argument.LabelsPresence.Labels...) + } else { + pluginArgs.NodeLabelArgs.AbsentLabels = append(pluginArgs.NodeLabelArgs.AbsentLabels, policy.Argument.LabelsPresence.Labels...) + } + predicateFactory = func(_ PluginFactoryArgs) predicates.FitPredicate { + return predicates.NewNodeLabelPredicate( + pluginArgs.NodeLabelArgs.PresentLabels, + pluginArgs.NodeLabelArgs.AbsentLabels, + ) + } } - } else if predicateFactory, ok = fitPredicateMap[policy.Name]; ok { + } else if predicateFactory, ok = fitPredicateMap[policyName]; ok { // checking to see if a pre-defined predicate is requested - klog.V(2).Infof("Predicate type %s already registered, reusing.", policy.Name) - return name + klog.V(2).Infof("Predicate type %s already registered, reusing.", policyName) + return policyName } if predicateFactory == nil { - klog.Fatalf("Invalid configuration: Predicate type not found for %s", policy.Name) + klog.Fatalf("Invalid configuration: Predicate type not found for %s", policyName) } - return RegisterFitPredicateFactory(name, predicateFactory) + return RegisterFitPredicateFactory(policyName, predicateFactory) } // IsFitPredicateRegistered is useful for testing providers. 
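Reviewer note: with the change above, every serviceAffinity predicate policy in a v1 Policy file is folded into the single ServiceAffinity plugin, and the label lists from all such policies are appended into one shared serviceaffinity.Args. A minimal, self-contained Go sketch of that accumulation (the predicatePolicy and serviceAffinityArgs types below are simplified stand-ins for schedulerapi.PredicatePolicy and serviceaffinity.Args, not the real scheduler types):

package main

import "fmt"

// Simplified stand-ins, for illustration only.
type serviceAffinityArgs struct{ Labels []string }
type predicatePolicy struct {
	Name   string
	Labels []string // policy.Argument.ServiceAffinity.Labels
}

func main() {
	// Two custom serviceAffinity policies, as in TestCreateFromConfig further down.
	policies := []predicatePolicy{
		{Name: "TestZoneAffinity", Labels: []string{"zone"}},
		{Name: "TestZoneAffinity", Labels: []string{"foo"}},
	}
	var args *serviceAffinityArgs
	for _, p := range policies {
		if args == nil {
			args = &serviceAffinityArgs{}
		}
		// Mirrors RegisterCustomFitPredicate: labels from every policy land in one plugin config.
		args.Labels = append(args.Labels, p.Labels...)
	}
	fmt.Println(args.Labels) // [zone foo]
}

This is why the factory test later expects a single ServiceAffinity plugin config containing both "zone" and "foo".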
diff --git a/pkg/scheduler/apis/config/testing/compatibility_test.go b/pkg/scheduler/apis/config/testing/compatibility_test.go index ee11ea35e20..e6eff445530 100644 --- a/pkg/scheduler/apis/config/testing/compatibility_test.go +++ b/pkg/scheduler/apis/config/testing/compatibility_test.go @@ -98,7 +98,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }`, wantPredicates: sets.NewString( "PodFitsPorts", - "TestServiceAffinity", ), wantPrioritizers: sets.NewString( "ServiceSpreadingPriority", @@ -112,6 +111,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "VolumeRestrictions"}, {Name: "TaintToleration"}, {Name: "NodeLabel"}, + {Name: "ServiceAffinity"}, }, "ScorePlugin": { {Name: "NodeResourcesLeastAllocated", Weight: 1}, @@ -144,9 +144,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {"name": "TestLabelPreference", "weight": 4, "argument": {"labelPreference": {"label": "bar", "presence":true}}} ] }`, - wantPredicates: sets.NewString( - "TestServiceAffinity", - ), + wantPredicates: sets.NewString(), wantPrioritizers: sets.NewString( "TestServiceAntiAffinity", ), @@ -160,6 +158,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "VolumeRestrictions"}, {Name: "TaintToleration"}, {Name: "NodeLabel"}, + {Name: "ServiceAffinity"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, @@ -199,9 +198,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {"name": "TestLabelPreference", "weight": 4, "argument": {"labelPreference": {"label": "bar", "presence":true}}} ] }`, - wantPredicates: sets.NewString( - "TestServiceAffinity", - ), + wantPredicates: sets.NewString(), wantPrioritizers: sets.NewString( "TestServiceAntiAffinity", ), @@ -214,11 +211,12 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "NodeResourcesFit"}, {Name: "VolumeRestrictions"}, {Name: "TaintToleration"}, + {Name: "NodeLabel"}, + {Name: "ServiceAffinity"}, {Name: "EBSLimits"}, {Name: "GCEPDLimits"}, {Name: "AzureDiskLimits"}, {Name: "VolumeZone"}, - {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, @@ -263,9 +261,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {"name": "InterPodAffinityPriority", "weight": 2} ] }`, - wantPredicates: sets.NewString( - "TestServiceAffinity", - ), + wantPredicates: sets.NewString(), wantPrioritizers: sets.NewString( "InterPodAffinityPriority", ), @@ -278,12 +274,13 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "NodeResourcesFit"}, {Name: "VolumeRestrictions"}, {Name: "TaintToleration"}, + {Name: "NodeLabel"}, + {Name: "ServiceAffinity"}, {Name: "EBSLimits"}, {Name: "GCEPDLimits"}, {Name: "AzureDiskLimits"}, {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, - {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, @@ -330,9 +327,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {"name": "MostRequestedPriority", "weight": 2} ] }`, - wantPredicates: sets.NewString( - "TestServiceAffinity", - ), + wantPredicates: sets.NewString(), wantPrioritizers: sets.NewString( "InterPodAffinityPriority", ), @@ -345,12 +340,13 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "NodeResourcesFit"}, {Name: "VolumeRestrictions"}, {Name: "TaintToleration"}, + {Name: "NodeLabel"}, + {Name: "ServiceAffinity"}, {Name: "EBSLimits"}, {Name: "GCEPDLimits"}, {Name: "AzureDiskLimits"}, {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, - {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, 
@@ -408,9 +404,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "nodeCacheCapable": true }] }`, - wantPredicates: sets.NewString( - "TestServiceAffinity", - ), + wantPredicates: sets.NewString(), wantPrioritizers: sets.NewString( "InterPodAffinityPriority", ), @@ -423,12 +417,13 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "NodeResourcesFit"}, {Name: "VolumeRestrictions"}, {Name: "TaintToleration"}, + {Name: "NodeLabel"}, + {Name: "ServiceAffinity"}, {Name: "EBSLimits"}, {Name: "GCEPDLimits"}, {Name: "AzureDiskLimits"}, {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, - {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, @@ -497,9 +492,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "nodeCacheCapable": true }] }`, - wantPredicates: sets.NewString( - "TestServiceAffinity", - ), + wantPredicates: sets.NewString(), wantPrioritizers: sets.NewString( "InterPodAffinityPriority", ), @@ -512,12 +505,13 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "NodeResourcesFit"}, {Name: "VolumeRestrictions"}, {Name: "TaintToleration"}, + {Name: "NodeLabel"}, + {Name: "ServiceAffinity"}, {Name: "EBSLimits"}, {Name: "GCEPDLimits"}, {Name: "AzureDiskLimits"}, {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, - {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, @@ -587,9 +581,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "nodeCacheCapable": true }] }`, - wantPredicates: sets.NewString( - "TestServiceAffinity", - ), + wantPredicates: sets.NewString(), wantPrioritizers: sets.NewString( "InterPodAffinityPriority", ), @@ -602,13 +594,14 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "NodeResourcesFit"}, {Name: "VolumeRestrictions"}, {Name: "TaintToleration"}, + {Name: "NodeLabel"}, + {Name: "ServiceAffinity"}, {Name: "EBSLimits"}, {Name: "GCEPDLimits"}, {Name: "AzureDiskLimits"}, {Name: "VolumeBinding"}, {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, - {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, @@ -681,9 +674,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "ignorable":true }] }`, - wantPredicates: sets.NewString( - "TestServiceAffinity", - ), + wantPredicates: sets.NewString(), wantPrioritizers: sets.NewString( "InterPodAffinityPriority", ), @@ -696,13 +687,14 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "NodeResourcesFit"}, {Name: "VolumeRestrictions"}, {Name: "TaintToleration"}, + {Name: "NodeLabel"}, + {Name: "ServiceAffinity"}, {Name: "EBSLimits"}, {Name: "GCEPDLimits"}, {Name: "AzureDiskLimits"}, {Name: "VolumeBinding"}, {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, - {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, @@ -787,9 +779,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "ignorable":true }] }`, - wantPredicates: sets.NewString( - "TestServiceAffinity", - ), + wantPredicates: sets.NewString(), wantPrioritizers: sets.NewString( "InterPodAffinityPriority", ), @@ -802,13 +792,14 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "NodeResourcesFit"}, {Name: "VolumeRestrictions"}, {Name: "TaintToleration"}, + {Name: "NodeLabel"}, + {Name: "ServiceAffinity"}, {Name: "EBSLimits"}, {Name: "GCEPDLimits"}, {Name: "AzureDiskLimits"}, {Name: "VolumeBinding"}, {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, - {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, @@ 
-895,9 +886,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "ignorable":true }] }`, - wantPredicates: sets.NewString( - "TestServiceAffinity", - ), + wantPredicates: sets.NewString(), wantPrioritizers: sets.NewString( "InterPodAffinityPriority", ), @@ -910,6 +899,8 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "NodeResourcesFit"}, {Name: "VolumeRestrictions"}, {Name: "TaintToleration"}, + {Name: "NodeLabel"}, + {Name: "ServiceAffinity"}, {Name: "EBSLimits"}, {Name: "GCEPDLimits"}, {Name: "NodeVolumeLimits"}, @@ -917,7 +908,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "VolumeBinding"}, {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, - {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, @@ -1003,9 +993,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "ignorable":true }] }`, - wantPredicates: sets.NewString( - "TestServiceAffinity", - ), + wantPredicates: sets.NewString(), wantPrioritizers: sets.NewString( "InterPodAffinityPriority", ), @@ -1018,6 +1006,8 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "NodeResourcesFit"}, {Name: "VolumeRestrictions"}, {Name: "TaintToleration"}, + {Name: "NodeLabel"}, + {Name: "ServiceAffinity"}, {Name: "EBSLimits"}, {Name: "GCEPDLimits"}, {Name: "NodeVolumeLimits"}, @@ -1026,7 +1016,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "VolumeBinding"}, {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, - {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, @@ -1116,9 +1105,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "ignorable":true }] }`, - wantPredicates: sets.NewString( - "TestServiceAffinity", - ), + wantPredicates: sets.NewString(), wantPrioritizers: sets.NewString( "InterPodAffinityPriority", ), @@ -1131,6 +1118,8 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "NodeResourcesFit"}, {Name: "VolumeRestrictions"}, {Name: "TaintToleration"}, + {Name: "NodeLabel"}, + {Name: "ServiceAffinity"}, {Name: "EBSLimits"}, {Name: "GCEPDLimits"}, {Name: "NodeVolumeLimits"}, @@ -1139,7 +1128,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "VolumeBinding"}, {Name: "VolumeZone"}, {Name: "InterPodAffinity"}, - {Name: "NodeLabel"}, }, "ScorePlugin": { {Name: "NodeResourcesBalancedAllocation", Weight: 2}, @@ -1321,6 +1309,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { seenPriorities.Insert(scoreToPriorityMap[p.Name]) } + if diff := cmp.Diff(tc.wantPlugins, gotPlugins); diff != "" { t.Errorf("unexpected plugins diff (-want, +got): %s", diff) } diff --git a/pkg/scheduler/factory_test.go b/pkg/scheduler/factory_test.go index ccbf7f93bd8..9c2ab1a7150 100644 --- a/pkg/scheduler/factory_test.go +++ b/pkg/scheduler/factory_test.go @@ -47,6 +47,7 @@ import ( extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1" frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" @@ -91,6 +92,7 @@ func TestCreateFromConfig(t *testing.T) { "apiVersion" : "v1", "predicates" : [ {"name" : "TestZoneAffinity", "argument" : {"serviceAffinity" : {"labels" : ["zone"]}}}, + {"name" : "TestZoneAffinity", "argument" : {"serviceAffinity" : {"labels" : 
["foo"]}}}, {"name" : "TestRequireZone", "argument" : {"labelsPresence" : {"labels" : ["zone"], "presence" : true}}}, {"name" : "TestNoFooLabel", "argument" : {"labelsPresence" : {"labels" : ["foo"], "presence" : false}}}, {"name" : "PredicateOne"}, @@ -137,6 +139,21 @@ func TestCreateFromConfig(t *testing.T) { if string(encoding) != want { t.Errorf("Config for NodeLabel plugin mismatch. got: %v, want: %v", string(encoding), want) } + + // Verify that service affinity predicates are converted to framework plugins. + if _, ok := findPlugin(serviceaffinity.Name, "FilterPlugin", conf); !ok { + t.Fatalf("ServiceAffinity plugin not exist in framework.") + } + // Verify that the policy config is converted to plugin config for service affinity predicate. + serviceAffinityConfig := findPluginConfig(serviceaffinity.Name, conf) + encoding, err = json.Marshal(serviceAffinityConfig) + if err != nil { + t.Errorf("Failed to marshal %+v: %v", serviceAffinityConfig, err) + } + want = `{"Name":"ServiceAffinity","Args":{"labels":["zone","foo"]}}` + if string(encoding) != want { + t.Errorf("Config for ServiceAffinity plugin mismatch. got: %v, want: %v", string(encoding), want) + } } func findPlugin(name, extensionPoint string, schedConf *Config) (schedulerapi.Plugin, bool) { diff --git a/pkg/scheduler/framework/plugins/BUILD b/pkg/scheduler/framework/plugins/BUILD index 2a61f860b34..d355afa308d 100644 --- a/pkg/scheduler/framework/plugins/BUILD +++ b/pkg/scheduler/framework/plugins/BUILD @@ -22,6 +22,7 @@ go_library( "//pkg/scheduler/framework/plugins/nodevolumelimits:go_default_library", "//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library", "//pkg/scheduler/framework/plugins/requestedtocapacityratio:go_default_library", + "//pkg/scheduler/framework/plugins/serviceaffinity:go_default_library", "//pkg/scheduler/framework/plugins/tainttoleration:go_default_library", "//pkg/scheduler/framework/plugins/volumebinding:go_default_library", "//pkg/scheduler/framework/plugins/volumerestrictions:go_default_library", @@ -59,6 +60,7 @@ filegroup( "//pkg/scheduler/framework/plugins/nodevolumelimits:all-srcs", "//pkg/scheduler/framework/plugins/podtopologyspread:all-srcs", "//pkg/scheduler/framework/plugins/requestedtocapacityratio:all-srcs", + "//pkg/scheduler/framework/plugins/serviceaffinity:all-srcs", "//pkg/scheduler/framework/plugins/tainttoleration:all-srcs", "//pkg/scheduler/framework/plugins/volumebinding:all-srcs", "//pkg/scheduler/framework/plugins/volumerestrictions:all-srcs", diff --git a/pkg/scheduler/framework/plugins/default_registry.go b/pkg/scheduler/framework/plugins/default_registry.go index a25a91261bf..09dc752828e 100644 --- a/pkg/scheduler/framework/plugins/default_registry.go +++ b/pkg/scheduler/framework/plugins/default_registry.go @@ -38,6 +38,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/requestedtocapacityratio" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions" @@ -82,6 +83,7 @@ func NewDefaultRegistry(args *RegistryArgs) framework.Registry { interpodaffinity.Name: interpodaffinity.New, nodelabel.Name: nodelabel.New, requestedtocapacityratio.Name: requestedtocapacityratio.New, + 
serviceaffinity.Name: serviceaffinity.New, } } @@ -95,6 +97,8 @@ type ConfigProducerArgs struct { NodeLabelArgs *nodelabel.Args // RequestedToCapacityRatioArgs is the args for the RequestedToCapacityRatio plugin. RequestedToCapacityRatioArgs *requestedtocapacityratio.Args + // ServiceAffinityArgs is the args for the ServiceAffinity plugin. + ServiceAffinityArgs *serviceaffinity.Args } // ConfigProducer produces a framework's configuration. @@ -203,12 +207,18 @@ func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry { plugins.Filter = appendToPluginSet(plugins.Filter, podtopologyspread.Name, nil) return }) - registry.RegisterPredicate(nodelabel.Name, + registry.RegisterPredicate(predicates.CheckNodeLabelPresencePred, func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { plugins.Filter = appendToPluginSet(plugins.Filter, nodelabel.Name, nil) pluginConfig = append(pluginConfig, makePluginConfig(nodelabel.Name, args.NodeLabelArgs)) return }) + registry.RegisterPredicate(predicates.CheckServiceAffinityPred, + func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { + plugins.Filter = appendToPluginSet(plugins.Filter, serviceaffinity.Name, nil) + pluginConfig = append(pluginConfig, makePluginConfig(serviceaffinity.Name, args.ServiceAffinityArgs)) + return + }) // Register Priorities. registry.RegisterPriority(priorities.SelectorSpreadPriority, diff --git a/pkg/scheduler/framework/plugins/serviceaffinity/BUILD b/pkg/scheduler/framework/plugins/serviceaffinity/BUILD new file mode 100644 index 00000000000..12678f0c05d --- /dev/null +++ b/pkg/scheduler/framework/plugins/serviceaffinity/BUILD @@ -0,0 +1,45 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["service_affinity.go"], + importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity", + visibility = ["//visibility:public"], + deps = [ + "//pkg/scheduler/algorithm/predicates:go_default_library", + "//pkg/scheduler/framework/plugins/migration:go_default_library", + "//pkg/scheduler/framework/v1alpha1:go_default_library", + "//pkg/scheduler/nodeinfo:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["service_affinity_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/scheduler/algorithm/predicates:go_default_library", + "//pkg/scheduler/framework/plugins/migration:go_default_library", + "//pkg/scheduler/framework/v1alpha1:go_default_library", + "//pkg/scheduler/listers/fake:go_default_library", + "//pkg/scheduler/nodeinfo/snapshot:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity.go b/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity.go new file mode 100644 index 00000000000..46a5317524d --- /dev/null +++ b/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity.go @@ -0,0 +1,79 @@ +/* +Copyright 2019 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package serviceaffinity + +import ( + "context" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" + "k8s.io/kubernetes/pkg/scheduler/nodeinfo" +) + +// Name of this plugin. +const Name = "ServiceAffinity" + +// Args holds the args that are used to configure the plugin. +type Args struct { + // Labels are the node labels that service affinity is enforced on: pods of the same service are scheduled onto nodes that have identical values for these labels. + Labels []string `json:"labels,omitempty"` +} + +// New initializes a new plugin and returns it. +func New(plArgs *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) { + args := &Args{} + if err := framework.DecodeInto(plArgs, args); err != nil { + return nil, err + } + informerFactory := handle.SharedInformerFactory() + nodeInfoLister := handle.SnapshotSharedLister().NodeInfos() + podLister := handle.SnapshotSharedLister().Pods() + serviceLister := informerFactory.Core().V1().Services().Lister() + fitPredicate, predicateMetadataProducer := predicates.NewServiceAffinityPredicate(nodeInfoLister, podLister, serviceLister, args.Labels) + + // Once we generate the predicate we should also Register the Precomputation + predicates.RegisterPredicateMetadataProducer(predicates.CheckServiceAffinityPred, predicateMetadataProducer) + + return &ServiceAffinity{ + predicate: fitPredicate, + }, nil +} + +// ServiceAffinity is a plugin that checks service affinity. +type ServiceAffinity struct { + predicate predicates.FitPredicate +} + +var _ framework.FilterPlugin = &ServiceAffinity{} + +// Name returns name of the plugin. It is used in logs, etc. +func (pl *ServiceAffinity) Name() string { + return Name +} + +// Filter invoked at the filter extension point. +func (pl *ServiceAffinity) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status { + meta, ok := migration.PredicateMetadata(cycleState).(predicates.PredicateMetadata) + if !ok { + return framework.NewStatus(framework.Error, "looking up PredicateMetadata") + } + _, reasons, err := pl.predicate(pod, meta, nodeInfo) + return migration.PredicateResultToFrameworkStatus(reasons, err) +} diff --git a/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity_test.go b/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity_test.go new file mode 100644 index 00000000000..90fbc961563 --- /dev/null +++ b/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity_test.go @@ -0,0 +1,181 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package serviceaffinity + +import ( + "context" + "testing" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" + fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake" + nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" +) + +func TestServiceAffinity(t *testing.T) { + selector := map[string]string{"foo": "bar"} + labels1 := map[string]string{ + "region": "r1", + "zone": "z11", + } + labels2 := map[string]string{ + "region": "r1", + "zone": "z12", + } + labels3 := map[string]string{ + "region": "r2", + "zone": "z21", + } + labels4 := map[string]string{ + "region": "r2", + "zone": "z22", + } + node1 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labels1}} + node2 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labels2}} + node3 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labels3}} + node4 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine4", Labels: labels4}} + node5 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine5", Labels: labels4}} + tests := []struct { + name string + pod *v1.Pod + pods []*v1.Pod + services []*v1.Service + node *v1.Node + labels []string + res framework.Code + }{ + { + name: "nothing scheduled", + pod: new(v1.Pod), + node: &node1, + labels: []string{"region"}, + res: framework.Success, + }, + { + name: "pod with region label match", + pod: &v1.Pod{Spec: v1.PodSpec{NodeSelector: map[string]string{"region": "r1"}}}, + node: &node1, + labels: []string{"region"}, + res: framework.Success, + }, + { + name: "pod with region label mismatch", + pod: &v1.Pod{Spec: v1.PodSpec{NodeSelector: map[string]string{"region": "r2"}}}, + node: &node1, + labels: []string{"region"}, + res: framework.Unschedulable, + }, + { + name: "service pod on same node", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector}}, + pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: selector}}}, + node: &node1, + services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}}}, + labels: []string{"region"}, + res: framework.Success, + }, + { + name: "service pod on different node, region match", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector}}, + pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: selector}}}, + node: &node1, + services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}}}, + labels: []string{"region"}, + res: framework.Success, + }, + { + name: "service pod on different node, region mismatch", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector}}, + pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine3"}, ObjectMeta: metav1.ObjectMeta{Labels: selector}}}, + node: &node1, + services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}}}, + labels: []string{"region"}, + res: framework.Unschedulable, + }, + { + name: "service in different namespace, region 
mismatch", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector, Namespace: "ns1"}}, + pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine3"}, ObjectMeta: metav1.ObjectMeta{Labels: selector, Namespace: "ns1"}}}, + node: &node1, + services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}, ObjectMeta: metav1.ObjectMeta{Namespace: "ns2"}}}, + labels: []string{"region"}, + res: framework.Success, + }, + { + name: "pod in different namespace, region mismatch", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector, Namespace: "ns1"}}, + pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine3"}, ObjectMeta: metav1.ObjectMeta{Labels: selector, Namespace: "ns2"}}}, + node: &node1, + services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}, ObjectMeta: metav1.ObjectMeta{Namespace: "ns1"}}}, + labels: []string{"region"}, + res: framework.Success, + }, + { + name: "service and pod in same namespace, region mismatch", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector, Namespace: "ns1"}}, + pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine3"}, ObjectMeta: metav1.ObjectMeta{Labels: selector, Namespace: "ns1"}}}, + node: &node1, + services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}, ObjectMeta: metav1.ObjectMeta{Namespace: "ns1"}}}, + labels: []string{"region"}, + res: framework.Unschedulable, + }, + { + name: "service pod on different node, multiple labels, not all match", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector}}, + pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: selector}}}, + node: &node1, + services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}}}, + labels: []string{"region", "zone"}, + res: framework.Unschedulable, + }, + { + name: "service pod on different node, multiple labels, all match", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector}}, + pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine5"}, ObjectMeta: metav1.ObjectMeta{Labels: selector}}}, + node: &node4, + services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}}}, + labels: []string{"region", "zone"}, + res: framework.Success, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + nodes := []*v1.Node{&node1, &node2, &node3, &node4, &node5} + snapshot := nodeinfosnapshot.NewSnapshot(test.pods, nodes) + + predicate, precompute := predicates.NewServiceAffinityPredicate(snapshot.NodeInfos(), snapshot.Pods(), fakelisters.ServiceLister(test.services), test.labels) + predicates.RegisterPredicateMetadataProducer("ServiceAffinityMetaProducer", precompute) + + p := &ServiceAffinity{ + predicate: predicate, + } + + meta := predicates.GetPredicateMetadata(test.pod, snapshot) + state := framework.NewCycleState() + state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta}) + + status := p.Filter(context.Background(), state, test.pod, snapshot.NodeInfoMap[test.node.Name]) + if status.Code() != test.res { + t.Errorf("Status mismatch. got: %v, want: %v", status.Code(), test.res) + } + }) + } +}