From c7fef196b60856420ce3f0470acd1093ab1d9b5f Mon Sep 17 00:00:00 2001 From: Abdullah Gharaibeh Date: Thu, 4 Mar 2021 07:30:24 -0500 Subject: [PATCH] Implements pod affinity NamespaceSelector feature --- pkg/scheduler/eventhandlers_test.go | 9 +- pkg/scheduler/factory.go | 1 + pkg/scheduler/factory_test.go | 6 +- .../default_preemption_test.go | 13 +- .../framework/plugins/feature/feature.go | 24 ++ .../plugins/interpodaffinity/filtering.go | 164 ++++---- .../interpodaffinity/filtering_test.go | 351 +++++++++++++----- .../plugins/interpodaffinity/plugin.go | 60 ++- .../plugins/interpodaffinity/scoring.go | 83 +++-- .../plugins/interpodaffinity/scoring_test.go | 338 ++++++++++++++--- pkg/scheduler/framework/plugins/registry.go | 23 +- .../framework/plugins/testing/testing.go | 59 +++ pkg/scheduler/framework/types.go | 69 +++- pkg/scheduler/framework/types_test.go | 44 +++ .../internal/queue/scheduling_queue.go | 41 +- .../internal/queue/scheduling_queue_test.go | 57 +-- pkg/scheduler/internal/queue/testing.go | 46 +++ pkg/scheduler/scheduler_test.go | 4 +- pkg/scheduler/util/topologies.go | 81 ---- pkg/scheduler/util/topologies_test.go | 260 ------------- pkg/scheduler/util/utils.go | 28 -- 21 files changed, 1051 insertions(+), 710 deletions(-) create mode 100644 pkg/scheduler/framework/plugins/feature/feature.go create mode 100644 pkg/scheduler/framework/plugins/testing/testing.go create mode 100644 pkg/scheduler/internal/queue/testing.go delete mode 100644 pkg/scheduler/util/topologies.go delete mode 100644 pkg/scheduler/util/topologies_test.go diff --git a/pkg/scheduler/eventhandlers_test.go b/pkg/scheduler/eventhandlers_test.go index fc64e329a86..b0bc31844e2 100644 --- a/pkg/scheduler/eventhandlers_test.go +++ b/pkg/scheduler/eventhandlers_test.go @@ -17,6 +17,7 @@ limitations under the License. package scheduler import ( + "context" "reflect" "testing" "time" @@ -417,10 +418,10 @@ func TestUpdatePodInCache(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - stopCh := make(chan struct{}) - defer close(stopCh) - schedulerCache := cache.New(ttl, stopCh) - schedulerQueue := queue.NewPriorityQueue(nil) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + schedulerCache := cache.New(ttl, ctx.Done()) + schedulerQueue := queue.NewTestQueue(ctx, nil) sched := &Scheduler{ SchedulerCache: schedulerCache, SchedulingQueue: schedulerQueue, diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index 648abd45c67..05acc274ae1 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -153,6 +153,7 @@ func (c *Configurator) create() (*Scheduler, error) { lessFn := profiles[c.profiles[0].SchedulerName].QueueSortFunc() podQueue := internalqueue.NewSchedulingQueue( lessFn, + c.informerFactory, internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second), internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second), internalqueue.WithPodNominator(nominator), diff --git a/pkg/scheduler/factory_test.go b/pkg/scheduler/factory_test.go index 1aa9f7b771a..9615a980f40 100644 --- a/pkg/scheduler/factory_test.go +++ b/pkg/scheduler/factory_test.go @@ -452,7 +452,7 @@ func TestDefaultErrorFunc(t *testing.T) { // Need to add/update/delete testPod to the store. 
podInformer.Informer().GetStore().Add(testPod) - queue := internalqueue.NewPriorityQueue(nil, internalqueue.WithClock(clock.NewFakeClock(time.Now()))) + queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(clock.NewFakeClock(time.Now()))) schedulerCache := internalcache.New(30*time.Second, stopCh) queue.Add(testPod) @@ -526,7 +526,7 @@ func TestDefaultErrorFunc_NodeNotFound(t *testing.T) { // Need to add testPod to the store. podInformer.Informer().GetStore().Add(testPod) - queue := internalqueue.NewPriorityQueue(nil, internalqueue.WithClock(clock.NewFakeClock(time.Now()))) + queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(clock.NewFakeClock(time.Now()))) schedulerCache := internalcache.New(30*time.Second, stopCh) for i := range tt.nodes { @@ -567,7 +567,7 @@ func TestDefaultErrorFunc_PodAlreadyBound(t *testing.T) { // Need to add testPod to the store. podInformer.Informer().GetStore().Add(testPod) - queue := internalqueue.NewPriorityQueue(nil, internalqueue.WithClock(clock.NewFakeClock(time.Now()))) + queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(clock.NewFakeClock(time.Now()))) schedulerCache := internalcache.New(30*time.Second, stopCh) // Add node to schedulerCache no matter it's deleted in API server or not. diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go index 8dc8f529ed8..832e4f3a0ac 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go @@ -42,6 +42,7 @@ import ( configv1beta1 "k8s.io/kubernetes/pkg/scheduler/apis/config/v1beta1" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel" @@ -530,7 +531,9 @@ func TestDryRunPreemption(t *testing.T) { name: "pod with anti-affinity is preempted", registerPlugins: []st.RegisterPluginFunc{ st.RegisterPluginAsExtensions(noderesources.FitName, noderesources.NewFit, "Filter", "PreFilter"), - st.RegisterPluginAsExtensions(interpodaffinity.Name, interpodaffinity.New, "Filter", "PreFilter"), + st.RegisterPluginAsExtensions(interpodaffinity.Name, func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) { + return interpodaffinity.New(plArgs, fh, feature.Features{}) + }, "Filter", "PreFilter"), }, nodeNames: []string{"node1", "node2"}, testPods: []*v1.Pod{ @@ -991,7 +994,8 @@ func TestDryRunPreemption(t *testing.T) { st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), ) registeredPlugins = append(registeredPlugins, tt.registerPlugins...) 
- informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0) + objs := []runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}} + informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objs...), 0) fwk, err := st.NewFramework( registeredPlugins, frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), @@ -1002,6 +1006,11 @@ func TestDryRunPreemption(t *testing.T) { t.Fatal(err) } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + nodeInfos, err := snapshot.NodeInfos().List() if err != nil { t.Fatal(err) diff --git a/pkg/scheduler/framework/plugins/feature/feature.go b/pkg/scheduler/framework/plugins/feature/feature.go new file mode 100644 index 00000000000..54f8d2ed88f --- /dev/null +++ b/pkg/scheduler/framework/plugins/feature/feature.go @@ -0,0 +1,24 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package feature + +// Features carries feature gate values used by various plugins. +// This struct allows us to break the dependency of the plugins on +// the internal k8s features pkg. +type Features struct { + EnablePodAffinityNamespaceSelector bool +} diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go b/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go index 43f5834683d..31e965efce8 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go @@ -21,11 +21,11 @@ import ( "fmt" "sync/atomic" - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/internal/parallelize" - schedutil "k8s.io/kubernetes/pkg/scheduler/util" ) const ( @@ -46,13 +46,16 @@ const ( // preFilterState computed at PreFilter and used at Filter. type preFilterState struct { // A map of topology pairs to the number of existing pods that has anti-affinity terms that match the "pod". - topologyToMatchedExistingAntiAffinityTerms topologyToMatchedTermCount + existingAntiAffinityCounts topologyToMatchedTermCount // A map of topology pairs to the number of existing pods that match the affinity terms of the "pod". - topologyToMatchedAffinityTerms topologyToMatchedTermCount + affinityCounts topologyToMatchedTermCount // A map of topology pairs to the number of existing pods that match the anti-affinity terms of the "pod". - topologyToMatchedAntiAffinityTerms topologyToMatchedTermCount + antiAffinityCounts topologyToMatchedTermCount // podInfo of the incoming pod. podInfo *framework.PodInfo + // A copy of the incoming pod's namespace labels. + namespaceLabels labels.Set + enableNamespaceSelector bool } // Clone the prefilter state. 
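The new feature.Features struct above lets plugins consume pre-resolved feature-gate values instead of importing the internal features package directly. The diffstat shows pkg/scheduler/framework/plugins/registry.go changing for this reason; since its hunks are not included in this excerpt, here is a hedged sketch of the adapter shape implied by the new three-argument New (the gate name PodAffinityNamespaceSelector is the gate this feature ships behind):

```go
package plugins

import (
	"k8s.io/apimachinery/pkg/runtime"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
)

// newInterPodAffinity resolves the gate once at registry setup and closes over
// the resulting Features value, keeping the plugin itself gate-agnostic.
// Sketch only; the actual registry wiring is in registry.go, not shown here.
func newInterPodAffinity(plArgs runtime.Object, h framework.Handle) (framework.Plugin, error) {
	fts := feature.Features{
		EnablePodAffinityNamespaceSelector: utilfeature.DefaultFeatureGate.Enabled(features.PodAffinityNamespaceSelector),
	}
	return interpodaffinity.New(plArgs, h, fts)
}
```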
@@ -62,27 +65,27 @@ func (s *preFilterState) Clone() framework.StateData {
 	}
 
 	copy := preFilterState{}
-	copy.topologyToMatchedAffinityTerms = s.topologyToMatchedAffinityTerms.clone()
-	copy.topologyToMatchedAntiAffinityTerms = s.topologyToMatchedAntiAffinityTerms.clone()
-	copy.topologyToMatchedExistingAntiAffinityTerms = s.topologyToMatchedExistingAntiAffinityTerms.clone()
+	copy.affinityCounts = s.affinityCounts.clone()
+	copy.antiAffinityCounts = s.antiAffinityCounts.clone()
+	copy.existingAntiAffinityCounts = s.existingAntiAffinityCounts.clone()
 	// No need to deep copy the podInfo because it shouldn't change.
 	copy.podInfo = s.podInfo
-
+	copy.namespaceLabels = s.namespaceLabels
+	copy.enableNamespaceSelector = s.enableNamespaceSelector
 	return &copy
 }
 
 // updateWithPod updates the preFilterState counters with the (anti)affinity matches for the given podInfo.
-func (s *preFilterState) updateWithPod(updatedPodInfo *framework.PodInfo, node *v1.Node, multiplier int64) {
+func (s *preFilterState) updateWithPod(pInfo *framework.PodInfo, node *v1.Node, multiplier int64) {
 	if s == nil {
 		return
 	}
 
-	// Update matching existing anti-affinity terms.
-	s.topologyToMatchedExistingAntiAffinityTerms.updateWithAntiAffinityTerms(s.podInfo.Pod, node, updatedPodInfo.RequiredAntiAffinityTerms, multiplier)
-
-	// Update matching incoming pod (anti)affinity terms.
-	s.topologyToMatchedAffinityTerms.updateWithAffinityTerms(updatedPodInfo.Pod, node, s.podInfo.RequiredAffinityTerms, multiplier)
-	s.topologyToMatchedAntiAffinityTerms.updateWithAntiAffinityTerms(updatedPodInfo.Pod, node, s.podInfo.RequiredAntiAffinityTerms, multiplier)
+	s.existingAntiAffinityCounts.updateWithAntiAffinityTerms(pInfo.RequiredAntiAffinityTerms, s.podInfo.Pod, s.namespaceLabels, node, multiplier, s.enableNamespaceSelector)
+	s.affinityCounts.updateWithAffinityTerms(s.podInfo.RequiredAffinityTerms, pInfo.Pod, node, multiplier, s.enableNamespaceSelector)
+	// The incoming pod's terms have the namespaceSelector merged into the namespaces, and so
+	// here we don't look up the updated pod's namespace labels, hence passing nil for nsLabels.
+	s.antiAffinityCounts.updateWithAntiAffinityTerms(s.podInfo.RequiredAntiAffinityTerms, pInfo.Pod, nil, node, multiplier, s.enableNamespaceSelector)
 }
 
 type topologyPair struct {
@@ -103,60 +106,58 @@ func (m topologyToMatchedTermCount) clone() topologyToMatchedTermCount {
 	return copy
 }
 
-// updateWithAffinityTerms updates the topologyToMatchedTermCount map with the specified value
+func (m topologyToMatchedTermCount) update(node *v1.Node, tk string, value int64) {
+	if tv, ok := node.Labels[tk]; ok {
+		pair := topologyPair{key: tk, value: tv}
+		m[pair] += value
+		// value could be negative, hence we delete the entry if it is down to zero.
+		if m[pair] == 0 {
+			delete(m, pair)
+		}
+	}
+}
+
+// updates the topologyToMatchedTermCount map with the specified value
 // for each affinity term if "targetPod" matches ALL terms.
-func (m topologyToMatchedTermCount) updateWithAffinityTerms(targetPod *v1.Pod, targetPodNode *v1.Node, affinityTerms []framework.AffinityTerm, value int64) {
-	if podMatchesAllAffinityTerms(targetPod, affinityTerms) {
-		for _, t := range affinityTerms {
-			if topologyValue, ok := targetPodNode.Labels[t.TopologyKey]; ok {
-				pair := topologyPair{key: t.TopologyKey, value: topologyValue}
-				m[pair] += value
-				// value could be a negative value, hence we delete the entry if
-				// the entry is down to zero.
- if m[pair] == 0 { - delete(m, pair) - } - } +func (m topologyToMatchedTermCount) updateWithAffinityTerms( + terms []framework.AffinityTerm, pod *v1.Pod, node *v1.Node, value int64, enableNamespaceSelector bool) { + if podMatchesAllAffinityTerms(terms, pod, enableNamespaceSelector) { + for _, t := range terms { + m.update(node, t.TopologyKey, value) } } } -// updateWithAntiAffinityTerms updates the topologyToMatchedTermCount map with the specified value +// updates the topologyToMatchedTermCount map with the specified value // for each anti-affinity term matched the target pod. -func (m topologyToMatchedTermCount) updateWithAntiAffinityTerms(targetPod *v1.Pod, targetPodNode *v1.Node, antiAffinityTerms []framework.AffinityTerm, value int64) { +func (m topologyToMatchedTermCount) updateWithAntiAffinityTerms(terms []framework.AffinityTerm, pod *v1.Pod, nsLabels labels.Set, node *v1.Node, value int64, enableNamespaceSelector bool) { // Check anti-affinity terms. - for _, a := range antiAffinityTerms { - if schedutil.PodMatchesTermsNamespaceAndSelector(targetPod, a.Namespaces, a.Selector) { - if topologyValue, ok := targetPodNode.Labels[a.TopologyKey]; ok { - pair := topologyPair{key: a.TopologyKey, value: topologyValue} - m[pair] += value - // value could be a negative value, hence we delete the entry if - // the entry is down to zero. - if m[pair] == 0 { - delete(m, pair) - } - } + for _, t := range terms { + if t.Matches(pod, nsLabels, enableNamespaceSelector) { + m.update(node, t.TopologyKey, value) } } } -// podMatchesAllAffinityTerms returns true IFF the given pod matches all the given terms. -func podMatchesAllAffinityTerms(pod *v1.Pod, terms []framework.AffinityTerm) bool { +// returns true IFF the given pod matches all the given terms. +func podMatchesAllAffinityTerms(terms []framework.AffinityTerm, pod *v1.Pod, enableNamespaceSelector bool) bool { if len(terms) == 0 { return false } - for _, term := range terms { - if !schedutil.PodMatchesTermsNamespaceAndSelector(pod, term.Namespaces, term.Selector) { + for _, t := range terms { + // The incoming pod NamespaceSelector was merged into the Namespaces set, and so + // we are not explicitly passing in namespace labels. 
+		if !t.Matches(pod, nil, enableNamespaceSelector) {
 			return false
 		}
 	}
 	return true
 }
 
-// getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node:
+// calculates the following for each existing pod on each node:
 // (1) Whether it has PodAntiAffinity
 // (2) Whether any AffinityTerm matches the incoming pod
-func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodes []*framework.NodeInfo) topologyToMatchedTermCount {
+func getExistingAntiAffinityCounts(pod *v1.Pod, nsLabels labels.Set, nodes []*framework.NodeInfo, enableNamespaceSelector bool) topologyToMatchedTermCount {
 	topoMaps := make([]topologyToMatchedTermCount, len(nodes))
 	index := int32(-1)
 	processNode := func(i int) {
@@ -168,7 +169,7 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodes []*framework.NodeIn
 		}
 		topoMap := make(topologyToMatchedTermCount)
 		for _, existingPod := range nodeInfo.PodsWithRequiredAntiAffinity {
-			topoMap.updateWithAntiAffinityTerms(pod, node, existingPod.RequiredAntiAffinityTerms, 1)
+			topoMap.updateWithAntiAffinityTerms(existingPod.RequiredAntiAffinityTerms, pod, nsLabels, node, 1, enableNamespaceSelector)
 		}
 		if len(topoMap) != 0 {
 			topoMaps[atomic.AddInt32(&index, 1)] = topoMap
@@ -184,11 +185,11 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodes []*framework.NodeIn
 	return result
 }
 
-// getTPMapMatchingIncomingAffinityAntiAffinity finds existing Pods that match affinity terms of the given "pod".
+// finds existing pods that match the incoming pod's (anti)affinity terms.
 // It returns a topologyToMatchedTermCount that are checked later by the affinity
 // predicate. With this topologyToMatchedTermCount available, the affinity predicate does not
 // need to check all the pods in the cluster.
-func getTPMapMatchingIncomingAffinityAntiAffinity(podInfo *framework.PodInfo, allNodes []*framework.NodeInfo) (topologyToMatchedTermCount, topologyToMatchedTermCount) {
+func getIncomingAffinityAntiAffinityCounts(podInfo *framework.PodInfo, allNodes []*framework.NodeInfo, enableNamespaceSelector bool) (topologyToMatchedTermCount, topologyToMatchedTermCount) {
 	affinityCounts := make(topologyToMatchedTermCount)
 	antiAffinityCounts := make(topologyToMatchedTermCount)
 	if len(podInfo.RequiredAffinityTerms) == 0 && len(podInfo.RequiredAntiAffinityTerms) == 0 {
@@ -208,11 +209,10 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(podInfo *framework.PodInfo, al
 		affinity := make(topologyToMatchedTermCount)
 		antiAffinity := make(topologyToMatchedTermCount)
 		for _, existingPod := range nodeInfo.Pods {
-			// Check affinity terms.
-			affinity.updateWithAffinityTerms(existingPod.Pod, node, podInfo.RequiredAffinityTerms, 1)
-
-			// Check anti-affinity terms.
-			antiAffinity.updateWithAntiAffinityTerms(existingPod.Pod, node, podInfo.RequiredAntiAffinityTerms, 1)
+			affinity.updateWithAffinityTerms(podInfo.RequiredAffinityTerms, existingPod.Pod, node, 1, enableNamespaceSelector)
+			// The incoming pod's terms have the namespaceSelector merged into the namespaces, and so
+			// here we don't look up the existing pod's namespace labels, hence passing nil for nsLabels.
+ antiAffinity.updateWithAntiAffinityTerms(podInfo.RequiredAntiAffinityTerms, existingPod.Pod, nil, node, 1, enableNamespaceSelector) } if len(affinity) > 0 || len(antiAffinity) > 0 { @@ -243,25 +243,32 @@ func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework return framework.AsStatus(fmt.Errorf("failed to list NodeInfos with pods with affinity: %w", err)) } - podInfo := framework.NewPodInfo(pod) - if podInfo.ParseError != nil { - return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("parsing pod: %+v", podInfo.ParseError)) - } - - // existingPodAntiAffinityMap will be used later for efficient check on existing pods' anti-affinity - existingPodAntiAffinityMap := getTPMapMatchingExistingAntiAffinity(pod, nodesWithRequiredAntiAffinityPods) - - // incomingPodAffinityMap will be used later for efficient check on incoming pod's affinity - // incomingPodAntiAffinityMap will be used later for efficient check on incoming pod's anti-affinity - incomingPodAffinityMap, incomingPodAntiAffinityMap := getTPMapMatchingIncomingAffinityAntiAffinity(podInfo, allNodes) - s := &preFilterState{ - topologyToMatchedAffinityTerms: incomingPodAffinityMap, - topologyToMatchedAntiAffinityTerms: incomingPodAntiAffinityMap, - topologyToMatchedExistingAntiAffinityTerms: existingPodAntiAffinityMap, - podInfo: podInfo, + enableNamespaceSelector: pl.enableNamespaceSelector, } + s.podInfo = framework.NewPodInfo(pod) + if s.podInfo.ParseError != nil { + return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("parsing pod: %+v", s.podInfo.ParseError)) + } + + if pl.enableNamespaceSelector { + for i := range s.podInfo.RequiredAffinityTerms { + if err := pl.mergeAffinityTermNamespacesIfNotEmpty(&s.podInfo.RequiredAffinityTerms[i]); err != nil { + return framework.AsStatus(err) + } + } + for i := range s.podInfo.RequiredAntiAffinityTerms { + if err := pl.mergeAffinityTermNamespacesIfNotEmpty(&s.podInfo.RequiredAntiAffinityTerms[i]); err != nil { + return framework.AsStatus(err) + } + } + s.namespaceLabels = GetNamespaceLabelsSnapshot(pod.Namespace, pl.nsLister) + } + + s.existingAntiAffinityCounts = getExistingAntiAffinityCounts(pod, s.namespaceLabels, nodesWithRequiredAntiAffinityPods, pl.enableNamespaceSelector) + s.affinityCounts, s.antiAffinityCounts = getIncomingAffinityAntiAffinityCounts(s.podInfo, allNodes, pl.enableNamespaceSelector) + cycleState.Write(preFilterStateKey, s) return nil } @@ -308,12 +315,12 @@ func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error // Checks if scheduling the pod onto this node would break any anti-affinity // terms indicated by the existing pods. func satisfyExistingPodsAntiAffinity(state *preFilterState, nodeInfo *framework.NodeInfo) bool { - if len(state.topologyToMatchedExistingAntiAffinityTerms) > 0 { + if len(state.existingAntiAffinityCounts) > 0 { // Iterate over topology pairs to get any of the pods being affected by // the scheduled pod anti-affinity terms for topologyKey, topologyValue := range nodeInfo.Node().Labels { tp := topologyPair{key: topologyKey, value: topologyValue} - if state.topologyToMatchedExistingAntiAffinityTerms[tp] > 0 { + if state.existingAntiAffinityCounts[tp] > 0 { return false } } @@ -323,11 +330,11 @@ func satisfyExistingPodsAntiAffinity(state *preFilterState, nodeInfo *framework. // Checks if the node satisfies the incoming pod's anti-affinity rules. 
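Before that check, a tiny self-contained model (illustrative only; the real types live in filtering.go above) of the counting scheme the satisfy* helpers share: PreFilter tallies, per topology pair, how many matching terms would be violated, and the Filter-phase checks merely probe the candidate node's labels against that map.

```go
package main

import "fmt"

type topologyPair struct{ key, value string }

func main() {
	// Built by getExistingAntiAffinityCounts in PreFilter: two existing pods
	// have required anti-affinity terms matching the incoming pod in zone az1.
	counts := map[topologyPair]int64{
		{key: "zone", value: "az1"}: 2,
	}
	nodeLabels := map[string]string{"zone": "az1", "region": "r1"}
	fits := true
	for k, v := range nodeLabels {
		if counts[topologyPair{key: k, value: v}] > 0 {
			fits = false // scheduling here would break an existing pod's anti-affinity
		}
	}
	fmt.Println(fits) // false
}
```

satisfyPodAntiAffinity below performs the mirror lookup from the incoming pod's side, probing antiAffinityCounts with the candidate node's value for each term's topology key.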
func satisfyPodAntiAffinity(state *preFilterState, nodeInfo *framework.NodeInfo) bool { - if len(state.topologyToMatchedAntiAffinityTerms) > 0 { + if len(state.antiAffinityCounts) > 0 { for _, term := range state.podInfo.RequiredAntiAffinityTerms { if topologyValue, ok := nodeInfo.Node().Labels[term.TopologyKey]; ok { tp := topologyPair{key: term.TopologyKey, value: topologyValue} - if state.topologyToMatchedAntiAffinityTerms[tp] > 0 { + if state.antiAffinityCounts[tp] > 0 { return false } } @@ -342,7 +349,7 @@ func satisfyPodAffinity(state *preFilterState, nodeInfo *framework.NodeInfo) boo for _, term := range state.podInfo.RequiredAffinityTerms { if topologyValue, ok := nodeInfo.Node().Labels[term.TopologyKey]; ok { tp := topologyPair{key: term.TopologyKey, value: topologyValue} - if state.topologyToMatchedAffinityTerms[tp] <= 0 { + if state.affinityCounts[tp] <= 0 { podsExist = false } } else { @@ -357,8 +364,7 @@ func satisfyPodAffinity(state *preFilterState, nodeInfo *framework.NodeInfo) boo // in the cluster matches the namespace and selector of this pod, the pod matches // its own terms, and the node has all the requested topologies, then we allow the pod // to pass the affinity check. - podInfo := state.podInfo - if len(state.topologyToMatchedAffinityTerms) == 0 && podMatchesAllAffinityTerms(podInfo.Pod, podInfo.RequiredAffinityTerms) { + if len(state.affinityCounts) == 0 && podMatchesAllAffinityTerms(state.podInfo.RequiredAffinityTerms, state.podInfo.Pod, state.enableNamespaceSelector) { return true } return false diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go b/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go index cc9e5873215..79b746f922a 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go @@ -24,7 +24,11 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" + plugintesting "k8s.io/kubernetes/pkg/scheduler/framework/plugins/testing" "k8s.io/kubernetes/pkg/scheduler/internal/cache" ) @@ -62,18 +66,20 @@ func TestRequiredAffinitySingleNode(t *testing.T) { podLabel2 := map[string]string{"security": "S1"} node1 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labels1}} tests := []struct { - pod *v1.Pod - pods []*v1.Pod - node *v1.Node - name string - wantStatus *framework.Status + pod *v1.Pod + pods []*v1.Pod + node *v1.Node + name string + wantStatus *framework.Status + disableNSSelector bool }{ { + name: "A pod that has no required pod affinity scheduling rules can schedule onto a node with no existing pods", pod: new(v1.Pod), node: &node1, - name: "A pod that has no required pod affinity scheduling rules can schedule onto a node with no existing pods", }, { + name: "satisfies with requiredDuringSchedulingIgnoredDuringExecution in PodAffinity using In operator that matches the existing pod", pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel2, []v1.PodAffinityTerm{ { @@ -91,9 +97,9 @@ func TestRequiredAffinitySingleNode(t *testing.T) { }, nil), pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabel}}}, node: &node1, - name: "satisfies with requiredDuringSchedulingIgnoredDuringExecution in PodAffinity using In operator that matches the existing 
pod", }, { + name: "satisfies the pod with requiredDuringSchedulingIgnoredDuringExecution in PodAffinity using not in operator in labelSelector that matches the existing pod", pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel2, []v1.PodAffinityTerm{ { @@ -111,9 +117,9 @@ func TestRequiredAffinitySingleNode(t *testing.T) { }, nil), pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabel}}}, node: &node1, - name: "satisfies the pod with requiredDuringSchedulingIgnoredDuringExecution in PodAffinity using not in operator in labelSelector that matches the existing pod", }, { + name: "Does not satisfy the PodAffinity with labelSelector because of diff Namespace", pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel2, []v1.PodAffinityTerm{ { @@ -131,7 +137,6 @@ func TestRequiredAffinitySingleNode(t *testing.T) { }, nil), pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabel, Namespace: "ns"}}}, node: &node1, - name: "Does not satisfy the PodAffinity with labelSelector because of diff Namespace", wantStatus: framework.NewStatus( framework.UnschedulableAndUnresolvable, ErrReasonAffinityNotMatch, @@ -139,6 +144,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) { ), }, { + name: "Doesn't satisfy the PodAffinity because of unmatching labelSelector with the existing pod", pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel, []v1.PodAffinityTerm{ { @@ -155,7 +161,6 @@ func TestRequiredAffinitySingleNode(t *testing.T) { }, nil), pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabel}}}, node: &node1, - name: "Doesn't satisfy the PodAffinity because of unmatching labelSelector with the existing pod", wantStatus: framework.NewStatus( framework.UnschedulableAndUnresolvable, ErrReasonAffinityNotMatch, @@ -163,6 +168,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) { ), }, { + name: "satisfies the PodAffinity with different label Operators in multiple RequiredDuringSchedulingIgnoredDuringExecution ", pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel2, []v1.PodAffinityTerm{ { @@ -197,9 +203,9 @@ func TestRequiredAffinitySingleNode(t *testing.T) { }, nil), pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabel}}}, node: &node1, - name: "satisfies the PodAffinity with different label Operators in multiple RequiredDuringSchedulingIgnoredDuringExecution ", }, { + name: "The labelSelector requirements(items of matchExpressions) are ANDed, the pod cannot schedule onto the node because one of the matchExpression item don't match.", pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel2, []v1.PodAffinityTerm{ { @@ -234,7 +240,6 @@ func TestRequiredAffinitySingleNode(t *testing.T) { }, nil), pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabel}}}, node: &node1, - name: "The labelSelector requirements(items of matchExpressions) are ANDed, the pod cannot schedule onto the node because one of the matchExpression item don't match.", wantStatus: framework.NewStatus( framework.UnschedulableAndUnresolvable, ErrReasonAffinityNotMatch, @@ -242,6 +247,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) { ), }, { + name: "satisfies the PodAffinity and PodAntiAffinity with the existing pod", pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel2, []v1.PodAffinityTerm{ { @@ -273,9 +279,9 @@ func 
TestRequiredAffinitySingleNode(t *testing.T) { }), pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabel}}}, node: &node1, - name: "satisfies the PodAffinity and PodAntiAffinity with the existing pod", }, { + name: "satisfies the PodAffinity and PodAntiAffinity and PodAntiAffinity symmetry with the existing pod", pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel2, []v1.PodAffinityTerm{ { @@ -323,9 +329,9 @@ func TestRequiredAffinitySingleNode(t *testing.T) { }), }, node: &node1, - name: "satisfies the PodAffinity and PodAntiAffinity and PodAntiAffinity symmetry with the existing pod", }, { + name: "satisfies the PodAffinity but doesn't satisfy the PodAntiAffinity with the existing pod", pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel2, []v1.PodAffinityTerm{ { @@ -357,7 +363,6 @@ func TestRequiredAffinitySingleNode(t *testing.T) { }), pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabel}}}, node: &node1, - name: "satisfies the PodAffinity but doesn't satisfy the PodAntiAffinity with the existing pod", wantStatus: framework.NewStatus( framework.Unschedulable, ErrReasonAffinityNotMatch, @@ -365,6 +370,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) { ), }, { + name: "satisfies the PodAffinity and PodAntiAffinity but doesn't satisfy PodAntiAffinity symmetry with the existing pod", pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel, []v1.PodAffinityTerm{ { @@ -412,7 +418,6 @@ func TestRequiredAffinitySingleNode(t *testing.T) { }), }, node: &node1, - name: "satisfies the PodAffinity and PodAntiAffinity but doesn't satisfy PodAntiAffinity symmetry with the existing pod", wantStatus: framework.NewStatus( framework.Unschedulable, ErrReasonAffinityNotMatch, @@ -420,6 +425,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) { ), }, { + name: "pod matches its own Label in PodAffinity and that matches the existing pod Labels", pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel, []v1.PodAffinityTerm{ { @@ -437,7 +443,6 @@ func TestRequiredAffinitySingleNode(t *testing.T) { }, nil), pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabel}}}, node: &node1, - name: "pod matches its own Label in PodAffinity and that matches the existing pod Labels", wantStatus: framework.NewStatus( framework.UnschedulableAndUnresolvable, ErrReasonAffinityNotMatch, @@ -445,6 +450,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) { ), }, { + name: "verify that PodAntiAffinity from existing pod is respected when pod has no AntiAffinity constraints. doesn't satisfy PodAntiAffinity symmetry with the existing pod", pod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Labels: podLabel, @@ -468,7 +474,6 @@ func TestRequiredAffinitySingleNode(t *testing.T) { }), }, node: &node1, - name: "verify that PodAntiAffinity from existing pod is respected when pod has no AntiAffinity constraints. doesn't satisfy PodAntiAffinity symmetry with the existing pod", wantStatus: framework.NewStatus( framework.Unschedulable, ErrReasonAffinityNotMatch, @@ -476,6 +481,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) { ), }, { + name: "verify that PodAntiAffinity from existing pod is respected when pod has no AntiAffinity constraints. 
satisfy PodAntiAffinity symmetry with the existing pod", pod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Labels: podLabel, @@ -499,52 +505,51 @@ func TestRequiredAffinitySingleNode(t *testing.T) { }), }, node: &node1, - name: "verify that PodAntiAffinity from existing pod is respected when pod has no AntiAffinity constraints. satisfy PodAntiAffinity symmetry with the existing pod", }, { - pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel, nil, - []v1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "service", - Operator: metav1.LabelSelectorOpExists, - }, - }, - }, - TopologyKey: "region", - }, - { - LabelSelector: &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "security", - Operator: metav1.LabelSelectorOpExists, - }, - }, - }, - TopologyKey: "region", - }, - }), - pods: []*v1.Pod{ - createPodWithAffinityTerms(defaultNamespace, "machine1", podLabel2, nil, - []v1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "security", - Operator: metav1.LabelSelectorOpExists, - }, - }, - }, - TopologyKey: "zone", - }, - }), - }, - node: &node1, name: "satisfies the PodAntiAffinity with existing pod but doesn't satisfy PodAntiAffinity symmetry with incoming pod", + pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel, nil, + []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "service", + Operator: metav1.LabelSelectorOpExists, + }, + }, + }, + TopologyKey: "region", + }, + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "security", + Operator: metav1.LabelSelectorOpExists, + }, + }, + }, + TopologyKey: "region", + }, + }), + pods: []*v1.Pod{ + createPodWithAffinityTerms(defaultNamespace, "machine1", podLabel2, nil, + []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "security", + Operator: metav1.LabelSelectorOpExists, + }, + }, + }, + TopologyKey: "zone", + }, + }), + }, + node: &node1, wantStatus: framework.NewStatus( framework.Unschedulable, ErrReasonAffinityNotMatch, @@ -552,6 +557,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) { ), }, { + name: "PodAntiAffinity symmetry check a1: incoming pod and existing pod partially match each other on AffinityTerms", pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel, nil, []v1.PodAffinityTerm{ { @@ -599,9 +605,9 @@ func TestRequiredAffinitySingleNode(t *testing.T) { ErrReasonAffinityNotMatch, ErrReasonAntiAffinityRulesNotMatch, ), - name: "PodAntiAffinity symmetry check a1: incoming pod and existing pod partially match each other on AffinityTerms", }, { + name: "PodAntiAffinity symmetry check a2: incoming pod and existing pod partially match each other on AffinityTerms", pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel2, nil, []v1.PodAffinityTerm{ { @@ -649,9 +655,9 @@ func TestRequiredAffinitySingleNode(t *testing.T) { ErrReasonAffinityNotMatch, ErrReasonExistingAntiAffinityRulesNotMatch, ), - name: "PodAntiAffinity symmetry check a2: incoming pod and existing pod partially match each other on AffinityTerms", }, { + name: "PodAntiAffinity symmetry check b1: incoming pod and existing pod partially match each other on AffinityTerms", pod: createPodWithAffinityTerms(defaultNamespace, "", 
map[string]string{"abc": "", "xyz": ""}, nil, []v1.PodAffinityTerm{ { @@ -710,9 +716,9 @@ func TestRequiredAffinitySingleNode(t *testing.T) { ErrReasonAffinityNotMatch, ErrReasonAntiAffinityRulesNotMatch, ), - name: "PodAntiAffinity symmetry check b1: incoming pod and existing pod partially match each other on AffinityTerms", }, { + name: "PodAntiAffinity symmetry check b2: incoming pod and existing pod partially match each other on AffinityTerms", pod: createPodWithAffinityTerms(defaultNamespace, "", map[string]string{"def": "", "xyz": ""}, nil, []v1.PodAffinityTerm{ { @@ -771,7 +777,6 @@ func TestRequiredAffinitySingleNode(t *testing.T) { ErrReasonAffinityNotMatch, ErrReasonAntiAffinityRulesNotMatch, ), - name: "PodAntiAffinity symmetry check b2: incoming pod and existing pod partially match each other on AffinityTerms", }, { name: "PodAffinity fails PreFilter with an invalid affinity label syntax", @@ -847,23 +852,179 @@ func TestRequiredAffinitySingleNode(t *testing.T) { `Invalid value: "{{.bad-value.}}"`, ), }, + { + name: "affinity with NamespaceSelector", + pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel2, + []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "service", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"securityscan", "value2"}, + }, + }, + }, + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "team", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"team1"}, + }, + }, + }, + TopologyKey: "region", + }, + }, nil), + pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabel}}}, + node: &node1, + }, + { + name: "affinity with non-matching NamespaceSelector", + pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel2, + []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "service", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"securityscan", "value2"}, + }, + }, + }, + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "team", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"team1"}, + }, + }, + }, + TopologyKey: "region", + }, + }, nil), + pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team2", Labels: podLabel}}}, + node: &node1, + wantStatus: framework.NewStatus( + framework.UnschedulableAndUnresolvable, + ErrReasonAffinityNotMatch, + ErrReasonAffinityRulesNotMatch, + ), + }, + { + name: "anti-affinity with matching NamespaceSelector", + pod: createPodWithAffinityTerms("subteam1.team1", "", podLabel2, nil, + []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "service", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"securityscan", "value2"}, + }, + }, + }, + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "team", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"team1"}, + }, + }, + }, + TopologyKey: "zone", + }, + }), + pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam2.team1", Labels: podLabel}}}, + node: &node1, + wantStatus: framework.NewStatus( + framework.Unschedulable, + 
ErrReasonAffinityNotMatch, + ErrReasonAntiAffinityRulesNotMatch, + ), + }, + { + name: "anti-affinity with matching all NamespaceSelector", + pod: createPodWithAffinityTerms("subteam1.team1", "", podLabel2, nil, + []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "service", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"securityscan", "value2"}, + }, + }, + }, + NamespaceSelector: &metav1.LabelSelector{}, + TopologyKey: "zone", + }, + }), + pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam2.team1", Labels: podLabel}}}, + node: &node1, + wantStatus: framework.NewStatus( + framework.Unschedulable, + ErrReasonAffinityNotMatch, + ErrReasonAntiAffinityRulesNotMatch, + ), + }, + { + name: "anti-affinity with non-matching NamespaceSelector", + pod: createPodWithAffinityTerms("subteam1.team1", "", podLabel2, nil, + []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "service", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"securityscan", "value2"}, + }, + }, + }, + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "team", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"team1"}, + }, + }, + }, + TopologyKey: "zone", + }, + }), + pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team2", Labels: podLabel}}}, + node: &node1, + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx := context.Background() snapshot := cache.NewSnapshot(test.pods, []*v1.Node{test.node}) - p := &InterPodAffinity{ - sharedLister: snapshot, + n := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) { + return New(plArgs, fh, feature.Features{ + EnablePodAffinityNamespaceSelector: !test.disableNSSelector, + }) } + p := plugintesting.SetupPlugin(ctx, t, n, &config.InterPodAffinityArgs{}, snapshot, namespaces) state := framework.NewCycleState() - preFilterStatus := p.PreFilter(context.Background(), state, test.pod) + preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, state, test.pod) if !preFilterStatus.IsSuccess() { if !strings.Contains(preFilterStatus.Message(), test.wantStatus.Message()) { t.Errorf("prefilter failed with status: %v", preFilterStatus) } } else { nodeInfo := mustGetNodeInfo(t, snapshot, test.node.Name) - gotStatus := p.Filter(context.Background(), state, test.pod, nodeInfo) + gotStatus := p.(framework.FilterPlugin).Filter(ctx, state, test.pod, nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -1438,7 +1599,10 @@ func TestRequiredAffinityMultipleNodes(t *testing.T) { }), pods: []*v1.Pod{ { - ObjectMeta: metav1.ObjectMeta{Name: "podA", Labels: map[string]string{"foo": "", "bar": ""}}, + ObjectMeta: metav1.ObjectMeta{ + Name: "podA", + Labels: map[string]string{"foo": "", "bar": ""}, + }, Spec: v1.PodSpec{ NodeName: "nodeA", }, @@ -1738,18 +1902,23 @@ func TestRequiredAffinityMultipleNodes(t *testing.T) { for indexTest, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx := context.Background() snapshot := cache.NewSnapshot(test.pods, test.nodes) + n := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) { + return New(plArgs, fh, feature.Features{}) + } + p := 
plugintesting.SetupPlugin(ctx, t, n, &config.InterPodAffinityArgs{}, snapshot, + []runtime.Object{ + &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "NS1"}}, + }) for indexNode, node := range test.nodes { - p := &InterPodAffinity{ - sharedLister: snapshot, - } state := framework.NewCycleState() - preFilterStatus := p.PreFilter(context.Background(), state, test.pod) + preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, state, test.pod) if !preFilterStatus.IsSuccess() { t.Errorf("prefilter failed with status: %v", preFilterStatus) } nodeInfo := mustGetNodeInfo(t, snapshot, node.Name) - gotStatus := p.Filter(context.Background(), state, test.pod, nodeInfo) + gotStatus := p.(framework.FilterPlugin).Filter(ctx, state, test.pod, nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatuses[indexNode]) { t.Errorf("index: %d status does not match: %v, want: %v", indexTest, gotStatus, test.wantStatuses[indexNode]) } @@ -1865,7 +2034,6 @@ func TestPreFilterStateAddRemovePod(t *testing.T) { addedPod *v1.Pod existingPods []*v1.Pod nodes []*v1.Node - services []*v1.Service expectedAntiAffinity topologyToMatchedTermCount expectedAffinity topologyToMatchedTermCount }{ @@ -1968,7 +2136,6 @@ func TestPreFilterStateAddRemovePod(t *testing.T) { }, }, }, - services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector1}}}, nodes: []*v1.Node{ {ObjectMeta: metav1.ObjectMeta{Name: "nodeA", Labels: label1}}, {ObjectMeta: metav1.ObjectMeta{Name: "nodeB", Labels: label2}}, @@ -2014,7 +2181,6 @@ func TestPreFilterStateAddRemovePod(t *testing.T) { }, }, }, - services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector1}}}, nodes: []*v1.Node{ {ObjectMeta: metav1.ObjectMeta{Name: "nodeA", Labels: label1}}, {ObjectMeta: metav1.ObjectMeta{Name: "nodeB", Labels: label2}}, @@ -2030,15 +2196,16 @@ func TestPreFilterStateAddRemovePod(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx := context.Background() // getMeta creates predicate meta data given the list of pods. getState := func(pods []*v1.Pod) (*InterPodAffinity, *framework.CycleState, *preFilterState, *cache.Snapshot) { snapshot := cache.NewSnapshot(pods, test.nodes) - - p := &InterPodAffinity{ - sharedLister: snapshot, + n := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) { + return New(plArgs, fh, feature.Features{}) } + p := plugintesting.SetupPlugin(ctx, t, n, &config.InterPodAffinityArgs{}, snapshot, nil) cycleState := framework.NewCycleState() - preFilterStatus := p.PreFilter(context.Background(), cycleState, test.pendingPod) + preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pendingPod) if !preFilterStatus.IsSuccess() { t.Errorf("prefilter failed with status: %v", preFilterStatus) } @@ -2048,7 +2215,7 @@ func TestPreFilterStateAddRemovePod(t *testing.T) { t.Errorf("failed to get preFilterState from cycleState: %v", err) } - return p, cycleState, state, snapshot + return p.(*InterPodAffinity), cycleState, state, snapshot } // allPodsState is the state produced when all pods, including test.addedPod are given to prefilter. @@ -2061,7 +2228,7 @@ func TestPreFilterStateAddRemovePod(t *testing.T) { // Add test.addedPod to state1 and verify it is equal to allPodsState. 
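The tests above now build the plugin through the new plugintesting.SetupPlugin helper (added in pkg/scheduler/framework/plugins/testing/testing.go, whose body is not part of this excerpt). Judging only from its call sites, it has to seed a fake clientset with the supplied objects so the namespace lister is populated, wrap a framework handle around the snapshot, and start the informers; the following approximation is an assumption, not the helper's actual body:

```go
package testing

import (
	"context"
	"testing"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/informers"
	clientsetfake "k8s.io/client-go/kubernetes/fake"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
)

// setupPluginSketch mirrors what the SetupPlugin call sites above imply.
func setupPluginSketch(ctx context.Context, t *testing.T, pf frameworkruntime.PluginFactory,
	args runtime.Object, sharedLister framework.SharedLister, objs []runtime.Object) framework.Plugin {
	// Seed the fake clientset so Core().V1().Namespaces().Lister() has data.
	informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objs...), 0)
	fh, err := frameworkruntime.NewFramework(nil, nil,
		frameworkruntime.WithSnapshotSharedLister(sharedLister),
		frameworkruntime.WithInformerFactory(informerFactory))
	if err != nil {
		t.Fatalf("creating framework handle: %v", err)
	}
	p, err := pf(args, fh)
	if err != nil {
		t.Fatalf("creating plugin: %v", err)
	}
	informerFactory.Start(ctx.Done())
	informerFactory.WaitForCacheSync(ctx.Done())
	return p
}
```

With the plugin in hand, the test resumes by fetching the added pod's NodeInfo and driving AddPod: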
nodeInfo := mustGetNodeInfo(t, snapshot, test.addedPod.Spec.NodeName) - if err := ipa.AddPod(context.Background(), cycleState, test.pendingPod, framework.NewPodInfo(test.addedPod), nodeInfo); err != nil { + if err := ipa.AddPod(ctx, cycleState, test.pendingPod, framework.NewPodInfo(test.addedPod), nodeInfo); err != nil { t.Errorf("error adding pod to meta: %v", err) } @@ -2070,12 +2237,12 @@ func TestPreFilterStateAddRemovePod(t *testing.T) { t.Errorf("failed to get preFilterState from cycleState: %v", err) } - if !reflect.DeepEqual(newState.topologyToMatchedAntiAffinityTerms, test.expectedAntiAffinity) { - t.Errorf("State is not equal, got: %v, want: %v", newState.topologyToMatchedAntiAffinityTerms, test.expectedAntiAffinity) + if !reflect.DeepEqual(newState.antiAffinityCounts, test.expectedAntiAffinity) { + t.Errorf("State is not equal, got: %v, want: %v", newState.antiAffinityCounts, test.expectedAntiAffinity) } - if !reflect.DeepEqual(newState.topologyToMatchedAffinityTerms, test.expectedAffinity) { - t.Errorf("State is not equal, got: %v, want: %v", newState.topologyToMatchedAffinityTerms, test.expectedAffinity) + if !reflect.DeepEqual(newState.affinityCounts, test.expectedAffinity) { + t.Errorf("State is not equal, got: %v, want: %v", newState.affinityCounts, test.expectedAffinity) } if !reflect.DeepEqual(allPodsState, state) { @@ -2095,15 +2262,15 @@ func TestPreFilterStateAddRemovePod(t *testing.T) { func TestPreFilterStateClone(t *testing.T) { source := &preFilterState{ - topologyToMatchedExistingAntiAffinityTerms: topologyToMatchedTermCount{ + existingAntiAffinityCounts: topologyToMatchedTermCount{ {key: "name", value: "machine1"}: 1, {key: "name", value: "machine2"}: 1, }, - topologyToMatchedAffinityTerms: topologyToMatchedTermCount{ + affinityCounts: topologyToMatchedTermCount{ {key: "name", value: "nodeA"}: 1, {key: "name", value: "nodeC"}: 2, }, - topologyToMatchedAntiAffinityTerms: topologyToMatchedTermCount{ + antiAffinityCounts: topologyToMatchedTermCount{ {key: "name", value: "nodeN"}: 3, {key: "name", value: "nodeM"}: 1, }, @@ -2319,7 +2486,7 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) { t.Run(tt.name, func(t *testing.T) { s := cache.NewSnapshot(tt.existingPods, tt.nodes) l, _ := s.NodeInfos().List() - gotAffinityPodsMap, gotAntiAffinityPodsMap := getTPMapMatchingIncomingAffinityAntiAffinity(framework.NewPodInfo(tt.pod), l) + gotAffinityPodsMap, gotAntiAffinityPodsMap := getIncomingAffinityAntiAffinityCounts(framework.NewPodInfo(tt.pod), l, true) if !reflect.DeepEqual(gotAffinityPodsMap, tt.wantAffinityPodsMap) { t.Errorf("getTPMapMatchingIncomingAffinityAntiAffinity() gotAffinityPodsMap = %#v, want %#v", gotAffinityPodsMap, tt.wantAffinityPodsMap) } diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go b/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go index ff64235a463..9928b260235 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go @@ -19,10 +19,14 @@ package interpodaffinity import ( "fmt" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + listersv1 "k8s.io/client-go/listers/core/v1" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config/validation" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" ) const ( @@ -37,8 +41,10 @@ var _ framework.ScorePlugin = &InterPodAffinity{} // InterPodAffinity is a 
plugin that checks inter pod affinity.
 type InterPodAffinity struct {
-	args         config.InterPodAffinityArgs
-	sharedLister framework.SharedLister
+	args                    config.InterPodAffinityArgs
+	sharedLister            framework.SharedLister
+	nsLister                listersv1.NamespaceLister
+	enableNamespaceSelector bool
 }
 
 // Name returns name of the plugin. It is used in logs, etc.
@@ -47,7 +53,7 @@ func (pl *InterPodAffinity) Name() string {
 }
 
 // New initializes a new plugin and returns it.
-func New(plArgs runtime.Object, h framework.Handle) (framework.Plugin, error) {
+func New(plArgs runtime.Object, h framework.Handle, fts feature.Features) (framework.Plugin, error) {
 	if h.SnapshotSharedLister() == nil {
 		return nil, fmt.Errorf("SnapshotSharedlister is nil")
 	}
@@ -58,10 +64,16 @@ func New(plArgs runtime.Object, h framework.Handle) (framework.Plugin, error) {
 	if err := validation.ValidateInterPodAffinityArgs(args); err != nil {
 		return nil, err
 	}
-	return &InterPodAffinity{
-		args:         args,
-		sharedLister: h.SnapshotSharedLister(),
-	}, nil
+	pl := &InterPodAffinity{
+		args:                    args,
+		sharedLister:            h.SnapshotSharedLister(),
+		enableNamespaceSelector: fts.EnablePodAffinityNamespaceSelector,
+	}
+
+	if pl.enableNamespaceSelector {
+		pl.nsLister = h.SharedInformerFactory().Core().V1().Namespaces().Lister()
+	}
+	return pl, nil
 }
 
 func getArgs(obj runtime.Object) (config.InterPodAffinityArgs, error) {
@@ -71,3 +83,37 @@ func getArgs(obj runtime.Object) (config.InterPodAffinityArgs, error) {
 	}
 	return *ptr, nil
 }
+
+// Updates Namespaces with the set of namespaces identified by NamespaceSelector.
+// If successful, NamespaceSelector is set to labels.Nothing().
+// The assumption is that the term is for an incoming pod, in which case
+// namespaceSelector is either unrolled into Namespaces (and so the selector
+// is set to Nothing()) or is Empty(), which means match everything. Therefore,
+// when matching against this term, there is no need to look up the existing
+// pod's namespace labels to match them against the term's namespaceSelector explicitly.
+func (pl *InterPodAffinity) mergeAffinityTermNamespacesIfNotEmpty(at *framework.AffinityTerm) error {
+	if at.NamespaceSelector.Empty() {
+		return nil
+	}
+	ns, err := pl.nsLister.List(at.NamespaceSelector)
+	if err != nil {
+		return err
+	}
+	for _, n := range ns {
+		at.Namespaces.Insert(n.Name)
+	}
+	at.NamespaceSelector = labels.Nothing()
+	return nil
+}
+
+// GetNamespaceLabelsSnapshot returns a snapshot of the labels associated with
+// the namespace.
+func GetNamespaceLabelsSnapshot(ns string, nsLister listersv1.NamespaceLister) (nsLabels labels.Set) {
+	podNS, err := nsLister.Get(ns)
+	if err == nil {
+		// Create and return snapshot of the labels.
+		return labels.Merge(podNS.Labels, nil)
+	}
+	klog.ErrorS(err, "getting namespace, assuming empty set of namespace labels", "namespace", ns)
+	return
+}
diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go b/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go
index dd05918158f..5cb040f26cd 100644
--- a/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go
+++ b/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go
@@ -22,10 +22,10 @@ import (
 	"math"
 	"sync/atomic"
 
-	v1 "k8s.io/api/core/v1"
+	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
-	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
 )
 
 // preScoreStateKey is the key in CycleState to InterPodAffinity pre-computed data for Scoring.
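To make the unrolling in mergeAffinityTermNamespacesIfNotEmpty concrete, here is an illustrative, self-contained run (not part of the patch) against namespace fixtures mirroring the ones this patch adds to its tests: a NamespaceSelector of team in (team1) expands into the explicit namespace set, after which the selector is disabled with labels.Nothing().

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/client-go/informers"
	clientsetfake "k8s.io/client-go/kubernetes/fake"
)

func main() {
	client := clientsetfake.NewSimpleClientset(
		&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "subteam1.team1", Labels: map[string]string{"team": "team1"}}},
		&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "subteam2.team1", Labels: map[string]string{"team": "team1"}}},
		&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "subteam1.team2", Labels: map[string]string{"team": "team2"}}},
	)
	factory := informers.NewSharedInformerFactory(client, 0)
	nsLister := factory.Core().V1().Namespaces().Lister()
	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	factory.WaitForCacheSync(stop)

	// The term's parsed NamespaceSelector, e.g. team in (team1).
	selector := labels.SelectorFromSet(labels.Set{"team": "team1"})
	namespaces := sets.NewString() // the term's Namespaces set, initially empty
	nsList, _ := nsLister.List(selector)
	for _, ns := range nsList {
		namespaces.Insert(ns.Name)
	}
	// After unrolling, matching relies on the namespace set alone.
	selector = labels.Nothing()
	fmt.Println(namespaces.List())                             // [subteam1.team1 subteam2.team1]
	fmt.Println(selector.Matches(labels.Set{"team": "team1"})) // false: the selector now matches nothing
}
```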
@@ -37,6 +37,8 @@ type scoreMap map[string]map[string]int64
 type preScoreState struct {
 	topologyScore scoreMap
 	podInfo       *framework.PodInfo
+	// A copy of the incoming pod's namespace labels.
+	namespaceLabels labels.Set
 }
 
 // Clone implements the mandatory Clone interface. We don't really copy the data since
@@ -45,30 +47,20 @@ func (s *preScoreState) Clone() framework.StateData {
 	return s
 }
 
-func (m scoreMap) processTerm(
-	term *framework.WeightedAffinityTerm,
-	podToCheck *v1.Pod,
-	fixedNode *v1.Node,
-	multiplier int,
-) {
-	if len(fixedNode.Labels) == 0 {
-		return
-	}
-
-	match := schedutil.PodMatchesTermsNamespaceAndSelector(podToCheck, term.Namespaces, term.Selector)
-	tpValue, tpValueExist := fixedNode.Labels[term.TopologyKey]
-	if match && tpValueExist {
-		if m[term.TopologyKey] == nil {
-			m[term.TopologyKey] = make(map[string]int64)
+func (m scoreMap) processTerm(term *framework.AffinityTerm, weight int32, pod *v1.Pod, nsLabels labels.Set, node *v1.Node, multiplier int32, enableNamespaceSelector bool) {
+	if term.Matches(pod, nsLabels, enableNamespaceSelector) {
+		if tpValue, tpValueExist := node.Labels[term.TopologyKey]; tpValueExist {
+			if m[term.TopologyKey] == nil {
+				m[term.TopologyKey] = make(map[string]int64)
+			}
+			m[term.TopologyKey][tpValue] += int64(weight * multiplier)
 		}
-		m[term.TopologyKey][tpValue] += int64(term.Weight * int32(multiplier))
 	}
-	return
 }
 
-func (m scoreMap) processTerms(terms []framework.WeightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) {
+func (m scoreMap) processTerms(terms []framework.WeightedAffinityTerm, pod *v1.Pod, nsLabels labels.Set, node *v1.Node, multiplier int32, enableNamespaceSelector bool) {
 	for _, term := range terms {
-		m.processTerm(&term, podToCheck, fixedNode, multiplier)
+		m.processTerm(&term.AffinityTerm, term.Weight, pod, nsLabels, node, multiplier, enableNamespaceSelector)
 	}
 }
 
@@ -93,36 +85,42 @@ func (pl *InterPodAffinity) processExistingPod(
 	topoScore scoreMap,
 ) {
 	existingPodNode := existingPodNodeInfo.Node()
+	if len(existingPodNode.Labels) == 0 {
+		return
+	}
 
 	// For every soft pod affinity term of <pod>, if <existingPod> matches the term,
 	// increment <topoScore> for every node in the cluster with the same <term.TopologyKey>
 	// value as that of <existingPod>'s node by the term's weight.
-	topoScore.processTerms(state.podInfo.PreferredAffinityTerms, existingPod.Pod, existingPodNode, 1)
+	// Note that the incoming pod's terms have the namespaceSelector merged into the namespaces, and so
+	// here we don't look up the existing pod's namespace labels, hence passing nil for nsLabels.
+	topoScore.processTerms(state.podInfo.PreferredAffinityTerms, existingPod.Pod, nil, existingPodNode, 1, pl.enableNamespaceSelector)
 
 	// For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
 	// decrement <topoScore> for every node in the cluster with the same <term.TopologyKey>
 	// value as that of <existingPod>'s node by the term's weight.
-	topoScore.processTerms(state.podInfo.PreferredAntiAffinityTerms, existingPod.Pod, existingPodNode, -1)
+	// Note that the incoming pod's terms have the namespaceSelector merged into the namespaces, and so
+	// here we don't look up the existing pod's namespace labels, hence passing nil for nsLabels.
+	topoScore.processTerms(state.podInfo.PreferredAntiAffinityTerms, existingPod.Pod, nil, existingPodNode, -1, pl.enableNamespaceSelector)
 
 	// For every hard pod affinity term of <existingPod>, if <pod> matches the term,
 	// increment <topoScore> for every node in the cluster with the same <term.TopologyKey>
 	// value as that of <existingPod>'s node by the constant <args.HardPodAffinityWeight>.
-	if pl.args.HardPodAffinityWeight > 0 {
-		for _, term := range existingPod.RequiredAffinityTerms {
-			t := framework.WeightedAffinityTerm{AffinityTerm: term, Weight: pl.args.HardPodAffinityWeight}
-			topoScore.processTerm(&t, incomingPod, existingPodNode, 1)
+	if pl.args.HardPodAffinityWeight > 0 && len(existingPodNode.Labels) != 0 {
+		for _, t := range existingPod.RequiredAffinityTerms {
+			topoScore.processTerm(&t, pl.args.HardPodAffinityWeight, incomingPod, state.namespaceLabels, existingPodNode, 1, pl.enableNamespaceSelector)
 		}
 	}
 
 	// For every soft pod affinity term of <existingPod>, if <pod> matches the term,
 	// increment <topoScore> for every node in the cluster with the same <term.TopologyKey>
 	// value as that of <existingPod>'s node by the term's weight.
-	topoScore.processTerms(existingPod.PreferredAffinityTerms, incomingPod, existingPodNode, 1)
+	topoScore.processTerms(existingPod.PreferredAffinityTerms, incomingPod, state.namespaceLabels, existingPodNode, 1, pl.enableNamespaceSelector)
 
 	// For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
 	// decrement <topoScore> for every node in the cluster with the same <term.TopologyKey>
 	// value as that of <existingPod>'s node by the term's weight.
-	topoScore.processTerms(existingPod.PreferredAntiAffinityTerms, incomingPod, existingPodNode, -1)
+	topoScore.processTerms(existingPod.PreferredAntiAffinityTerms, incomingPod, state.namespaceLabels, existingPodNode, -1, pl.enableNamespaceSelector)
 }
 
 // PreScore builds and writes cycle state used by Score and NormalizeScore.
@@ -161,15 +159,28 @@ func (pl *InterPodAffinity) PreScore(
 		}
 	}
 
-	podInfo := framework.NewPodInfo(pod)
-	if podInfo.ParseError != nil {
-		// Ideally we never reach here, because errors will be caught by PreFilter
-		return framework.AsStatus(fmt.Errorf("failed to parse pod: %w", podInfo.ParseError))
-	}
-
 	state := &preScoreState{
 		topologyScore: make(map[string]map[string]int64),
-		podInfo:       podInfo,
+	}
+
+	state.podInfo = framework.NewPodInfo(pod)
+	if state.podInfo.ParseError != nil {
+		// Ideally we never reach here, because errors will be caught by PreFilter
+		return framework.AsStatus(fmt.Errorf("failed to parse pod: %w", state.podInfo.ParseError))
+	}
+
+	if pl.enableNamespaceSelector {
+		for i := range state.podInfo.PreferredAffinityTerms {
+			if err := pl.mergeAffinityTermNamespacesIfNotEmpty(&state.podInfo.PreferredAffinityTerms[i].AffinityTerm); err != nil {
+				return framework.AsStatus(fmt.Errorf("updating PreferredAffinityTerms: %w", err))
+			}
+		}
+		for i := range state.podInfo.PreferredAntiAffinityTerms {
+			if err := pl.mergeAffinityTermNamespacesIfNotEmpty(&state.podInfo.PreferredAntiAffinityTerms[i].AffinityTerm); err != nil {
+				return framework.AsStatus(fmt.Errorf("updating PreferredAntiAffinityTerms: %w", err))
+			}
+		}
+		state.namespaceLabels = GetNamespaceLabelsSnapshot(pod.Namespace, pl.nsLister)
+	}
 
 	topoScores := make([]scoreMap, len(allNodes))
diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/scoring_test.go b/pkg/scheduler/framework/plugins/interpodaffinity/scoring_test.go
index 116e2d33a20..78ef9c584c3 100644
--- a/pkg/scheduler/framework/plugins/interpodaffinity/scoring_test.go
+++ b/pkg/scheduler/framework/plugins/interpodaffinity/scoring_test.go
@@ -22,14 +22,25 @@ import (
 	"strings"
 	"testing"
 
-	v1 "k8s.io/api/core/v1"
+	"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" - "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" + plugintesting "k8s.io/kubernetes/pkg/scheduler/framework/plugins/testing" "k8s.io/kubernetes/pkg/scheduler/internal/cache" ) +var nsLabelT1 = map[string]string{"team": "team1"} +var nsLabelT2 = map[string]string{"team": "team2"} +var namespaces = []runtime.Object{ + &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "subteam1.team1", Labels: nsLabelT1}}, + &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "subteam2.team1", Labels: nsLabelT1}}, + &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "subteam1.team2", Labels: nsLabelT2}}, + &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "subteam2.team2", Labels: nsLabelT2}}, +} + func TestPreferredAffinity(t *testing.T) { labelRgChina := map[string]string{ "region": "China", @@ -253,6 +264,68 @@ func TestPreferredAffinity(t *testing.T) { }, } + affinityNamespaceSelector := &v1.Affinity{ + PodAffinity: &v1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{ + { + Weight: 5, + PodAffinityTerm: v1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "security", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"S1"}, + }, + }, + }, + TopologyKey: "region", + Namespaces: []string{"subteam2.team2"}, + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "team", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"team1"}, + }, + }, + }, + }, + }, + }, + }, + } + antiAffinityNamespaceSelector := &v1.Affinity{ + PodAntiAffinity: &v1.PodAntiAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{ + { + Weight: 5, + PodAffinityTerm: v1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "security", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"S1"}, + }, + }, + }, + TopologyKey: "region", + Namespaces: []string{"subteam2.team2"}, + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "team", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"team1"}, + }, + }, + }, + }, + }, + }, + }, + } invalidAffinityLabels := &v1.Affinity{ PodAffinity: &v1.PodAffinity{ PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{ @@ -297,27 +370,30 @@ func TestPreferredAffinity(t *testing.T) { } tests := []struct { - pod *v1.Pod - pods []*v1.Pod - nodes []*v1.Node - expectedList framework.NodeScoreList - name string - wantStatus *framework.Status + pod *v1.Pod + pods []*v1.Pod + nodes []*v1.Node + expectedList framework.NodeScoreList + name string + wantStatus *framework.Status + disableNSSelector bool }{ { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, + name: "all machines are same priority as Affinity is nil", + pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, nodes: []*v1.Node{ {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}}, {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}}, {ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}}, }, 
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: 0}}, - name: "all machines are same priority as Affinity is nil", }, // the node(machine1) that have the label {"region": "China"} (match the topology key) and that have existing pods that match the labelSelector get high score // the node(machine3) that don't have the label {"region": "whatever the value is"} (mismatch the topology key) but that have existing pods that match the labelSelector get low score // the node(machine2) that have the label {"region": "China"} (match the topology key) but that have existing pods that mismatch the labelSelector get low score { + name: "Affinity: pod that matches topology key & pods in nodes will get high score comparing to others" + + "which doesn't match either pods in nodes or in topology key", pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegion}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, pods: []*v1.Pod{ {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, @@ -330,15 +406,14 @@ func TestPreferredAffinity(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}}, }, expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: 0}}, - name: "Affinity: pod that matches topology key & pods in nodes will get high score comparing to others" + - "which doesn't match either pods in nodes or in topology key", }, // the node1(machine1) that have the label {"region": "China"} (match the topology key) and that have existing pods that match the labelSelector get high score // the node2(machine2) that have the label {"region": "China"}, match the topology key and have the same label value with node1, get the same high score with node1 // the node3(machine3) that have the label {"region": "India"}, match the topology key but have a different label value, don't have existing pods that match the labelSelector, // get a low score. { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegion}}, + name: "All the nodes that have the same topology key & label value with one of them has an existing pod that match the affinity rules, have the same score", + pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegion}}, pods: []*v1.Pod{ {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, }, @@ -348,14 +423,14 @@ func TestPreferredAffinity(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelRgIndia}}, }, expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}}, - name: "All the nodes that have the same topology key & label value with one of them has an existing pod that match the affinity rules, have the same score", }, // there are 2 regions, say regionChina(machine1,machine3,machine4) and regionIndia(machine2,machine5), both regions have nodes that match the preference. // But there are more nodes(actually more existing pods) in regionChina that match the preference than regionIndia. // Then, nodes in regionChina get higher score than nodes in regionIndia, and all the nodes in regionChina should get a same score(high score), // while all the nodes in regionIndia should get another same score(low score). 
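// (Raw counts are normalized afterwards, so every node carrying the best-matching topology value ends up with MaxNodeScore.)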
{ - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS2InRegion}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, + name: "Affinity: nodes in one region has more matching pods comparing to other region, so the region which has more matches will get high score", + pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS2InRegion}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, pods: []*v1.Pod{ {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, @@ -372,11 +447,11 @@ func TestPreferredAffinity(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "machine5", Labels: labelRgIndia}}, }, expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: framework.MaxNodeScore}, {Name: "machine4", Score: framework.MaxNodeScore}, {Name: "machine5", Score: 0}}, - name: "Affinity: nodes in one region has more matching pods comparing to other region, so the region which has more matches will get high score", }, // Test with the different operators and values for pod affinity scheduling preference, including some match failures. { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: affinity3}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, + name: "Affinity: different Label operators and values for pod affinity scheduling preference, including some match failures ", + pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: affinity3}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, pods: []*v1.Pod{ {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, {Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, @@ -388,12 +463,12 @@ func TestPreferredAffinity(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}}, }, expectedList: []framework.NodeScore{{Name: "machine1", Score: 20}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}}, - name: "Affinity: different Label operators and values for pod affinity scheduling preference, including some match failures ", }, // Test the symmetry cases for affinity, the difference between affinity and symmetry is not the pod wants to run together with some existing pods, // but the existing pods have the inter pod affinity preference while the pod to schedule satisfy the preference. 
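// Symmetry covers both preferred terms (weighted directly) and, in a later case, required terms (weighted via HardPodAffinityWeight).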
{ - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, + name: "Affinity symmetry: considered only the preferredDuringSchedulingIgnoredDuringExecution in pod affinity symmetry", + pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, pods: []*v1.Pod{ {Spec: v1.PodSpec{NodeName: "machine1", Affinity: stayWithS1InRegion}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, {Spec: v1.PodSpec{NodeName: "machine2", Affinity: stayWithS2InRegion}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, @@ -404,10 +479,38 @@ func TestPreferredAffinity(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}}, }, expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}}, - name: "Affinity symmetry: considered only the preferredDuringSchedulingIgnoredDuringExecution in pod affinity symmetry", }, { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, + name: "Affinity symmetry with namespace selector", + pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}}, + pods: []*v1.Pod{ + {Spec: v1.PodSpec{NodeName: "machine1", Affinity: affinityNamespaceSelector}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, + {Spec: v1.PodSpec{NodeName: "machine2", Affinity: stayWithS2InRegion}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, + }, + nodes: []*v1.Node{ + {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}}, + {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}}, + {ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}}, + }, + expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: 0}}, + }, + { + name: "AntiAffinity symmetry with namespace selector", + pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}}, + pods: []*v1.Pod{ + {Spec: v1.PodSpec{NodeName: "machine1", Affinity: antiAffinityNamespaceSelector}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, + {Spec: v1.PodSpec{NodeName: "machine2", Affinity: stayWithS2InRegion}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, + }, + nodes: []*v1.Node{ + {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}}, + {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}}, + {ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}}, + }, + expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: framework.MaxNodeScore}}, + }, + { + name: "Affinity symmetry: considered RequiredDuringSchedulingIgnoredDuringExecution in pod affinity symmetry", + pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, pods: []*v1.Pod{ {Spec: v1.PodSpec{NodeName: "machine1", Affinity: hardAffinity}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, {Spec: v1.PodSpec{NodeName: "machine2", Affinity: hardAffinity}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, @@ -418,7 +521,6 @@ func TestPreferredAffinity(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: 
"machine3", Labels: labelAzAz1}}, }, expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}}, - name: "Affinity symmetry: considered RequiredDuringSchedulingIgnoredDuringExecution in pod affinity symmetry", }, // The pod to schedule prefer to stay away from some existing pods at node level using the pod anti affinity. @@ -428,7 +530,8 @@ func TestPreferredAffinity(t *testing.T) { // there are 2 nodes, say node1 and node2, both nodes have pods that match the labelSelector and have topology-key in node.Labels. // But there are more pods on node1 that match the preference than node2. Then, node1 get a lower score than node2. { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: awayFromS1InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, + name: "Anti Affinity: pod that does not match existing pods in node will get high score ", + pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: awayFromS1InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, pods: []*v1.Pod{ {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, {Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, @@ -438,10 +541,10 @@ func TestPreferredAffinity(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgChina}}, }, expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}}, - name: "Anti Affinity: pod that does not match existing pods in node will get high score ", }, { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: awayFromS1InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, + name: "Anti Affinity: pod that does not match topology key & match the pods in nodes will get higher score comparing to others ", + pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: awayFromS1InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, pods: []*v1.Pod{ {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, {Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, @@ -451,10 +554,10 @@ func TestPreferredAffinity(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgChina}}, }, expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}}, - name: "Anti Affinity: pod that does not match topology key & match the pods in nodes will get higher score comparing to others ", }, { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: awayFromS1InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, + name: "Anti Affinity: one node has more matching pods comparing to other node, so the node which has more unmatches will get high score", + pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: awayFromS1InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, pods: []*v1.Pod{ {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, @@ -465,11 +568,11 @@ func TestPreferredAffinity(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}}, }, expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 
framework.MaxNodeScore}}, - name: "Anti Affinity: one node has more matching pods comparing to other node, so the node which has more unmatches will get high score", }, // Test the symmetry cases for anti affinity { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, + name: "Anti Affinity symmetry: the existing pods in node which has anti affinity match will get high score", + pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, pods: []*v1.Pod{ {Spec: v1.PodSpec{NodeName: "machine1", Affinity: awayFromS2InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, {Spec: v1.PodSpec{NodeName: "machine2", Affinity: awayFromS1InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, @@ -479,11 +582,11 @@ func TestPreferredAffinity(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelAzAz2}}, }, expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}}, - name: "Anti Affinity symmetry: the existing pods in node which has anti affinity match will get high score", }, // Test both affinity and anti-affinity { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegionAwayFromS2InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, + name: "Affinity and Anti Affinity: considered only preferredDuringSchedulingIgnoredDuringExecution in both pod affinity & anti affinity", + pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegionAwayFromS2InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, pods: []*v1.Pod{ {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, {Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, @@ -493,14 +596,14 @@ func TestPreferredAffinity(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelAzAz1}}, }, expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}}, - name: "Affinity and Anti Affinity: considered only preferredDuringSchedulingIgnoredDuringExecution in both pod affinity & anti affinity", }, // Combined cases considering both affinity and anti-affinity, the pod to schedule and existing pods have the same labels (they are in the same RC/service), // the pod prefer to run together with its brother pods in the same region, but wants to stay away from them at node level, // so that all the pods of a RC/service can stay in a same region but trying to separate with each other // machine-1,machine-3,machine-4 are in ChinaRegion others machine-2,machine-5 are in IndiaRegion { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegionAwayFromS2InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, + name: "Affinity and Anti Affinity: considering both affinity and anti-affinity, the pod to schedule and existing pods have the same labels", + pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegionAwayFromS2InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, pods: []*v1.Pod{ {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, @@ -518,7 +621,6 @@ func TestPreferredAffinity(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "machine5", 
Labels: labelRgIndia}}, }, expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: framework.MaxNodeScore}, {Name: "machine4", Score: framework.MaxNodeScore}, {Name: "machine5", Score: 0}}, - name: "Affinity and Anti Affinity: considering both affinity and anti-affinity, the pod to schedule and existing pods have the same labels", }, // Consider Affinity, Anti Affinity and symmetry together. // for Affinity, the weights are: 8, 0, 0, 0 @@ -526,7 +628,8 @@ func TestPreferredAffinity(t *testing.T) { // for Affinity symmetry, the weights are: 0, 0, 8, 0 // for Anti Affinity symmetry, the weights are: 0, 0, 0, -5 { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegionAwayFromS2InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, + name: "Affinity and Anti Affinity and symmetry: considered only preferredDuringSchedulingIgnoredDuringExecution in both pod affinity & anti affinity & symmetry", + pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegionAwayFromS2InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, pods: []*v1.Pod{ {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}}, {Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}}, @@ -540,13 +643,13 @@ func TestPreferredAffinity(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "machine4", Labels: labelAzAz2}}, }, expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: framework.MaxNodeScore}, {Name: "machine4", Score: 0}}, - name: "Affinity and Anti Affinity and symmetry: considered only preferredDuringSchedulingIgnoredDuringExecution in both pod affinity & anti affinity & symmetry", }, // Cover https://github.com/kubernetes/kubernetes/issues/82796 which panics upon: // 1. Some nodes in a topology don't have pods with affinity, but other nodes in the same topology have. // 2. The incoming pod doesn't have affinity. 
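// Both nodes should score 0 and, critically, Score/NormalizeScore must complete without panicking.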
        {
-           pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
+           name: "Avoid panic when partial nodes in a topology don't have pods with affinity",
+           pod:  &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
            pods: []*v1.Pod{
                {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
                {Spec: v1.PodSpec{NodeName: "machine2", Affinity: stayWithS1InRegionAwayFromS2InAz}},
@@ -556,7 +659,6 @@ func TestPreferredAffinity(t *testing.T) {
                {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgChina}},
            },
            expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 0}},
-           name:         "Avoid panic when partial nodes in a topology don't have pods with affinity",
        },
        {
            name: "invalid Affinity fails PreScore",
@@ -576,19 +678,79 @@ func TestPreferredAffinity(t *testing.T) {
                {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgChina}},
            },
        },
+       {
+           name: "Affinity with pods matching NamespaceSelector",
+           pod:  &v1.Pod{Spec: v1.PodSpec{Affinity: affinityNamespaceSelector}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
+           pods: []*v1.Pod{
+               {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
+               {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
+               {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team2", Labels: podLabelSecurityS1}},
+               {Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam2.team1", Labels: podLabelSecurityS1}},
+           },
+           nodes: []*v1.Node{
+               {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
+               {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
+           },
+           expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}},
+       },
+       {
+           name: "Affinity with pods matching both NamespaceSelector and Namespaces fields",
+           pod:  &v1.Pod{Spec: v1.PodSpec{Affinity: affinityNamespaceSelector}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
+           pods: []*v1.Pod{
+               {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
+               {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
+               {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam2.team2", Labels: podLabelSecurityS1}},
+               {Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam2.team1", Labels: podLabelSecurityS1}},
+           },
+           nodes: []*v1.Node{
+               {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
+               {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
+           },
+           expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}},
+       },
+       {
+           name: "AntiAffinity with pods matching NamespaceSelector",
+           pod:  &v1.Pod{Spec: v1.PodSpec{Affinity: antiAffinityNamespaceSelector}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
+           pods: []*v1.Pod{
+               {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
+               {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
+               {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team2", Labels: podLabelSecurityS1}},
+               {Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam2.team1", Labels: podLabelSecurityS1}},
+           },
+           nodes: []*v1.Node{
+               {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
+               {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
+           },
+           expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}},
+       },
+       {
+           name: "AntiAffinity with pods matching both NamespaceSelector and Namespaces fields",
+           pod:  &v1.Pod{Spec: v1.PodSpec{Affinity: antiAffinityNamespaceSelector}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
+           pods: []*v1.Pod{
+               {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
+               {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
+               {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam2.team2", Labels: podLabelSecurityS1}},
+               {Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam2.team1", Labels: podLabelSecurityS1}},
+           },
+           nodes: []*v1.Node{
+               {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
+               {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
+           },
+           expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}},
+       },
    }
    for _, test := range tests {
        t.Run(test.name, func(t *testing.T) {
+           ctx := context.Background()
            state := framework.NewCycleState()
            snapshot := cache.NewSnapshot(test.pods, test.nodes)
-           p := &InterPodAffinity{
-               args: config.InterPodAffinityArgs{
-                   HardPodAffinityWeight: 1,
-               },
-               sharedLister: snapshot,
+           n := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
+               return New(plArgs, fh, feature.Features{
+                   EnablePodAffinityNamespaceSelector: !test.disableNSSelector,
+               })
            }
-
-           status := p.PreScore(context.Background(), state, test.pod, test.nodes)
+           p := plugintesting.SetupPlugin(ctx, t, n, &config.InterPodAffinityArgs{HardPodAffinityWeight: 1}, snapshot, namespaces)
+           status := p.(framework.PreScorePlugin).PreScore(ctx, state, test.pod, test.nodes)
            if !status.IsSuccess() {
                if !strings.Contains(status.Message(), test.wantStatus.Message()) {
                    t.Errorf("unexpected error: %v", status)
@@ -597,14 +759,14 @@ func TestPreferredAffinity(t *testing.T) {
            var gotList framework.NodeScoreList
            for _, n := range test.nodes {
                nodeName := n.ObjectMeta.Name
-               score, status := p.Score(context.Background(), state, test.pod, nodeName)
+               score, status := p.(framework.ScorePlugin).Score(ctx, state, test.pod, nodeName)
                if !status.IsSuccess() {
                    t.Errorf("unexpected error: %v", status)
                }
                gotList = append(gotList, framework.NodeScore{Name: nodeName, Score: score})
            }
-           status = p.ScoreExtensions().NormalizeScore(context.Background(), state, test.pod, gotList)
+           status = p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(ctx, state, test.pod, gotList)
            if !status.IsSuccess() {
                t.Errorf("unexpected error: %v", status)
            }
@@ -644,6 +806,16 @@ func
TestPreferredAffinityWithHardPodAffinitySymmetricWeight(t *testing.T) { }, }, }, + Namespaces: []string{"", "subteam2.team2"}, + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "team", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"team1"}, + }, + }, + }, TopologyKey: "region", }, }, @@ -656,9 +828,11 @@ func TestPreferredAffinityWithHardPodAffinitySymmetricWeight(t *testing.T) { hardPodAffinityWeight int32 expectedList framework.NodeScoreList name string + disableNSSelector bool }{ { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelServiceS1}}, + name: "with default weight", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: podLabelServiceS1}}, pods: []*v1.Pod{ {Spec: v1.PodSpec{NodeName: "machine1", Affinity: hardPodAffinity}}, {Spec: v1.PodSpec{NodeName: "machine2", Affinity: hardPodAffinity}}, @@ -670,10 +844,10 @@ func TestPreferredAffinityWithHardPodAffinitySymmetricWeight(t *testing.T) { }, hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight, expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}}, - name: "Hard Pod Affinity symmetry: hard pod affinity symmetry weights 1 by default, then nodes that match the hard pod affinity symmetry rules, get a high score", }, { - pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelServiceS1}}, + name: "with zero weight", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: podLabelServiceS1}}, pods: []*v1.Pod{ {Spec: v1.PodSpec{NodeName: "machine1", Affinity: hardPodAffinity}}, {Spec: v1.PodSpec{NodeName: "machine2", Affinity: hardPodAffinity}}, @@ -685,35 +859,79 @@ func TestPreferredAffinityWithHardPodAffinitySymmetricWeight(t *testing.T) { }, hardPodAffinityWeight: 0, expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: 0}}, - name: "Hard Pod Affinity symmetry: hard pod affinity symmetry is closed(weights 0), then nodes that match the hard pod affinity symmetry rules, get same score with those not match", + }, + { + name: "with no matching namespace", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team2", Labels: podLabelServiceS1}}, + pods: []*v1.Pod{ + {Spec: v1.PodSpec{NodeName: "machine1", Affinity: hardPodAffinity}}, + {Spec: v1.PodSpec{NodeName: "machine2", Affinity: hardPodAffinity}}, + }, + nodes: []*v1.Node{ + {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}}, + {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}}, + {ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}}, + }, + hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight, + expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: 0}}, + }, + { + name: "with matching NamespaceSelector", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelServiceS1}}, + pods: []*v1.Pod{ + {Spec: v1.PodSpec{NodeName: "machine1", Affinity: hardPodAffinity}}, + {Spec: v1.PodSpec{NodeName: "machine2", Affinity: hardPodAffinity}}, + }, + nodes: []*v1.Node{ + {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}}, + {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}}, + {ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: 
labelAzAz1}}, + }, + hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight, + expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}}, + }, + { + name: "with matching Namespaces", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "subteam2.team2", Labels: podLabelServiceS1}}, + pods: []*v1.Pod{ + {Spec: v1.PodSpec{NodeName: "machine1", Affinity: hardPodAffinity}}, + {Spec: v1.PodSpec{NodeName: "machine2", Affinity: hardPodAffinity}}, + }, + nodes: []*v1.Node{ + {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}}, + {ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}}, + {ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}}, + }, + hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight, + expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}}, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx := context.Background() state := framework.NewCycleState() snapshot := cache.NewSnapshot(test.pods, test.nodes) - fh, _ := runtime.NewFramework(nil, nil, nil, runtime.WithSnapshotSharedLister(snapshot)) - - args := &config.InterPodAffinityArgs{HardPodAffinityWeight: test.hardPodAffinityWeight} - p, err := New(args, fh) - if err != nil { - t.Fatal(err) + n := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) { + return New(plArgs, fh, feature.Features{ + EnablePodAffinityNamespaceSelector: !test.disableNSSelector, + }) } - status := p.(framework.PreScorePlugin).PreScore(context.Background(), state, test.pod, test.nodes) + p := plugintesting.SetupPlugin(ctx, t, n, &config.InterPodAffinityArgs{HardPodAffinityWeight: test.hardPodAffinityWeight}, snapshot, namespaces) + status := p.(framework.PreScorePlugin).PreScore(ctx, state, test.pod, test.nodes) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) } var gotList framework.NodeScoreList for _, n := range test.nodes { nodeName := n.ObjectMeta.Name - score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.pod, nodeName) + score, status := p.(framework.ScorePlugin).Score(ctx, state, test.pod, nodeName) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) } gotList = append(gotList, framework.NodeScore{Name: nodeName, Score: score}) } - status = p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(context.Background(), state, test.pod, gotList) + status = p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(ctx, state, test.pod, gotList) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) } diff --git a/pkg/scheduler/framework/plugins/registry.go b/pkg/scheduler/framework/plugins/registry.go index 5083f55eeb7..57d07aa377d 100644 --- a/pkg/scheduler/framework/plugins/registry.go +++ b/pkg/scheduler/framework/plugins/registry.go @@ -17,8 +17,13 @@ limitations under the License. 
package plugins

import (
+   apiruntime "k8s.io/apimachinery/pkg/runtime"
+   utilfeature "k8s.io/apiserver/pkg/util/feature"
+   "k8s.io/kubernetes/pkg/features"
+   "k8s.io/kubernetes/pkg/scheduler/framework"
    "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
    "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption"
+   plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
    "k8s.io/kubernetes/pkg/scheduler/framework/plugins/imagelocality"
    "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
    "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
@@ -44,6 +49,10 @@ import (
// A scheduler that runs out of tree plugins can register additional plugins
// through the WithFrameworkOutOfTreeRegistry option.
func NewInTreeRegistry() runtime.Registry {
+   fts := plfeature.Features{
+       EnablePodAffinityNamespaceSelector: utilfeature.DefaultFeatureGate.Enabled(features.PodAffinityNamespaceSelector),
+   }
+
    return runtime.Registry{
        selectorspread.Name: selectorspread.New,
        imagelocality.Name:  imagelocality.New,
@@ -67,11 +76,13 @@ func NewInTreeRegistry() runtime.Registry {
        nodevolumelimits.GCEPDName:     nodevolumelimits.NewGCEPD,
        nodevolumelimits.AzureDiskName: nodevolumelimits.NewAzureDisk,
        nodevolumelimits.CinderName:    nodevolumelimits.NewCinder,
-       interpodaffinity.Name:          interpodaffinity.New,
-       nodelabel.Name:                 nodelabel.New,
-       serviceaffinity.Name:           serviceaffinity.New,
-       queuesort.Name:                 queuesort.New,
-       defaultbinder.Name:             defaultbinder.New,
-       defaultpreemption.Name:         defaultpreemption.New,
+       interpodaffinity.Name: func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {
+           return interpodaffinity.New(plArgs, fh, fts)
+       },
+       nodelabel.Name:         nodelabel.New,
+       serviceaffinity.Name:   serviceaffinity.New,
+       queuesort.Name:         queuesort.New,
+       defaultbinder.Name:     defaultbinder.New,
+       defaultpreemption.Name: defaultpreemption.New,
    }
}
diff --git a/pkg/scheduler/framework/plugins/testing/testing.go b/pkg/scheduler/framework/plugins/testing/testing.go
new file mode 100644
index 00000000000..1165cd9d9f1
--- /dev/null
+++ b/pkg/scheduler/framework/plugins/testing/testing.go
@@ -0,0 +1,59 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package testing
+
+import (
+   "context"
+   "testing"
+
+   "k8s.io/api/core/v1"
+   metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+   "k8s.io/apimachinery/pkg/runtime"
+   "k8s.io/client-go/informers"
+   "k8s.io/client-go/kubernetes/fake"
+   "k8s.io/kubernetes/pkg/scheduler/framework"
+   frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
+)
+
+// SetupPlugin creates a plugin using a framework handle that includes
+// the provided sharedLister and a SharedInformerFactory with the provided objects.
+// It also creates an empty namespace (since most tests create pods with an
+// empty namespace) and starts the informer factory.
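+// The informer factory is started and cache-synced before the plugin is returned,
+// so listers obtained by the plugin (for example, the namespace lister) are ready to use.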
+func SetupPlugin( + ctx context.Context, + t *testing.T, + pf frameworkruntime.PluginFactory, + config runtime.Object, + sharedLister framework.SharedLister, + objs []runtime.Object, +) framework.Plugin { + objs = append([]runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}, objs...) + informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(objs...), 0) + fh, err := frameworkruntime.NewFramework(nil, nil, nil, + frameworkruntime.WithSnapshotSharedLister(sharedLister), + frameworkruntime.WithInformerFactory(informerFactory)) + if err != nil { + t.Fatalf("Failed creating framework runtime: %v", err) + } + p, err := pf(config, fh) + if err != nil { + t.Fatal(err) + } + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + return p +} diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index 92afd098c98..a29c0791510 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -25,7 +25,7 @@ import ( "sync/atomic" "time" - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -161,12 +161,12 @@ func (pi *PodInfo) Update(pod *v1.Pod) { // Attempt to parse the affinity terms var parseErrs []error - requiredAffinityTerms, err := getAffinityTerms(pod, schedutil.GetPodAffinityTerms(pod.Spec.Affinity)) + requiredAffinityTerms, err := getAffinityTerms(pod, getPodAffinityTerms(pod.Spec.Affinity)) if err != nil { parseErrs = append(parseErrs, fmt.Errorf("requiredAffinityTerms: %w", err)) } requiredAntiAffinityTerms, err := getAffinityTerms(pod, - schedutil.GetPodAntiAffinityTerms(pod.Spec.Affinity)) + getPodAntiAffinityTerms(pod.Spec.Affinity)) if err != nil { parseErrs = append(parseErrs, fmt.Errorf("requiredAntiAffinityTerms: %w", err)) } @@ -189,9 +189,18 @@ func (pi *PodInfo) Update(pod *v1.Pod) { // AffinityTerm is a processed version of v1.PodAffinityTerm. type AffinityTerm struct { - Namespaces sets.String - Selector labels.Selector - TopologyKey string + Namespaces sets.String + Selector labels.Selector + TopologyKey string + NamespaceSelector labels.Selector +} + +// Matches returns true if the pod matches the label selector and namespaces or namespace selector. +func (at *AffinityTerm) Matches(pod *v1.Pod, nsLabels labels.Set, nsSelectorEnabled bool) bool { + if at.Namespaces.Has(pod.Namespace) || (nsSelectorEnabled && at.NamespaceSelector.Matches(nsLabels)) { + return at.Selector.Matches(labels.Set(pod.Labels)) + } + return false } // WeightedAffinityTerm is a "processed" representation of v1.WeightedAffinityTerm. 
@@ -240,12 +249,18 @@ func (f *FitError) Error() string {
}

func newAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm) (*AffinityTerm, error) {
-   namespaces := schedutil.GetNamespacesFromPodAffinityTerm(pod, term)
    selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
    if err != nil {
        return nil, err
    }
-   return &AffinityTerm{Namespaces: namespaces, Selector: selector, TopologyKey: term.TopologyKey}, nil
+
+   namespaces := getNamespacesFromPodAffinityTerm(pod, term)
+   nsSelector, err := metav1.LabelSelectorAsSelector(term.NamespaceSelector)
+   if err != nil {
+       return nil, err
+   }
+
+   return &AffinityTerm{Namespaces: namespaces, Selector: selector, TopologyKey: term.TopologyKey, NamespaceSelector: nsSelector}, nil
}

// getAffinityTerms receives a Pod and affinity terms and returns the namespaces and
@@ -292,6 +307,44 @@ func NewPodInfo(pod *v1.Pod) *PodInfo {
    return pInfo
}

+// getPodAffinityTerms returns the pod's required (hard) affinity terms.
+func getPodAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) {
+   if affinity != nil && affinity.PodAffinity != nil {
+       if len(affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
+           terms = affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
+       }
+       // TODO: Uncomment this block when implementing RequiredDuringSchedulingRequiredDuringExecution.
+       //if len(affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
+       //  terms = append(terms, affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
+       //}
+   }
+   return terms
+}
+
+// getPodAntiAffinityTerms returns the pod's required (hard) anti-affinity terms.
+func getPodAntiAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) {
+   if affinity != nil && affinity.PodAntiAffinity != nil {
+       if len(affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
+           terms = affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
+       }
+       // TODO: Uncomment this block when implementing RequiredDuringSchedulingRequiredDuringExecution.
+       //if len(affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
+       //  terms = append(terms, affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
+       //}
+   }
+   return terms
+}
+
+// getNamespacesFromPodAffinityTerm returns a set of names according to the namespaces
+// indicated in podAffinityTerm. If both Namespaces and NamespaceSelector are unset,
+// the term defaults to the given pod's own namespace.
+func getNamespacesFromPodAffinityTerm(pod *v1.Pod, podAffinityTerm *v1.PodAffinityTerm) sets.String {
+   names := sets.String{}
+   if len(podAffinityTerm.Namespaces) == 0 && podAffinityTerm.NamespaceSelector == nil {
+       names.Insert(pod.Namespace)
+   } else {
+       names.Insert(podAffinityTerm.Namespaces...)
+   }
+   return names
+}
+
// ImageStateSummary provides summarized information about the state of an image.
type ImageStateSummary struct { // Size of the image diff --git a/pkg/scheduler/framework/types_test.go b/pkg/scheduler/framework/types_test.go index 5518d8ffae3..cba369a03ff 100644 --- a/pkg/scheduler/framework/types_test.go +++ b/pkg/scheduler/framework/types_test.go @@ -22,10 +22,12 @@ import ( "strings" "testing" + "github.com/google/go-cmp/cmp" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" ) func TestNewResource(t *testing.T) { @@ -1303,3 +1305,45 @@ func TestHostPortInfo_Check(t *testing.T) { }) } } + +func TestGetNamespacesFromPodAffinityTerm(t *testing.T) { + tests := []struct { + name string + term *v1.PodAffinityTerm + want sets.String + }{ + { + name: "podAffinityTerm_namespace_empty", + term: &v1.PodAffinityTerm{}, + want: sets.String{metav1.NamespaceDefault: sets.Empty{}}, + }, + { + name: "podAffinityTerm_namespace_not_empty", + term: &v1.PodAffinityTerm{ + Namespaces: []string{metav1.NamespacePublic, metav1.NamespaceSystem}, + }, + want: sets.NewString(metav1.NamespacePublic, metav1.NamespaceSystem), + }, + { + name: "podAffinityTerm_namespace_selector_not_nil", + term: &v1.PodAffinityTerm{ + NamespaceSelector: &metav1.LabelSelector{}, + }, + want: sets.String{}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := getNamespacesFromPodAffinityTerm(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "topologies_pod", + Namespace: metav1.NamespaceDefault, + }, + }, test.term) + if diff := cmp.Diff(test.want, got); diff != "" { + t.Errorf("unexpected diff (-want, +got):\n%s", diff) + } + }) + } +} diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index fb0c0b57fb0..725a05b27f5 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -29,15 +29,19 @@ import ( "sync" "time" - "k8s.io/klog/v2" - - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" ktypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/informers" + listersv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity" "k8s.io/kubernetes/pkg/scheduler/internal/heap" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" @@ -95,8 +99,11 @@ type SchedulingQueue interface { } // NewSchedulingQueue initializes a priority queue as a new scheduling queue. -func NewSchedulingQueue(lessFn framework.LessFunc, opts ...Option) SchedulingQueue { - return NewPriorityQueue(lessFn, opts...) +func NewSchedulingQueue( + lessFn framework.LessFunc, + informerFactory informers.SharedInformerFactory, + opts ...Option) SchedulingQueue { + return NewPriorityQueue(lessFn, informerFactory, opts...) } // NominatedNodeName returns nominated node name of a Pod. @@ -148,6 +155,8 @@ type PriorityQueue struct { // closed indicates that the queue is closed. // It is mainly used to let Pop() exit its control loop while waiting for an item. 
closed bool + + nsLister listersv1.NamespaceLister } type priorityQueueOptions struct { @@ -218,6 +227,7 @@ func newQueuedPodInfoForLookup(pod *v1.Pod, plugins ...string) *framework.Queued // NewPriorityQueue creates a PriorityQueue object. func NewPriorityQueue( lessFn framework.LessFunc, + informerFactory informers.SharedInformerFactory, opts ...Option, ) *PriorityQueue { options := defaultPriorityQueueOptions @@ -248,6 +258,9 @@ func NewPriorityQueue( } pq.cond.L = &pq.lock pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder()) + if utilfeature.DefaultFeatureGate.Enabled(features.PodAffinityNamespaceSelector) { + pq.nsLister = informerFactory.Core().V1().Namespaces().Lister() + } return pq } @@ -559,17 +572,15 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework. // any affinity term that matches "pod". // NOTE: this function assumes lock has been acquired in caller. func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.QueuedPodInfo { + nsSelectorEnabled := p.nsLister != nil + var nsLabels labels.Set + if nsSelectorEnabled { + nsLabels = interpodaffinity.GetNamespaceLabelsSnapshot(pod.Namespace, p.nsLister) + } var podsToMove []*framework.QueuedPodInfo for _, pInfo := range p.unschedulableQ.podInfoMap { - up := pInfo.Pod - terms := util.GetPodAffinityTerms(up.Spec.Affinity) - for _, term := range terms { - namespaces := util.GetNamespacesFromPodAffinityTerm(up, &term) - selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) - if err != nil { - klog.ErrorS(err, "Error getting label selectors for pod", "pod", klog.KObj(up)) - } - if util.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) { + for _, term := range pInfo.RequiredAffinityTerms { + if term.Matches(pod, nsLabels, nsSelectorEnabled) { podsToMove = append(podsToMove, pInfo) break } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 156786c0f91..078e72856d8 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -17,6 +17,7 @@ limitations under the License. 
package queue import ( + "context" "fmt" "reflect" "strings" @@ -24,7 +25,7 @@ import ( "testing" "time" - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ktypes "k8s.io/apimachinery/pkg/types" @@ -117,7 +118,7 @@ func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod { } func TestPriorityQueue_Add(t *testing.T) { - q := NewPriorityQueue(newDefaultQueueSort()) + q := NewTestQueue(context.Background(), newDefaultQueueSort()) if err := q.Add(medPriorityPodInfo.Pod); err != nil { t.Errorf("add failed: %v", err) } @@ -159,7 +160,7 @@ func newDefaultQueueSort() framework.LessFunc { } func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { - q := NewPriorityQueue(newDefaultQueueSort()) + q := NewTestQueue(context.Background(), newDefaultQueueSort()) if err := q.Add(medPriorityPodInfo.Pod); err != nil { t.Errorf("add failed: %v", err) } @@ -175,7 +176,7 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { } func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { - q := NewPriorityQueue(newDefaultQueueSort()) + q := NewTestQueue(context.Background(), newDefaultQueueSort()) q.Add(highPriNominatedPodInfo.Pod) q.AddUnschedulableIfNotPresent(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod), q.SchedulingCycle()) // Must not add anything. q.AddUnschedulableIfNotPresent(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod), q.SchedulingCycle()) @@ -207,7 +208,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { // Pods in and before current scheduling cycle will be put back to activeQueue // if we were trying to schedule them when we received move request. func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { - q := NewPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(time.Now()))) + q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(clock.NewFakeClock(time.Now()))) totalNum := 10 expectedPods := make([]v1.Pod, 0, totalNum) for i := 0; i < totalNum; i++ { @@ -272,7 +273,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { } func TestPriorityQueue_Pop(t *testing.T) { - q := NewPriorityQueue(newDefaultQueueSort()) + q := NewTestQueue(context.Background(), newDefaultQueueSort()) wg := sync.WaitGroup{} wg.Add(1) go func() { @@ -289,7 +290,7 @@ func TestPriorityQueue_Pop(t *testing.T) { } func TestPriorityQueue_Update(t *testing.T) { - q := NewPriorityQueue(newDefaultQueueSort()) + q := NewTestQueue(context.Background(), newDefaultQueueSort()) q.Update(nil, highPriorityPodInfo.Pod) if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriorityPodInfo.Pod)); !exists { t.Errorf("Expected %v to be added to activeQ.", highPriorityPodInfo.Pod.Name) @@ -337,7 +338,7 @@ func TestPriorityQueue_Update(t *testing.T) { } func TestPriorityQueue_Delete(t *testing.T) { - q := NewPriorityQueue(newDefaultQueueSort()) + q := NewTestQueue(context.Background(), newDefaultQueueSort()) q.Update(highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod) q.Add(unschedulablePodInfo.Pod) if err := q.Delete(highPriNominatedPodInfo.Pod); err != nil { @@ -413,7 +414,7 @@ func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) { } } - q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) + q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) // Init pods in unschedulableQ. 
for j := 0; j < podsInUnschedulableQ; j++ { @@ -459,7 +460,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { m := map[framework.ClusterEvent]sets.String{ {Resource: framework.Node, ActionType: framework.Add}: sets.NewString("fooPlugin"), } - q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) + q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) q.Add(medPriorityPodInfo.Pod) q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle()) q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle()) @@ -548,7 +549,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { m := map[framework.ClusterEvent]sets.String{ {Resource: framework.Pod, ActionType: framework.Add}: sets.NewString("fakePlugin"), } - q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) + q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) q.Add(medPriorityPodInfo.Pod) // Add a couple of pods to the unschedulableQ. q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fakePlugin"), q.SchedulingCycle()) @@ -572,7 +573,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { } func TestPriorityQueue_NominatedPodsForNode(t *testing.T) { - q := NewPriorityQueue(newDefaultQueueSort()) + q := NewTestQueue(context.Background(), newDefaultQueueSort()) q.Add(medPriorityPodInfo.Pod) q.Add(unschedulablePodInfo.Pod) q.Add(highPriorityPodInfo.Pod) @@ -597,7 +598,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) { return pendingSet } - q := NewPriorityQueue(newDefaultQueueSort()) + q := NewTestQueue(context.Background(), newDefaultQueueSort()) q.Add(medPriorityPodInfo.Pod) q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(unschedulablePodInfo.Pod), q.SchedulingCycle()) q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(highPriorityPodInfo.Pod), q.SchedulingCycle()) @@ -614,7 +615,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) { } func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { - q := NewPriorityQueue(newDefaultQueueSort()) + q := NewTestQueue(context.Background(), newDefaultQueueSort()) if err := q.Add(medPriorityPodInfo.Pod); err != nil { t.Errorf("add failed: %v", err) } @@ -683,7 +684,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { } func TestPriorityQueue_NewWithOptions(t *testing.T) { - q := NewPriorityQueue( + q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithPodInitialBackoffDuration(2*time.Second), WithPodMaxBackoffDuration(20*time.Second), @@ -849,7 +850,7 @@ func TestUnschedulablePodsMap(t *testing.T) { } func TestSchedulingQueue_Close(t *testing.T) { - q := NewPriorityQueue(newDefaultQueueSort()) + q := NewTestQueue(context.Background(), newDefaultQueueSort()) wantErr := fmt.Errorf(queueClosed) wg := sync.WaitGroup{} wg.Add(1) @@ -873,7 +874,7 @@ func TestSchedulingQueue_Close(t *testing.T) { // are frequent events that move pods to the active queue. func TestRecentlyTriedPodsGoBack(t *testing.T) { c := clock.NewFakeClock(time.Now()) - q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c)) + q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c)) // Add a few pods to priority queue. 
for i := 0; i < 5; i++ { p := v1.Pod{ @@ -930,7 +931,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) { // are frequent events that move pods to the active queue. func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { c := clock.NewFakeClock(time.Now()) - q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c)) + q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c)) // Add an unschedulable pod to a priority queue. // This makes a situation that the pod was tried to schedule @@ -1021,7 +1022,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { // TestHighPriorityBackoff tests that a high priority pod does not block // other pods if it is unschedulable func TestHighPriorityBackoff(t *testing.T) { - q := NewPriorityQueue(newDefaultQueueSort()) + q := NewTestQueue(context.Background(), newDefaultQueueSort()) midPod := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -1088,7 +1089,7 @@ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) { m := map[framework.ClusterEvent]sets.String{ {Resource: framework.Node, ActionType: framework.Add}: sets.NewString("fakePlugin"), } - q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) + q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) midPod := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "test-midpod", @@ -1272,7 +1273,7 @@ func TestPodTimestamp(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - queue := NewPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp))) + queue := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp))) var podInfoList []*framework.QueuedPodInfo for i, op := range test.operations { @@ -1429,7 +1430,7 @@ scheduler_pending_pods{queue="unschedulable"} 0 for _, test := range tests { t.Run(test.name, func(t *testing.T) { resetMetrics() - queue := NewPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp))) + queue := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp))) for i, op := range test.operations { for _, pInfo := range test.operands[i] { op(queue, pInfo) @@ -1458,7 +1459,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) { // Case 1: A pod is created and scheduled after 1 attempt. The queue operations are // Add -> Pop. c := clock.NewFakeClock(timestamp) - queue := NewPriorityQueue(newDefaultQueueSort(), WithClock(c)) + queue := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c)) queue.Add(pod) pInfo, err := queue.Pop() if err != nil { @@ -1469,7 +1470,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) { // Case 2: A pod is created and scheduled after 2 attempts. The queue operations are // Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Pop. c = clock.NewFakeClock(timestamp) - queue = NewPriorityQueue(newDefaultQueueSort(), WithClock(c)) + queue = NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c)) queue.Add(pod) pInfo, err = queue.Pop() if err != nil { @@ -1489,7 +1490,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) { // Case 3: Similar to case 2, but before the second pop, call update, the queue operations are // Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Update -> Pop. 
c = clock.NewFakeClock(timestamp) - queue = NewPriorityQueue(newDefaultQueueSort(), WithClock(c)) + queue = NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c)) queue.Add(pod) pInfo, err = queue.Pop() if err != nil { @@ -1587,7 +1588,7 @@ func TestIncomingPodsMetrics(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { metrics.SchedulerQueueIncomingPods.Reset() - queue := NewPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp))) + queue := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp))) for _, op := range test.operations { for _, pInfo := range pInfos { op(queue, pInfo) @@ -1613,7 +1614,7 @@ func checkPerPodSchedulingMetrics(name string, t *testing.T, pInfo *framework.Qu func TestBackOffFlow(t *testing.T) { cl := clock.NewFakeClock(time.Now()) - q := NewPriorityQueue(newDefaultQueueSort(), WithClock(cl)) + q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(cl)) steps := []struct { wantBackoff time.Duration }{ @@ -1772,7 +1773,7 @@ func TestPodMatchesEvent(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - q := NewPriorityQueue(newDefaultQueueSort()) + q := NewTestQueue(context.Background(), newDefaultQueueSort()) q.clusterEventMap = tt.clusterEventMap if got := q.podMatchesEvent(tt.podInfo, tt.event); got != tt.want { t.Errorf("Want %v, but got %v", tt.want, got) diff --git a/pkg/scheduler/internal/queue/testing.go b/pkg/scheduler/internal/queue/testing.go new file mode 100644 index 00000000000..7a11bc45f1c --- /dev/null +++ b/pkg/scheduler/internal/queue/testing.go @@ -0,0 +1,46 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "context" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/kubernetes/pkg/scheduler/framework" +) + +// NewTestQueueWithObjects creates a priority queue with an informer factory +// populated with the provided objects. +func NewTestQueueWithObjects( + ctx context.Context, + lessFn framework.LessFunc, + objs []runtime.Object, + opts ...Option, +) *PriorityQueue { + informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(objs...), 0) + pq := NewPriorityQueue(lessFn, informerFactory, opts...) + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + return pq +} + +// NewTestQueue creates a priority queue with an empty informer factory. +func NewTestQueue(ctx context.Context, lessFn framework.LessFunc, opts ...Option) *PriorityQueue { + return NewTestQueueWithObjects(ctx, lessFn, nil, opts...) +} diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 06acf9e4b12..c39ad061558 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -448,7 +448,9 @@ func TestSchedulerMultipleProfilesScheduling(t *testing.T) { // Set up scheduler for the 3 nodes. 
 	// We use a fake filter that only allows one particular node. We create two
 	// profiles, each with a different node in the filter configuration.
-	client := clientsetfake.NewSimpleClientset(nodes...)
+	objs := append([]runtime.Object{
+		&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}, nodes...)
+	client := clientsetfake.NewSimpleClientset(objs...)
 	broadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
diff --git a/pkg/scheduler/util/topologies.go b/pkg/scheduler/util/topologies.go
deleted file mode 100644
index bf5ee53ac01..00000000000
--- a/pkg/scheduler/util/topologies.go
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
-Copyright 2016 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package util
-
-import (
-	"k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/util/sets"
-)
-
-// GetNamespacesFromPodAffinityTerm returns a set of names
-// according to the namespaces indicated in podAffinityTerm.
-// If namespaces is empty it considers the given pod's namespace.
-func GetNamespacesFromPodAffinityTerm(pod *v1.Pod, podAffinityTerm *v1.PodAffinityTerm) sets.String {
-	names := sets.String{}
-	if len(podAffinityTerm.Namespaces) == 0 {
-		names.Insert(pod.Namespace)
-	} else {
-		names.Insert(podAffinityTerm.Namespaces...)
-	}
-	return names
-}
-
-// PodMatchesTermsNamespaceAndSelector returns true if the given <pod>
-// matches the namespace and selector defined by <namespaces> and <selector>.
-func PodMatchesTermsNamespaceAndSelector(pod *v1.Pod, namespaces sets.String, selector labels.Selector) bool {
-	if !namespaces.Has(pod.Namespace) {
-		return false
-	}
-
-	if !selector.Matches(labels.Set(pod.Labels)) {
-		return false
-	}
-	return true
-}
-
-// NodesHaveSameTopologyKey checks if nodeA and nodeB have same label value with given topologyKey as label key.
-// Returns false if topologyKey is empty.
-func NodesHaveSameTopologyKey(nodeA, nodeB *v1.Node, topologyKey string) bool {
-	if len(topologyKey) == 0 {
-		return false
-	}
-
-	if nodeA.Labels == nil || nodeB.Labels == nil {
-		return false
-	}
-
-	nodeALabel, okA := nodeA.Labels[topologyKey]
-	nodeBLabel, okB := nodeB.Labels[topologyKey]
-
-	// If found label in both nodes, check the label
-	if okB && okA {
-		return nodeALabel == nodeBLabel
-	}
-
-	return false
-}
-
-// Topologies contains topologies information of nodes.
-type Topologies struct {
-	DefaultKeys []string
-}
-
-// NodesHaveSameTopologyKey checks if nodeA and nodeB have same label value with given topologyKey as label key.
-func (tps *Topologies) NodesHaveSameTopologyKey(nodeA, nodeB *v1.Node, topologyKey string) bool {
-	return NodesHaveSameTopologyKey(nodeA, nodeB, topologyKey)
-}
diff --git a/pkg/scheduler/util/topologies_test.go b/pkg/scheduler/util/topologies_test.go
deleted file mode 100644
index 1399bdacc03..00000000000
--- a/pkg/scheduler/util/topologies_test.go
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
-Copyright 2017 The Kubernetes Authors.
- -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/selection" - "k8s.io/apimachinery/pkg/util/sets" -) - -func fakePod() *v1.Pod { - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "topologies_pod", - Namespace: metav1.NamespaceDefault, - UID: "551f5a43-9f2f-11e7-a589-fa163e148d75", - }, - } -} - -func TestGetNamespacesFromPodAffinityTerm(t *testing.T) { - tests := []struct { - name string - podAffinityTerm *v1.PodAffinityTerm - expectedValue sets.String - }{ - { - "podAffinityTerm_namespace_empty", - &v1.PodAffinityTerm{}, - sets.String{metav1.NamespaceDefault: sets.Empty{}}, - }, - { - "podAffinityTerm_namespace_not_empty", - &v1.PodAffinityTerm{ - Namespaces: []string{metav1.NamespacePublic, metav1.NamespaceSystem}, - }, - sets.String{metav1.NamespacePublic: sets.Empty{}, metav1.NamespaceSystem: sets.Empty{}}, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - realValue := GetNamespacesFromPodAffinityTerm(fakePod(), test.podAffinityTerm) - assert.EqualValuesf(t, test.expectedValue, realValue, "Failed to test: %s", test.name) - }) - } -} - -func TestPodMatchesTermsNamespaceAndSelector(t *testing.T) { - fakeNamespaces := sets.String{metav1.NamespacePublic: sets.Empty{}, metav1.NamespaceSystem: sets.Empty{}} - fakeRequirement, _ := labels.NewRequirement("service", selection.In, []string{"topologies_service1", "topologies_service2"}) - fakeSelector := labels.NewSelector().Add(*fakeRequirement) - - tests := []struct { - name string - podNamespaces string - podLabels map[string]string - expectedResult bool - }{ - { - "namespace_not_in", - metav1.NamespaceDefault, - map[string]string{"service": "topologies_service1"}, - false, - }, - { - "label_not_match", - metav1.NamespacePublic, - map[string]string{"service": "topologies_service3"}, - false, - }, - { - "normal_case", - metav1.NamespacePublic, - map[string]string{"service": "topologies_service1"}, - true, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - fakeTestPod := fakePod() - fakeTestPod.Namespace = test.podNamespaces - fakeTestPod.Labels = test.podLabels - - realValue := PodMatchesTermsNamespaceAndSelector(fakeTestPod, fakeNamespaces, fakeSelector) - assert.EqualValuesf(t, test.expectedResult, realValue, "Failed to test: %s", test.name) - }) - } - -} - -func TestNodesHaveSameTopologyKey(t *testing.T) { - tests := []struct { - name string - nodeA, nodeB *v1.Node - topologyKey string - expected bool - }{ - { - name: "nodeA{'a':'a'} vs. empty label in nodeB", - nodeA: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "a": "a", - }, - }, - }, - nodeB: &v1.Node{}, - expected: false, - topologyKey: "a", - }, - { - name: "nodeA{'a':'a'} vs. 
nodeB{'a':'a'}", - nodeA: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "a": "a", - }, - }, - }, - nodeB: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "a": "a", - }, - }, - }, - expected: true, - topologyKey: "a", - }, - { - name: "nodeA{'a':''} vs. empty label in nodeB", - nodeA: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "a": "", - }, - }, - }, - nodeB: &v1.Node{}, - expected: false, - topologyKey: "a", - }, - { - name: "nodeA{'a':''} vs. nodeB{'a':''}", - nodeA: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "a": "", - }, - }, - }, - nodeB: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "a": "", - }, - }, - }, - expected: true, - topologyKey: "a", - }, - { - name: "nodeA{'a':'a'} vs. nodeB{'a':'a'} by key{'b'}", - nodeA: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "a": "a", - }, - }, - }, - nodeB: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "a": "a", - }, - }, - }, - expected: false, - topologyKey: "b", - }, - { - name: "topologyKey empty", - nodeA: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "a": "", - }, - }, - }, - nodeB: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "a": "", - }, - }, - }, - expected: false, - topologyKey: "", - }, - { - name: "nodeA label nil vs. nodeB{'a':''} by key('a')", - nodeA: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{}, - }, - nodeB: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "a": "", - }, - }, - }, - expected: false, - topologyKey: "a", - }, - { - name: "nodeA{'a':''} vs. nodeB label is nil by key('a')", - nodeA: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "a": "", - }, - }, - }, - nodeB: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{}, - }, - expected: false, - topologyKey: "a", - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - got := NodesHaveSameTopologyKey(test.nodeA, test.nodeB, test.topologyKey) - assert.Equalf(t, test.expected, got, "Failed to test: %s", test.name) - }) - } -} diff --git a/pkg/scheduler/util/utils.go b/pkg/scheduler/util/utils.go index 93ed14f3954..6836a2150b4 100644 --- a/pkg/scheduler/util/utils.go +++ b/pkg/scheduler/util/utils.go @@ -90,34 +90,6 @@ func MoreImportantPod(pod1, pod2 *v1.Pod) bool { return GetPodStartTime(pod1).Before(GetPodStartTime(pod2)) } -// GetPodAffinityTerms gets pod affinity terms by a pod affinity object. -func GetPodAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) { - if affinity != nil && affinity.PodAffinity != nil { - if len(affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 { - terms = affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution - } - // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution. - //if len(affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 { - // terms = append(terms, affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...) - //} - } - return terms -} - -// GetPodAntiAffinityTerms gets pod affinity terms by a pod anti-affinity. 
-func GetPodAntiAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) {
-	if affinity != nil && affinity.PodAntiAffinity != nil {
-		if len(affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
-			terms = affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
-		}
-		// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
-		//if len(affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
-		//	terms = append(terms, affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
-		//}
-	}
-	return terms
-}
-
 // PatchPod calculates the delta bytes change from <old> to <new>,
 // and then submit a request to API server to patch the pod changes.
 func PatchPod(cs kubernetes.Interface, old *v1.Pod, new *v1.Pod) error {
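
Note for reviewers (not part of the patch): the standalone sketch below illustrates
the namespace-matching semantics that NamespaceSelector introduces, replacing the
deleted GetNamespacesFromPodAffinityTerm/PodMatchesTermsNamespaceAndSelector helpers.
It is a simplified approximation; the actual logic lives in the updated AffinityTerm
handling in pkg/scheduler/framework/types.go, and the helper name termMatches plus
the example namespace names and labels here are hypothetical.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/sets"
)

// termMatches reports whether an existing pod falls in the scope of a pod
// affinity term, given the labels of that pod's namespace (which the
// scheduler resolves through the namespace informer wired up in this patch).
func termMatches(term *v1.PodAffinityTerm, selfNamespace string, pod *v1.Pod, nsLabels labels.Set) (bool, error) {
	// Explicitly listed namespaces; when both the list and the selector are
	// empty, the term defaults to the incoming pod's own namespace, exactly
	// as before this feature.
	namespaces := sets.NewString(term.Namespaces...)
	if len(term.Namespaces) == 0 && term.NamespaceSelector == nil {
		namespaces.Insert(selfNamespace)
	}

	// NamespaceSelector is OR'ed with the explicit list: the existing pod is
	// in scope if its namespace is listed, or if its namespace's labels match
	// the selector.
	inScope := namespaces.Has(pod.Namespace)
	if !inScope && term.NamespaceSelector != nil {
		nsSelector, err := metav1.LabelSelectorAsSelector(term.NamespaceSelector)
		if err != nil {
			return false, err
		}
		inScope = nsSelector.Matches(nsLabels)
	}
	if !inScope {
		return false, nil
	}

	// Finally, the pod's own labels must match the term's label selector.
	podSelector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
	if err != nil {
		return false, err
	}
	return podSelector.Matches(labels.Set(pod.Labels)), nil
}

func main() {
	term := &v1.PodAffinityTerm{
		TopologyKey:       v1.LabelHostname,
		LabelSelector:     &metav1.LabelSelector{MatchLabels: map[string]string{"app": "db"}},
		NamespaceSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"team": "storage"}},
	}
	existing := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
		Namespace: "storage-prod",
		Labels:    map[string]string{"app": "db"},
	}}
	// "storage-prod" carries team=storage, so it matches the NamespaceSelector
	// even though it is not listed in term.Namespaces.
	ok, _ := termMatches(term, "default", existing, labels.Set{"team": "storage"})
	fmt.Println(ok) // true
}

On the testing side, queue tests that need namespace labels can seed them through
the NewTestQueueWithObjects helper added in testing.go above, e.g. (hypothetical
namespace object):

	q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{
		&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "storage-prod", Labels: map[string]string{"team": "storage"}}},
	})

so the queue's informer factory can resolve namespace labels the same way the
scheduler does in production.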