Migrate CheckServiceAffinity custom predicate to Filter plugin

This commit is contained in:
danielqsj 2019-11-01 17:16:22 +08:00
parent 1c974109b6
commit c0bbc4ce82
9 changed files with 413 additions and 74 deletions

View File

@ -23,6 +23,7 @@ go_library(
"//pkg/scheduler/framework/plugins:go_default_library",
"//pkg/scheduler/framework/plugins/nodelabel:go_default_library",
"//pkg/scheduler/framework/plugins/requestedtocapacityratio:go_default_library",
"//pkg/scheduler/framework/plugins/serviceaffinity:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/debugger:go_default_library",
@ -81,6 +82,7 @@ go_test(
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/framework/plugins:go_default_library",
"//pkg/scheduler/framework/plugins/nodelabel:go_default_library",
"//pkg/scheduler/framework/plugins/serviceaffinity:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/fake:go_default_library",

View File

@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/requestedtocapacityratio"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
@ -263,60 +264,73 @@ func RegisterFitPredicateFactory(name string, predicateFactory FitPredicateFacto
// RegisterCustomFitPredicate registers a custom fit predicate with the algorithm registry.
// Returns the name, with which the predicate was registered.
func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy, args *plugins.ConfigProducerArgs) string {
func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy, pluginArgs *plugins.ConfigProducerArgs) string {
var predicateFactory FitPredicateFactory
var ok bool
name := policy.Name
policyName := policy.Name
validatePredicateOrDie(policy)
// generate the predicate function, if a custom type is requested
if policy.Argument != nil {
if policy.Argument.ServiceAffinity != nil {
// We use the ServiceAffinity plugin name for all ServiceAffinity custom priorities.
// It may get called multiple times but we essentially only register one instance of ServiceAffinity predicate.
// This name is then used to find the registered plugin and run the plugin instead of the predicate.
policyName = predicates.CheckServiceAffinityPred
// map LabelsPresence policy to ConfigProducerArgs that's used to configure the ServiceAffinity plugin.
if pluginArgs.ServiceAffinityArgs == nil {
pluginArgs.ServiceAffinityArgs = &serviceaffinity.Args{}
}
pluginArgs.ServiceAffinityArgs.Labels = append(pluginArgs.ServiceAffinityArgs.Labels, policy.Argument.ServiceAffinity.Labels...)
predicateFactory = func(args PluginFactoryArgs) predicates.FitPredicate {
predicate, precomputationFunction := predicates.NewServiceAffinityPredicate(
args.NodeInfoLister,
args.PodLister,
args.ServiceLister,
policy.Argument.ServiceAffinity.Labels,
pluginArgs.ServiceAffinityArgs.Labels,
)
// Once we generate the predicate we should also Register the Precomputation
predicates.RegisterPredicateMetadataProducer(policy.Name, precomputationFunction)
predicates.RegisterPredicateMetadataProducer(policyName, precomputationFunction)
return predicate
}
} else if policy.Argument.LabelsPresence != nil {
// map LabelPresence policy to ConfigProducerArgs that's used to configure the NodeLabel plugin.
if args.NodeLabelArgs == nil {
args.NodeLabelArgs = &nodelabel.Args{}
}
if policy.Argument.LabelsPresence.Presence {
args.NodeLabelArgs.PresentLabels = append(args.NodeLabelArgs.PresentLabels, policy.Argument.LabelsPresence.Labels...)
} else {
args.NodeLabelArgs.AbsentLabels = append(args.NodeLabelArgs.AbsentLabels, policy.Argument.LabelsPresence.Labels...)
}
predicateFactory = func(_ PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewNodeLabelPredicate(
args.NodeLabelArgs.PresentLabels,
args.NodeLabelArgs.AbsentLabels,
)
}
// We use the NodeLabel plugin name for all NodeLabel custom priorities.
// It may get called multiple times but we essentially only register one instance of NodeLabel predicate.
// This name is then used to find the registered plugin and run the plugin instead of the predicate.
name = nodelabel.Name
policyName = predicates.CheckNodeLabelPresencePred
// map LabelPresence policy to ConfigProducerArgs that's used to configure the NodeLabel plugin.
if pluginArgs.NodeLabelArgs == nil {
pluginArgs.NodeLabelArgs = &nodelabel.Args{}
}
} else if predicateFactory, ok = fitPredicateMap[policy.Name]; ok {
if policy.Argument.LabelsPresence.Presence {
pluginArgs.NodeLabelArgs.PresentLabels = append(pluginArgs.NodeLabelArgs.PresentLabels, policy.Argument.LabelsPresence.Labels...)
} else {
pluginArgs.NodeLabelArgs.AbsentLabels = append(pluginArgs.NodeLabelArgs.AbsentLabels, policy.Argument.LabelsPresence.Labels...)
}
predicateFactory = func(_ PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewNodeLabelPredicate(
pluginArgs.NodeLabelArgs.PresentLabels,
pluginArgs.NodeLabelArgs.AbsentLabels,
)
}
}
} else if predicateFactory, ok = fitPredicateMap[policyName]; ok {
// checking to see if a pre-defined predicate is requested
klog.V(2).Infof("Predicate type %s already registered, reusing.", policy.Name)
return name
klog.V(2).Infof("Predicate type %s already registered, reusing.", policyName)
return policyName
}
if predicateFactory == nil {
klog.Fatalf("Invalid configuration: Predicate type not found for %s", policy.Name)
klog.Fatalf("Invalid configuration: Predicate type not found for %s", policyName)
}
return RegisterFitPredicateFactory(name, predicateFactory)
return RegisterFitPredicateFactory(policyName, predicateFactory)
}
// IsFitPredicateRegistered is useful for testing providers.

View File

@ -98,7 +98,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}`,
wantPredicates: sets.NewString(
"PodFitsPorts",
"TestServiceAffinity",
),
wantPrioritizers: sets.NewString(
"ServiceSpreadingPriority",
@ -112,6 +111,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "NodeLabel"},
{Name: "ServiceAffinity"},
},
"ScorePlugin": {
{Name: "NodeResourcesLeastAllocated", Weight: 1},
@ -144,9 +144,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{"name": "TestLabelPreference", "weight": 4, "argument": {"labelPreference": {"label": "bar", "presence":true}}}
]
}`,
wantPredicates: sets.NewString(
"TestServiceAffinity",
),
wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(
"EqualPriority",
"TestServiceAntiAffinity",
@ -161,6 +159,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "NodeLabel"},
{Name: "ServiceAffinity"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -200,9 +199,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{"name": "TestLabelPreference", "weight": 4, "argument": {"labelPreference": {"label": "bar", "presence":true}}}
]
}`,
wantPredicates: sets.NewString(
"TestServiceAffinity",
),
wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(
"EqualPriority",
"TestServiceAntiAffinity",
@ -216,11 +213,12 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeResourcesFit"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "NodeLabel"},
{Name: "ServiceAffinity"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "AzureDiskLimits"},
{Name: "VolumeZone"},
{Name: "NodeLabel"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -265,9 +263,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{"name": "InterPodAffinityPriority", "weight": 2}
]
}`,
wantPredicates: sets.NewString(
"TestServiceAffinity",
),
wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(
"EqualPriority",
"InterPodAffinityPriority",
@ -281,12 +277,13 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeResourcesFit"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "NodeLabel"},
{Name: "ServiceAffinity"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "AzureDiskLimits"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
{Name: "NodeLabel"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -333,9 +330,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{"name": "MostRequestedPriority", "weight": 2}
]
}`,
wantPredicates: sets.NewString(
"TestServiceAffinity",
),
wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(
"EqualPriority",
"InterPodAffinityPriority",
@ -349,12 +344,13 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeResourcesFit"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "NodeLabel"},
{Name: "ServiceAffinity"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "AzureDiskLimits"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
{Name: "NodeLabel"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -412,9 +408,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"nodeCacheCapable": true
}]
}`,
wantPredicates: sets.NewString(
"TestServiceAffinity",
),
wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(
"EqualPriority",
"InterPodAffinityPriority",
@ -428,12 +422,13 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeResourcesFit"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "NodeLabel"},
{Name: "ServiceAffinity"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "AzureDiskLimits"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
{Name: "NodeLabel"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -502,9 +497,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"nodeCacheCapable": true
}]
}`,
wantPredicates: sets.NewString(
"TestServiceAffinity",
),
wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(
"EqualPriority",
"InterPodAffinityPriority",
@ -518,12 +511,13 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeResourcesFit"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "NodeLabel"},
{Name: "ServiceAffinity"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "AzureDiskLimits"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
{Name: "NodeLabel"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -593,9 +587,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"nodeCacheCapable": true
}]
}`,
wantPredicates: sets.NewString(
"TestServiceAffinity",
),
wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(
"EqualPriority",
"InterPodAffinityPriority",
@ -609,13 +601,14 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeResourcesFit"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "NodeLabel"},
{Name: "ServiceAffinity"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "AzureDiskLimits"},
{Name: "VolumeBinding"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
{Name: "NodeLabel"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -688,9 +681,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"ignorable":true
}]
}`,
wantPredicates: sets.NewString(
"TestServiceAffinity",
),
wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(
"EqualPriority",
"InterPodAffinityPriority",
@ -704,13 +695,14 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeResourcesFit"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "NodeLabel"},
{Name: "ServiceAffinity"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "AzureDiskLimits"},
{Name: "VolumeBinding"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
{Name: "NodeLabel"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -795,9 +787,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"ignorable":true
}]
}`,
wantPredicates: sets.NewString(
"TestServiceAffinity",
),
wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(
"EqualPriority",
"InterPodAffinityPriority",
@ -811,13 +801,14 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeResourcesFit"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "NodeLabel"},
{Name: "ServiceAffinity"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "AzureDiskLimits"},
{Name: "VolumeBinding"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
{Name: "NodeLabel"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -904,9 +895,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"ignorable":true
}]
}`,
wantPredicates: sets.NewString(
"TestServiceAffinity",
),
wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(
"EqualPriority",
"InterPodAffinityPriority",
@ -920,6 +909,8 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeResourcesFit"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "NodeLabel"},
{Name: "ServiceAffinity"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "NodeVolumeLimits"},
@ -927,7 +918,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "VolumeBinding"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
{Name: "NodeLabel"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -1013,9 +1003,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"ignorable":true
}]
}`,
wantPredicates: sets.NewString(
"TestServiceAffinity",
),
wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(
"EqualPriority",
"InterPodAffinityPriority",
@ -1029,6 +1017,8 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeResourcesFit"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "NodeLabel"},
{Name: "ServiceAffinity"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "NodeVolumeLimits"},
@ -1037,7 +1027,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "VolumeBinding"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
{Name: "NodeLabel"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -1127,9 +1116,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"ignorable":true
}]
}`,
wantPredicates: sets.NewString(
"TestServiceAffinity",
),
wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(
"EqualPriority",
"InterPodAffinityPriority",
@ -1143,6 +1130,8 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeResourcesFit"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "NodeLabel"},
{Name: "ServiceAffinity"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "NodeVolumeLimits"},
@ -1151,7 +1140,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "VolumeBinding"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
{Name: "NodeLabel"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 2},
@ -1333,6 +1321,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
seenPriorities.Insert(scoreToPriorityMap[p.Name])
}
if diff := cmp.Diff(tc.wantPlugins, gotPlugins); diff != "" {
t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
}

View File

@ -47,6 +47,7 @@ import (
extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
@ -91,6 +92,7 @@ func TestCreateFromConfig(t *testing.T) {
"apiVersion" : "v1",
"predicates" : [
{"name" : "TestZoneAffinity", "argument" : {"serviceAffinity" : {"labels" : ["zone"]}}},
{"name" : "TestZoneAffinity", "argument" : {"serviceAffinity" : {"labels" : ["foo"]}}},
{"name" : "TestRequireZone", "argument" : {"labelsPresence" : {"labels" : ["zone"], "presence" : true}}},
{"name" : "TestNoFooLabel", "argument" : {"labelsPresence" : {"labels" : ["foo"], "presence" : false}}},
{"name" : "PredicateOne"},
@ -137,6 +139,21 @@ func TestCreateFromConfig(t *testing.T) {
if string(encoding) != want {
t.Errorf("Config for NodeLabel plugin mismatch. got: %v, want: %v", string(encoding), want)
}
// Verify that service affinity predicates are converted to framework plugins.
if _, ok := findPlugin(serviceaffinity.Name, "FilterPlugin", conf); !ok {
t.Fatalf("ServiceAffinity plugin not exist in framework.")
}
// Verify that the policy config is converted to plugin config for service affinity predicate.
serviceAffinityConfig := findPluginConfig(serviceaffinity.Name, conf)
encoding, err = json.Marshal(serviceAffinityConfig)
if err != nil {
t.Errorf("Failed to marshal %+v: %v", serviceAffinityConfig, err)
}
want = `{"Name":"ServiceAffinity","Args":{"labels":["zone","foo"]}}`
if string(encoding) != want {
t.Errorf("Config for ServiceAffinity plugin mismatch. got: %v, want: %v", string(encoding), want)
}
}
func findPlugin(name, extensionPoint string, schedConf *Config) (schedulerapi.Plugin, bool) {

View File

@ -22,6 +22,7 @@ go_library(
"//pkg/scheduler/framework/plugins/nodevolumelimits:go_default_library",
"//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library",
"//pkg/scheduler/framework/plugins/requestedtocapacityratio:go_default_library",
"//pkg/scheduler/framework/plugins/serviceaffinity:go_default_library",
"//pkg/scheduler/framework/plugins/tainttoleration:go_default_library",
"//pkg/scheduler/framework/plugins/volumebinding:go_default_library",
"//pkg/scheduler/framework/plugins/volumerestrictions:go_default_library",
@ -59,6 +60,7 @@ filegroup(
"//pkg/scheduler/framework/plugins/nodevolumelimits:all-srcs",
"//pkg/scheduler/framework/plugins/podtopologyspread:all-srcs",
"//pkg/scheduler/framework/plugins/requestedtocapacityratio:all-srcs",
"//pkg/scheduler/framework/plugins/serviceaffinity:all-srcs",
"//pkg/scheduler/framework/plugins/tainttoleration:all-srcs",
"//pkg/scheduler/framework/plugins/volumebinding:all-srcs",
"//pkg/scheduler/framework/plugins/volumerestrictions:all-srcs",

View File

@ -38,6 +38,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/requestedtocapacityratio"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
@ -82,6 +83,7 @@ func NewDefaultRegistry(args *RegistryArgs) framework.Registry {
interpodaffinity.Name: interpodaffinity.New,
nodelabel.Name: nodelabel.New,
requestedtocapacityratio.Name: requestedtocapacityratio.New,
serviceaffinity.Name: serviceaffinity.New,
}
}
@ -95,6 +97,8 @@ type ConfigProducerArgs struct {
NodeLabelArgs *nodelabel.Args
// RequestedToCapacityRatioArgs is the args for the RequestedToCapacityRatio plugin.
RequestedToCapacityRatioArgs *requestedtocapacityratio.Args
// ServiceAffinityArgs is the args for the ServiceAffinity plugin.
ServiceAffinityArgs *serviceaffinity.Args
}
// ConfigProducer produces a framework's configuration.
@ -203,12 +207,18 @@ func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry {
plugins.Filter = appendToPluginSet(plugins.Filter, podtopologyspread.Name, nil)
return
})
registry.RegisterPredicate(nodelabel.Name,
registry.RegisterPredicate(predicates.CheckNodeLabelPresencePred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodelabel.Name, nil)
pluginConfig = append(pluginConfig, makePluginConfig(nodelabel.Name, args.NodeLabelArgs))
return
})
registry.RegisterPredicate(predicates.CheckServiceAffinityPred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, serviceaffinity.Name, nil)
pluginConfig = append(pluginConfig, makePluginConfig(serviceaffinity.Name, args.ServiceAffinityArgs))
return
})
// Register Priorities.
registry.RegisterPriority(priorities.SelectorSpreadPriority,

View File

@ -0,0 +1,45 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["service_affinity.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["service_affinity_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/listers/fake:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,79 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package serviceaffinity
import (
"context"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
// Name of this plugin.
const Name = "ServiceAffinity"
// Args holds the args that are used to configure the plugin.
type Args struct {
// Labels should be present for the node to be considered a fit for hosting the pod
Labels []string `json:"labels,omitempty"`
}
// New initializes a new plugin and returns it.
func New(plArgs *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
args := &Args{}
if err := framework.DecodeInto(plArgs, args); err != nil {
return nil, err
}
informerFactory := handle.SharedInformerFactory()
nodeInfoLister := handle.SnapshotSharedLister().NodeInfos()
podLister := handle.SnapshotSharedLister().Pods()
serviceLister := informerFactory.Core().V1().Services().Lister()
fitPredicate, predicateMetadataProducer := predicates.NewServiceAffinityPredicate(nodeInfoLister, podLister, serviceLister, args.Labels)
// Once we generate the predicate we should also Register the Precomputation
predicates.RegisterPredicateMetadataProducer(predicates.CheckServiceAffinityPred, predicateMetadataProducer)
return &ServiceAffinity{
predicate: fitPredicate,
}, nil
}
// ServiceAffinity is a plugin that checks service affinity.
type ServiceAffinity struct {
predicate predicates.FitPredicate
}
var _ framework.FilterPlugin = &ServiceAffinity{}
// Name returns name of the plugin. It is used in logs, etc.
func (pl *ServiceAffinity) Name() string {
return Name
}
// Filter invoked at the filter extension point.
func (pl *ServiceAffinity) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
meta, ok := migration.PredicateMetadata(cycleState).(predicates.PredicateMetadata)
if !ok {
return framework.NewStatus(framework.Error, "looking up PredicateMetadata")
}
_, reasons, err := pl.predicate(pod, meta, nodeInfo)
return migration.PredicateResultToFrameworkStatus(reasons, err)
}

View File

@ -0,0 +1,181 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package serviceaffinity
import (
"context"
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
func TestServiceAffinity(t *testing.T) {
selector := map[string]string{"foo": "bar"}
labels1 := map[string]string{
"region": "r1",
"zone": "z11",
}
labels2 := map[string]string{
"region": "r1",
"zone": "z12",
}
labels3 := map[string]string{
"region": "r2",
"zone": "z21",
}
labels4 := map[string]string{
"region": "r2",
"zone": "z22",
}
node1 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labels1}}
node2 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labels2}}
node3 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labels3}}
node4 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine4", Labels: labels4}}
node5 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine5", Labels: labels4}}
tests := []struct {
name string
pod *v1.Pod
pods []*v1.Pod
services []*v1.Service
node *v1.Node
labels []string
res framework.Code
}{
{
name: "nothing scheduled",
pod: new(v1.Pod),
node: &node1,
labels: []string{"region"},
res: framework.Success,
},
{
name: "pod with region label match",
pod: &v1.Pod{Spec: v1.PodSpec{NodeSelector: map[string]string{"region": "r1"}}},
node: &node1,
labels: []string{"region"},
res: framework.Success,
},
{
name: "pod with region label mismatch",
pod: &v1.Pod{Spec: v1.PodSpec{NodeSelector: map[string]string{"region": "r2"}}},
node: &node1,
labels: []string{"region"},
res: framework.Unschedulable,
},
{
name: "service pod on same node",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector}},
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: selector}}},
node: &node1,
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}}},
labels: []string{"region"},
res: framework.Success,
},
{
name: "service pod on different node, region match",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector}},
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: selector}}},
node: &node1,
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}}},
labels: []string{"region"},
res: framework.Success,
},
{
name: "service pod on different node, region mismatch",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector}},
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine3"}, ObjectMeta: metav1.ObjectMeta{Labels: selector}}},
node: &node1,
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}}},
labels: []string{"region"},
res: framework.Unschedulable,
},
{
name: "service in different namespace, region mismatch",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector, Namespace: "ns1"}},
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine3"}, ObjectMeta: metav1.ObjectMeta{Labels: selector, Namespace: "ns1"}}},
node: &node1,
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}, ObjectMeta: metav1.ObjectMeta{Namespace: "ns2"}}},
labels: []string{"region"},
res: framework.Success,
},
{
name: "pod in different namespace, region mismatch",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector, Namespace: "ns1"}},
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine3"}, ObjectMeta: metav1.ObjectMeta{Labels: selector, Namespace: "ns2"}}},
node: &node1,
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}, ObjectMeta: metav1.ObjectMeta{Namespace: "ns1"}}},
labels: []string{"region"},
res: framework.Success,
},
{
name: "service and pod in same namespace, region mismatch",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector, Namespace: "ns1"}},
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine3"}, ObjectMeta: metav1.ObjectMeta{Labels: selector, Namespace: "ns1"}}},
node: &node1,
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}, ObjectMeta: metav1.ObjectMeta{Namespace: "ns1"}}},
labels: []string{"region"},
res: framework.Unschedulable,
},
{
name: "service pod on different node, multiple labels, not all match",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector}},
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: selector}}},
node: &node1,
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}}},
labels: []string{"region", "zone"},
res: framework.Unschedulable,
},
{
name: "service pod on different node, multiple labels, all match",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector}},
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine5"}, ObjectMeta: metav1.ObjectMeta{Labels: selector}}},
node: &node4,
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}}},
labels: []string{"region", "zone"},
res: framework.Success,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodes := []*v1.Node{&node1, &node2, &node3, &node4, &node5}
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, nodes)
predicate, precompute := predicates.NewServiceAffinityPredicate(snapshot.NodeInfos(), snapshot.Pods(), fakelisters.ServiceLister(test.services), test.labels)
predicates.RegisterPredicateMetadataProducer("ServiceAffinityMetaProducer", precompute)
p := &ServiceAffinity{
predicate: predicate,
}
meta := predicates.GetPredicateMetadata(test.pod, snapshot)
state := framework.NewCycleState()
state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta})
status := p.Filter(context.Background(), state, test.pod, snapshot.NodeInfoMap[test.node.Name])
if status.Code() != test.res {
t.Errorf("Status mismatch. got: %v, want: %v", status.Code(), test.res)
}
})
}
}