Merge pull request #98446 from ahg-g/ahg-nss

Implements NamespaceSelector for pod affinity
This commit is contained in:
Kubernetes Prow Robot 2021-03-04 07:44:32 -08:00 committed by GitHub
commit 3a7d16db72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1051 additions and 710 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package scheduler
import (
"context"
"reflect"
"testing"
"time"
@ -417,10 +418,10 @@ func TestUpdatePodInCache(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
schedulerCache := cache.New(ttl, stopCh)
schedulerQueue := queue.NewPriorityQueue(nil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
schedulerCache := cache.New(ttl, ctx.Done())
schedulerQueue := queue.NewTestQueue(ctx, nil)
sched := &Scheduler{
SchedulerCache: schedulerCache,
SchedulingQueue: schedulerQueue,

View File

@ -153,6 +153,7 @@ func (c *Configurator) create() (*Scheduler, error) {
lessFn := profiles[c.profiles[0].SchedulerName].QueueSortFunc()
podQueue := internalqueue.NewSchedulingQueue(
lessFn,
c.informerFactory,
internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
internalqueue.WithPodNominator(nominator),

View File

@ -452,7 +452,7 @@ func TestDefaultErrorFunc(t *testing.T) {
// Need to add/update/delete testPod to the store.
podInformer.Informer().GetStore().Add(testPod)
queue := internalqueue.NewPriorityQueue(nil, internalqueue.WithClock(clock.NewFakeClock(time.Now())))
queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(clock.NewFakeClock(time.Now())))
schedulerCache := internalcache.New(30*time.Second, stopCh)
queue.Add(testPod)
@ -526,7 +526,7 @@ func TestDefaultErrorFunc_NodeNotFound(t *testing.T) {
// Need to add testPod to the store.
podInformer.Informer().GetStore().Add(testPod)
queue := internalqueue.NewPriorityQueue(nil, internalqueue.WithClock(clock.NewFakeClock(time.Now())))
queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(clock.NewFakeClock(time.Now())))
schedulerCache := internalcache.New(30*time.Second, stopCh)
for i := range tt.nodes {
@ -567,7 +567,7 @@ func TestDefaultErrorFunc_PodAlreadyBound(t *testing.T) {
// Need to add testPod to the store.
podInformer.Informer().GetStore().Add(testPod)
queue := internalqueue.NewPriorityQueue(nil, internalqueue.WithClock(clock.NewFakeClock(time.Now())))
queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(clock.NewFakeClock(time.Now())))
schedulerCache := internalcache.New(30*time.Second, stopCh)
// Add node to schedulerCache no matter it's deleted in API server or not.

View File

@ -42,6 +42,7 @@ import (
configv1beta1 "k8s.io/kubernetes/pkg/scheduler/apis/config/v1beta1"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
@ -530,7 +531,9 @@ func TestDryRunPreemption(t *testing.T) {
name: "pod with anti-affinity is preempted",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterPluginAsExtensions(noderesources.FitName, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterPluginAsExtensions(interpodaffinity.Name, interpodaffinity.New, "Filter", "PreFilter"),
st.RegisterPluginAsExtensions(interpodaffinity.Name, func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return interpodaffinity.New(plArgs, fh, feature.Features{})
}, "Filter", "PreFilter"),
},
nodeNames: []string{"node1", "node2"},
testPods: []*v1.Pod{
@ -991,7 +994,8 @@ func TestDryRunPreemption(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
)
registeredPlugins = append(registeredPlugins, tt.registerPlugins...)
informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0)
objs := []runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}
informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objs...), 0)
fwk, err := st.NewFramework(
registeredPlugins,
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
@ -1002,6 +1006,11 @@ func TestDryRunPreemption(t *testing.T) {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
nodeInfos, err := snapshot.NodeInfos().List()
if err != nil {
t.Fatal(err)

View File

@ -0,0 +1,24 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package feature
// Features carries feature gate values used by various plugins.
// This struct allows us to break the dependency of the plugins on
// the internal k8s features pkg.
type Features struct {
EnablePodAffinityNamespaceSelector bool
}

View File

@ -21,11 +21,11 @@ import (
"fmt"
"sync/atomic"
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)
const (
@ -46,13 +46,16 @@ const (
// preFilterState computed at PreFilter and used at Filter.
type preFilterState struct {
// A map of topology pairs to the number of existing pods that has anti-affinity terms that match the "pod".
topologyToMatchedExistingAntiAffinityTerms topologyToMatchedTermCount
existingAntiAffinityCounts topologyToMatchedTermCount
// A map of topology pairs to the number of existing pods that match the affinity terms of the "pod".
topologyToMatchedAffinityTerms topologyToMatchedTermCount
affinityCounts topologyToMatchedTermCount
// A map of topology pairs to the number of existing pods that match the anti-affinity terms of the "pod".
topologyToMatchedAntiAffinityTerms topologyToMatchedTermCount
antiAffinityCounts topologyToMatchedTermCount
// podInfo of the incoming pod.
podInfo *framework.PodInfo
// A copy of the incoming pod's namespace labels.
namespaceLabels labels.Set
enableNamespaceSelector bool
}
// Clone the prefilter state.
@ -62,27 +65,27 @@ func (s *preFilterState) Clone() framework.StateData {
}
copy := preFilterState{}
copy.topologyToMatchedAffinityTerms = s.topologyToMatchedAffinityTerms.clone()
copy.topologyToMatchedAntiAffinityTerms = s.topologyToMatchedAntiAffinityTerms.clone()
copy.topologyToMatchedExistingAntiAffinityTerms = s.topologyToMatchedExistingAntiAffinityTerms.clone()
copy.affinityCounts = s.affinityCounts.clone()
copy.antiAffinityCounts = s.antiAffinityCounts.clone()
copy.existingAntiAffinityCounts = s.existingAntiAffinityCounts.clone()
// No need to deep copy the podInfo because it shouldn't change.
copy.podInfo = s.podInfo
copy.namespaceLabels = s.namespaceLabels
copy.enableNamespaceSelector = s.enableNamespaceSelector
return &copy
}
// updateWithPod updates the preFilterState counters with the (anti)affinity matches for the given podInfo.
func (s *preFilterState) updateWithPod(updatedPodInfo *framework.PodInfo, node *v1.Node, multiplier int64) {
func (s *preFilterState) updateWithPod(pInfo *framework.PodInfo, node *v1.Node, multiplier int64) {
if s == nil {
return
}
// Update matching existing anti-affinity terms.
s.topologyToMatchedExistingAntiAffinityTerms.updateWithAntiAffinityTerms(s.podInfo.Pod, node, updatedPodInfo.RequiredAntiAffinityTerms, multiplier)
// Update matching incoming pod (anti)affinity terms.
s.topologyToMatchedAffinityTerms.updateWithAffinityTerms(updatedPodInfo.Pod, node, s.podInfo.RequiredAffinityTerms, multiplier)
s.topologyToMatchedAntiAffinityTerms.updateWithAntiAffinityTerms(updatedPodInfo.Pod, node, s.podInfo.RequiredAntiAffinityTerms, multiplier)
s.existingAntiAffinityCounts.updateWithAntiAffinityTerms(pInfo.RequiredAntiAffinityTerms, s.podInfo.Pod, s.namespaceLabels, node, multiplier, s.enableNamespaceSelector)
s.affinityCounts.updateWithAffinityTerms(s.podInfo.RequiredAffinityTerms, pInfo.Pod, node, multiplier, s.enableNamespaceSelector)
// The incoming pod's terms have the namespaceSelector merged into the namespaces, and so
// here we don't lookup the updated pod's namespace labels, hence passing nil for nsLabels.
s.antiAffinityCounts.updateWithAntiAffinityTerms(s.podInfo.RequiredAntiAffinityTerms, pInfo.Pod, nil, node, multiplier, s.enableNamespaceSelector)
}
type topologyPair struct {
@ -103,60 +106,58 @@ func (m topologyToMatchedTermCount) clone() topologyToMatchedTermCount {
return copy
}
// updateWithAffinityTerms updates the topologyToMatchedTermCount map with the specified value
func (m topologyToMatchedTermCount) update(node *v1.Node, tk string, value int64) {
if tv, ok := node.Labels[tk]; ok {
pair := topologyPair{key: tk, value: tv}
m[pair] += value
// value could be negative, hence we delete the entry if it is down to zero.
if m[pair] == 0 {
delete(m, pair)
}
}
}
// updates the topologyToMatchedTermCount map with the specified value
// for each affinity term if "targetPod" matches ALL terms.
func (m topologyToMatchedTermCount) updateWithAffinityTerms(targetPod *v1.Pod, targetPodNode *v1.Node, affinityTerms []framework.AffinityTerm, value int64) {
if podMatchesAllAffinityTerms(targetPod, affinityTerms) {
for _, t := range affinityTerms {
if topologyValue, ok := targetPodNode.Labels[t.TopologyKey]; ok {
pair := topologyPair{key: t.TopologyKey, value: topologyValue}
m[pair] += value
// value could be a negative value, hence we delete the entry if
// the entry is down to zero.
if m[pair] == 0 {
delete(m, pair)
}
}
func (m topologyToMatchedTermCount) updateWithAffinityTerms(
terms []framework.AffinityTerm, pod *v1.Pod, node *v1.Node, value int64, enableNamespaceSelector bool) {
if podMatchesAllAffinityTerms(terms, pod, enableNamespaceSelector) {
for _, t := range terms {
m.update(node, t.TopologyKey, value)
}
}
}
// updateWithAntiAffinityTerms updates the topologyToMatchedTermCount map with the specified value
// updates the topologyToMatchedTermCount map with the specified value
// for each anti-affinity term matched the target pod.
func (m topologyToMatchedTermCount) updateWithAntiAffinityTerms(targetPod *v1.Pod, targetPodNode *v1.Node, antiAffinityTerms []framework.AffinityTerm, value int64) {
func (m topologyToMatchedTermCount) updateWithAntiAffinityTerms(terms []framework.AffinityTerm, pod *v1.Pod, nsLabels labels.Set, node *v1.Node, value int64, enableNamespaceSelector bool) {
// Check anti-affinity terms.
for _, a := range antiAffinityTerms {
if schedutil.PodMatchesTermsNamespaceAndSelector(targetPod, a.Namespaces, a.Selector) {
if topologyValue, ok := targetPodNode.Labels[a.TopologyKey]; ok {
pair := topologyPair{key: a.TopologyKey, value: topologyValue}
m[pair] += value
// value could be a negative value, hence we delete the entry if
// the entry is down to zero.
if m[pair] == 0 {
delete(m, pair)
}
}
for _, t := range terms {
if t.Matches(pod, nsLabels, enableNamespaceSelector) {
m.update(node, t.TopologyKey, value)
}
}
}
// podMatchesAllAffinityTerms returns true IFF the given pod matches all the given terms.
func podMatchesAllAffinityTerms(pod *v1.Pod, terms []framework.AffinityTerm) bool {
// returns true IFF the given pod matches all the given terms.
func podMatchesAllAffinityTerms(terms []framework.AffinityTerm, pod *v1.Pod, enableNamespaceSelector bool) bool {
if len(terms) == 0 {
return false
}
for _, term := range terms {
if !schedutil.PodMatchesTermsNamespaceAndSelector(pod, term.Namespaces, term.Selector) {
for _, t := range terms {
// The incoming pod NamespaceSelector was merged into the Namespaces set, and so
// we are not explicitly passing in namespace labels.
if !t.Matches(pod, nil, enableNamespaceSelector) {
return false
}
}
return true
}
// getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node:
// calculates the following for each existing pod on each node:
// (1) Whether it has PodAntiAffinity
// (2) Whether any AffinityTerm matches the incoming pod
func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodes []*framework.NodeInfo) topologyToMatchedTermCount {
func getExistingAntiAffinityCounts(pod *v1.Pod, nsLabels labels.Set, nodes []*framework.NodeInfo, enableNamespaceSelector bool) topologyToMatchedTermCount {
topoMaps := make([]topologyToMatchedTermCount, len(nodes))
index := int32(-1)
processNode := func(i int) {
@ -168,7 +169,7 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodes []*framework.NodeIn
}
topoMap := make(topologyToMatchedTermCount)
for _, existingPod := range nodeInfo.PodsWithRequiredAntiAffinity {
topoMap.updateWithAntiAffinityTerms(pod, node, existingPod.RequiredAntiAffinityTerms, 1)
topoMap.updateWithAntiAffinityTerms(existingPod.RequiredAntiAffinityTerms, pod, nsLabels, node, 1, enableNamespaceSelector)
}
if len(topoMap) != 0 {
topoMaps[atomic.AddInt32(&index, 1)] = topoMap
@ -184,11 +185,11 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodes []*framework.NodeIn
return result
}
// getTPMapMatchingIncomingAffinityAntiAffinity finds existing Pods that match affinity terms of the given "pod".
// finds existing Pods that match affinity terms of the incoming pod's (anti)affinity terms.
// It returns a topologyToMatchedTermCount that are checked later by the affinity
// predicate. With this topologyToMatchedTermCount available, the affinity predicate does not
// need to check all the pods in the cluster.
func getTPMapMatchingIncomingAffinityAntiAffinity(podInfo *framework.PodInfo, allNodes []*framework.NodeInfo) (topologyToMatchedTermCount, topologyToMatchedTermCount) {
func getIncomingAffinityAntiAffinityCounts(podInfo *framework.PodInfo, allNodes []*framework.NodeInfo, enableNamespaceSelector bool) (topologyToMatchedTermCount, topologyToMatchedTermCount) {
affinityCounts := make(topologyToMatchedTermCount)
antiAffinityCounts := make(topologyToMatchedTermCount)
if len(podInfo.RequiredAffinityTerms) == 0 && len(podInfo.RequiredAntiAffinityTerms) == 0 {
@ -208,11 +209,10 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(podInfo *framework.PodInfo, al
affinity := make(topologyToMatchedTermCount)
antiAffinity := make(topologyToMatchedTermCount)
for _, existingPod := range nodeInfo.Pods {
// Check affinity terms.
affinity.updateWithAffinityTerms(existingPod.Pod, node, podInfo.RequiredAffinityTerms, 1)
// Check anti-affinity terms.
antiAffinity.updateWithAntiAffinityTerms(existingPod.Pod, node, podInfo.RequiredAntiAffinityTerms, 1)
affinity.updateWithAffinityTerms(podInfo.RequiredAffinityTerms, existingPod.Pod, node, 1, enableNamespaceSelector)
// The incoming pod's terms have the namespaceSelector merged into the namespaces, and so
// here we don't lookup the existing pod's namespace labels, hence passing nil for nsLabels.
antiAffinity.updateWithAntiAffinityTerms(podInfo.RequiredAntiAffinityTerms, existingPod.Pod, nil, node, 1, enableNamespaceSelector)
}
if len(affinity) > 0 || len(antiAffinity) > 0 {
@ -243,25 +243,32 @@ func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework
return framework.AsStatus(fmt.Errorf("failed to list NodeInfos with pods with affinity: %w", err))
}
podInfo := framework.NewPodInfo(pod)
if podInfo.ParseError != nil {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("parsing pod: %+v", podInfo.ParseError))
}
// existingPodAntiAffinityMap will be used later for efficient check on existing pods' anti-affinity
existingPodAntiAffinityMap := getTPMapMatchingExistingAntiAffinity(pod, nodesWithRequiredAntiAffinityPods)
// incomingPodAffinityMap will be used later for efficient check on incoming pod's affinity
// incomingPodAntiAffinityMap will be used later for efficient check on incoming pod's anti-affinity
incomingPodAffinityMap, incomingPodAntiAffinityMap := getTPMapMatchingIncomingAffinityAntiAffinity(podInfo, allNodes)
s := &preFilterState{
topologyToMatchedAffinityTerms: incomingPodAffinityMap,
topologyToMatchedAntiAffinityTerms: incomingPodAntiAffinityMap,
topologyToMatchedExistingAntiAffinityTerms: existingPodAntiAffinityMap,
podInfo: podInfo,
enableNamespaceSelector: pl.enableNamespaceSelector,
}
s.podInfo = framework.NewPodInfo(pod)
if s.podInfo.ParseError != nil {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("parsing pod: %+v", s.podInfo.ParseError))
}
if pl.enableNamespaceSelector {
for i := range s.podInfo.RequiredAffinityTerms {
if err := pl.mergeAffinityTermNamespacesIfNotEmpty(&s.podInfo.RequiredAffinityTerms[i]); err != nil {
return framework.AsStatus(err)
}
}
for i := range s.podInfo.RequiredAntiAffinityTerms {
if err := pl.mergeAffinityTermNamespacesIfNotEmpty(&s.podInfo.RequiredAntiAffinityTerms[i]); err != nil {
return framework.AsStatus(err)
}
}
s.namespaceLabels = GetNamespaceLabelsSnapshot(pod.Namespace, pl.nsLister)
}
s.existingAntiAffinityCounts = getExistingAntiAffinityCounts(pod, s.namespaceLabels, nodesWithRequiredAntiAffinityPods, pl.enableNamespaceSelector)
s.affinityCounts, s.antiAffinityCounts = getIncomingAffinityAntiAffinityCounts(s.podInfo, allNodes, pl.enableNamespaceSelector)
cycleState.Write(preFilterStateKey, s)
return nil
}
@ -308,12 +315,12 @@ func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error
// Checks if scheduling the pod onto this node would break any anti-affinity
// terms indicated by the existing pods.
func satisfyExistingPodsAntiAffinity(state *preFilterState, nodeInfo *framework.NodeInfo) bool {
if len(state.topologyToMatchedExistingAntiAffinityTerms) > 0 {
if len(state.existingAntiAffinityCounts) > 0 {
// Iterate over topology pairs to get any of the pods being affected by
// the scheduled pod anti-affinity terms
for topologyKey, topologyValue := range nodeInfo.Node().Labels {
tp := topologyPair{key: topologyKey, value: topologyValue}
if state.topologyToMatchedExistingAntiAffinityTerms[tp] > 0 {
if state.existingAntiAffinityCounts[tp] > 0 {
return false
}
}
@ -323,11 +330,11 @@ func satisfyExistingPodsAntiAffinity(state *preFilterState, nodeInfo *framework.
// Checks if the node satisfies the incoming pod's anti-affinity rules.
func satisfyPodAntiAffinity(state *preFilterState, nodeInfo *framework.NodeInfo) bool {
if len(state.topologyToMatchedAntiAffinityTerms) > 0 {
if len(state.antiAffinityCounts) > 0 {
for _, term := range state.podInfo.RequiredAntiAffinityTerms {
if topologyValue, ok := nodeInfo.Node().Labels[term.TopologyKey]; ok {
tp := topologyPair{key: term.TopologyKey, value: topologyValue}
if state.topologyToMatchedAntiAffinityTerms[tp] > 0 {
if state.antiAffinityCounts[tp] > 0 {
return false
}
}
@ -342,7 +349,7 @@ func satisfyPodAffinity(state *preFilterState, nodeInfo *framework.NodeInfo) boo
for _, term := range state.podInfo.RequiredAffinityTerms {
if topologyValue, ok := nodeInfo.Node().Labels[term.TopologyKey]; ok {
tp := topologyPair{key: term.TopologyKey, value: topologyValue}
if state.topologyToMatchedAffinityTerms[tp] <= 0 {
if state.affinityCounts[tp] <= 0 {
podsExist = false
}
} else {
@ -357,8 +364,7 @@ func satisfyPodAffinity(state *preFilterState, nodeInfo *framework.NodeInfo) boo
// in the cluster matches the namespace and selector of this pod, the pod matches
// its own terms, and the node has all the requested topologies, then we allow the pod
// to pass the affinity check.
podInfo := state.podInfo
if len(state.topologyToMatchedAffinityTerms) == 0 && podMatchesAllAffinityTerms(podInfo.Pod, podInfo.RequiredAffinityTerms) {
if len(state.affinityCounts) == 0 && podMatchesAllAffinityTerms(state.podInfo.RequiredAffinityTerms, state.podInfo.Pod, state.enableNamespaceSelector) {
return true
}
return false

View File

@ -24,7 +24,11 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
plugintesting "k8s.io/kubernetes/pkg/scheduler/framework/plugins/testing"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
)
@ -62,18 +66,20 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
podLabel2 := map[string]string{"security": "S1"}
node1 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labels1}}
tests := []struct {
pod *v1.Pod
pods []*v1.Pod
node *v1.Node
name string
wantStatus *framework.Status
pod *v1.Pod
pods []*v1.Pod
node *v1.Node
name string
wantStatus *framework.Status
disableNSSelector bool
}{
{
name: "A pod that has no required pod affinity scheduling rules can schedule onto a node with no existing pods",
pod: new(v1.Pod),
node: &node1,
name: "A pod that has no required pod affinity scheduling rules can schedule onto a node with no existing pods",
},
{
name: "satisfies with requiredDuringSchedulingIgnoredDuringExecution in PodAffinity using In operator that matches the existing pod",
pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel2,
[]v1.PodAffinityTerm{
{
@ -91,9 +97,9 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
}, nil),
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabel}}},
node: &node1,
name: "satisfies with requiredDuringSchedulingIgnoredDuringExecution in PodAffinity using In operator that matches the existing pod",
},
{
name: "satisfies the pod with requiredDuringSchedulingIgnoredDuringExecution in PodAffinity using not in operator in labelSelector that matches the existing pod",
pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel2,
[]v1.PodAffinityTerm{
{
@ -111,9 +117,9 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
}, nil),
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabel}}},
node: &node1,
name: "satisfies the pod with requiredDuringSchedulingIgnoredDuringExecution in PodAffinity using not in operator in labelSelector that matches the existing pod",
},
{
name: "Does not satisfy the PodAffinity with labelSelector because of diff Namespace",
pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel2,
[]v1.PodAffinityTerm{
{
@ -131,7 +137,6 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
}, nil),
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabel, Namespace: "ns"}}},
node: &node1,
name: "Does not satisfy the PodAffinity with labelSelector because of diff Namespace",
wantStatus: framework.NewStatus(
framework.UnschedulableAndUnresolvable,
ErrReasonAffinityNotMatch,
@ -139,6 +144,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
),
},
{
name: "Doesn't satisfy the PodAffinity because of unmatching labelSelector with the existing pod",
pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel,
[]v1.PodAffinityTerm{
{
@ -155,7 +161,6 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
}, nil),
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabel}}},
node: &node1,
name: "Doesn't satisfy the PodAffinity because of unmatching labelSelector with the existing pod",
wantStatus: framework.NewStatus(
framework.UnschedulableAndUnresolvable,
ErrReasonAffinityNotMatch,
@ -163,6 +168,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
),
},
{
name: "satisfies the PodAffinity with different label Operators in multiple RequiredDuringSchedulingIgnoredDuringExecution ",
pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel2,
[]v1.PodAffinityTerm{
{
@ -197,9 +203,9 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
}, nil),
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabel}}},
node: &node1,
name: "satisfies the PodAffinity with different label Operators in multiple RequiredDuringSchedulingIgnoredDuringExecution ",
},
{
name: "The labelSelector requirements(items of matchExpressions) are ANDed, the pod cannot schedule onto the node because one of the matchExpression item don't match.",
pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel2,
[]v1.PodAffinityTerm{
{
@ -234,7 +240,6 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
}, nil),
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabel}}},
node: &node1,
name: "The labelSelector requirements(items of matchExpressions) are ANDed, the pod cannot schedule onto the node because one of the matchExpression item don't match.",
wantStatus: framework.NewStatus(
framework.UnschedulableAndUnresolvable,
ErrReasonAffinityNotMatch,
@ -242,6 +247,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
),
},
{
name: "satisfies the PodAffinity and PodAntiAffinity with the existing pod",
pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel2,
[]v1.PodAffinityTerm{
{
@ -273,9 +279,9 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
}),
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabel}}},
node: &node1,
name: "satisfies the PodAffinity and PodAntiAffinity with the existing pod",
},
{
name: "satisfies the PodAffinity and PodAntiAffinity and PodAntiAffinity symmetry with the existing pod",
pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel2,
[]v1.PodAffinityTerm{
{
@ -323,9 +329,9 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
}),
},
node: &node1,
name: "satisfies the PodAffinity and PodAntiAffinity and PodAntiAffinity symmetry with the existing pod",
},
{
name: "satisfies the PodAffinity but doesn't satisfy the PodAntiAffinity with the existing pod",
pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel2,
[]v1.PodAffinityTerm{
{
@ -357,7 +363,6 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
}),
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabel}}},
node: &node1,
name: "satisfies the PodAffinity but doesn't satisfy the PodAntiAffinity with the existing pod",
wantStatus: framework.NewStatus(
framework.Unschedulable,
ErrReasonAffinityNotMatch,
@ -365,6 +370,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
),
},
{
name: "satisfies the PodAffinity and PodAntiAffinity but doesn't satisfy PodAntiAffinity symmetry with the existing pod",
pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel,
[]v1.PodAffinityTerm{
{
@ -412,7 +418,6 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
}),
},
node: &node1,
name: "satisfies the PodAffinity and PodAntiAffinity but doesn't satisfy PodAntiAffinity symmetry with the existing pod",
wantStatus: framework.NewStatus(
framework.Unschedulable,
ErrReasonAffinityNotMatch,
@ -420,6 +425,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
),
},
{
name: "pod matches its own Label in PodAffinity and that matches the existing pod Labels",
pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel,
[]v1.PodAffinityTerm{
{
@ -437,7 +443,6 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
}, nil),
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabel}}},
node: &node1,
name: "pod matches its own Label in PodAffinity and that matches the existing pod Labels",
wantStatus: framework.NewStatus(
framework.UnschedulableAndUnresolvable,
ErrReasonAffinityNotMatch,
@ -445,6 +450,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
),
},
{
name: "verify that PodAntiAffinity from existing pod is respected when pod has no AntiAffinity constraints. doesn't satisfy PodAntiAffinity symmetry with the existing pod",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: podLabel,
@ -468,7 +474,6 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
}),
},
node: &node1,
name: "verify that PodAntiAffinity from existing pod is respected when pod has no AntiAffinity constraints. doesn't satisfy PodAntiAffinity symmetry with the existing pod",
wantStatus: framework.NewStatus(
framework.Unschedulable,
ErrReasonAffinityNotMatch,
@ -476,6 +481,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
),
},
{
name: "verify that PodAntiAffinity from existing pod is respected when pod has no AntiAffinity constraints. satisfy PodAntiAffinity symmetry with the existing pod",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: podLabel,
@ -499,52 +505,51 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
}),
},
node: &node1,
name: "verify that PodAntiAffinity from existing pod is respected when pod has no AntiAffinity constraints. satisfy PodAntiAffinity symmetry with the existing pod",
},
{
pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel, nil,
[]v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpExists,
},
},
},
TopologyKey: "region",
},
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "security",
Operator: metav1.LabelSelectorOpExists,
},
},
},
TopologyKey: "region",
},
}),
pods: []*v1.Pod{
createPodWithAffinityTerms(defaultNamespace, "machine1", podLabel2, nil,
[]v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "security",
Operator: metav1.LabelSelectorOpExists,
},
},
},
TopologyKey: "zone",
},
}),
},
node: &node1,
name: "satisfies the PodAntiAffinity with existing pod but doesn't satisfy PodAntiAffinity symmetry with incoming pod",
pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel, nil,
[]v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpExists,
},
},
},
TopologyKey: "region",
},
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "security",
Operator: metav1.LabelSelectorOpExists,
},
},
},
TopologyKey: "region",
},
}),
pods: []*v1.Pod{
createPodWithAffinityTerms(defaultNamespace, "machine1", podLabel2, nil,
[]v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "security",
Operator: metav1.LabelSelectorOpExists,
},
},
},
TopologyKey: "zone",
},
}),
},
node: &node1,
wantStatus: framework.NewStatus(
framework.Unschedulable,
ErrReasonAffinityNotMatch,
@ -552,6 +557,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
),
},
{
name: "PodAntiAffinity symmetry check a1: incoming pod and existing pod partially match each other on AffinityTerms",
pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel, nil,
[]v1.PodAffinityTerm{
{
@ -599,9 +605,9 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
ErrReasonAffinityNotMatch,
ErrReasonAntiAffinityRulesNotMatch,
),
name: "PodAntiAffinity symmetry check a1: incoming pod and existing pod partially match each other on AffinityTerms",
},
{
name: "PodAntiAffinity symmetry check a2: incoming pod and existing pod partially match each other on AffinityTerms",
pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel2, nil,
[]v1.PodAffinityTerm{
{
@ -649,9 +655,9 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
ErrReasonAffinityNotMatch,
ErrReasonExistingAntiAffinityRulesNotMatch,
),
name: "PodAntiAffinity symmetry check a2: incoming pod and existing pod partially match each other on AffinityTerms",
},
{
name: "PodAntiAffinity symmetry check b1: incoming pod and existing pod partially match each other on AffinityTerms",
pod: createPodWithAffinityTerms(defaultNamespace, "", map[string]string{"abc": "", "xyz": ""}, nil,
[]v1.PodAffinityTerm{
{
@ -710,9 +716,9 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
ErrReasonAffinityNotMatch,
ErrReasonAntiAffinityRulesNotMatch,
),
name: "PodAntiAffinity symmetry check b1: incoming pod and existing pod partially match each other on AffinityTerms",
},
{
name: "PodAntiAffinity symmetry check b2: incoming pod and existing pod partially match each other on AffinityTerms",
pod: createPodWithAffinityTerms(defaultNamespace, "", map[string]string{"def": "", "xyz": ""}, nil,
[]v1.PodAffinityTerm{
{
@ -771,7 +777,6 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
ErrReasonAffinityNotMatch,
ErrReasonAntiAffinityRulesNotMatch,
),
name: "PodAntiAffinity symmetry check b2: incoming pod and existing pod partially match each other on AffinityTerms",
},
{
name: "PodAffinity fails PreFilter with an invalid affinity label syntax",
@ -847,23 +852,179 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
`Invalid value: "{{.bad-value.}}"`,
),
},
{
name: "affinity with NamespaceSelector",
pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel2,
[]v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"securityscan", "value2"},
},
},
},
NamespaceSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "team",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"team1"},
},
},
},
TopologyKey: "region",
},
}, nil),
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabel}}},
node: &node1,
},
{
name: "affinity with non-matching NamespaceSelector",
pod: createPodWithAffinityTerms(defaultNamespace, "", podLabel2,
[]v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"securityscan", "value2"},
},
},
},
NamespaceSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "team",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"team1"},
},
},
},
TopologyKey: "region",
},
}, nil),
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team2", Labels: podLabel}}},
node: &node1,
wantStatus: framework.NewStatus(
framework.UnschedulableAndUnresolvable,
ErrReasonAffinityNotMatch,
ErrReasonAffinityRulesNotMatch,
),
},
{
name: "anti-affinity with matching NamespaceSelector",
pod: createPodWithAffinityTerms("subteam1.team1", "", podLabel2, nil,
[]v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"securityscan", "value2"},
},
},
},
NamespaceSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "team",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"team1"},
},
},
},
TopologyKey: "zone",
},
}),
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam2.team1", Labels: podLabel}}},
node: &node1,
wantStatus: framework.NewStatus(
framework.Unschedulable,
ErrReasonAffinityNotMatch,
ErrReasonAntiAffinityRulesNotMatch,
),
},
{
name: "anti-affinity with matching all NamespaceSelector",
pod: createPodWithAffinityTerms("subteam1.team1", "", podLabel2, nil,
[]v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"securityscan", "value2"},
},
},
},
NamespaceSelector: &metav1.LabelSelector{},
TopologyKey: "zone",
},
}),
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam2.team1", Labels: podLabel}}},
node: &node1,
wantStatus: framework.NewStatus(
framework.Unschedulable,
ErrReasonAffinityNotMatch,
ErrReasonAntiAffinityRulesNotMatch,
),
},
{
name: "anti-affinity with non-matching NamespaceSelector",
pod: createPodWithAffinityTerms("subteam1.team1", "", podLabel2, nil,
[]v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"securityscan", "value2"},
},
},
},
NamespaceSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "team",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"team1"},
},
},
},
TopologyKey: "zone",
},
}),
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team2", Labels: podLabel}}},
node: &node1,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
snapshot := cache.NewSnapshot(test.pods, []*v1.Node{test.node})
p := &InterPodAffinity{
sharedLister: snapshot,
n := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return New(plArgs, fh, feature.Features{
EnablePodAffinityNamespaceSelector: !test.disableNSSelector,
})
}
p := plugintesting.SetupPlugin(ctx, t, n, &config.InterPodAffinityArgs{}, snapshot, namespaces)
state := framework.NewCycleState()
preFilterStatus := p.PreFilter(context.Background(), state, test.pod)
preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, state, test.pod)
if !preFilterStatus.IsSuccess() {
if !strings.Contains(preFilterStatus.Message(), test.wantStatus.Message()) {
t.Errorf("prefilter failed with status: %v", preFilterStatus)
}
} else {
nodeInfo := mustGetNodeInfo(t, snapshot, test.node.Name)
gotStatus := p.Filter(context.Background(), state, test.pod, nodeInfo)
gotStatus := p.(framework.FilterPlugin).Filter(ctx, state, test.pod, nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
}
@ -1438,7 +1599,10 @@ func TestRequiredAffinityMultipleNodes(t *testing.T) {
}),
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "podA", Labels: map[string]string{"foo": "", "bar": ""}},
ObjectMeta: metav1.ObjectMeta{
Name: "podA",
Labels: map[string]string{"foo": "", "bar": ""},
},
Spec: v1.PodSpec{
NodeName: "nodeA",
},
@ -1738,18 +1902,23 @@ func TestRequiredAffinityMultipleNodes(t *testing.T) {
for indexTest, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
snapshot := cache.NewSnapshot(test.pods, test.nodes)
n := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return New(plArgs, fh, feature.Features{})
}
p := plugintesting.SetupPlugin(ctx, t, n, &config.InterPodAffinityArgs{}, snapshot,
[]runtime.Object{
&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "NS1"}},
})
for indexNode, node := range test.nodes {
p := &InterPodAffinity{
sharedLister: snapshot,
}
state := framework.NewCycleState()
preFilterStatus := p.PreFilter(context.Background(), state, test.pod)
preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, state, test.pod)
if !preFilterStatus.IsSuccess() {
t.Errorf("prefilter failed with status: %v", preFilterStatus)
}
nodeInfo := mustGetNodeInfo(t, snapshot, node.Name)
gotStatus := p.Filter(context.Background(), state, test.pod, nodeInfo)
gotStatus := p.(framework.FilterPlugin).Filter(ctx, state, test.pod, nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatuses[indexNode]) {
t.Errorf("index: %d status does not match: %v, want: %v", indexTest, gotStatus, test.wantStatuses[indexNode])
}
@ -1865,7 +2034,6 @@ func TestPreFilterStateAddRemovePod(t *testing.T) {
addedPod *v1.Pod
existingPods []*v1.Pod
nodes []*v1.Node
services []*v1.Service
expectedAntiAffinity topologyToMatchedTermCount
expectedAffinity topologyToMatchedTermCount
}{
@ -1968,7 +2136,6 @@ func TestPreFilterStateAddRemovePod(t *testing.T) {
},
},
},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector1}}},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "nodeA", Labels: label1}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeB", Labels: label2}},
@ -2014,7 +2181,6 @@ func TestPreFilterStateAddRemovePod(t *testing.T) {
},
},
},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector1}}},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "nodeA", Labels: label1}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeB", Labels: label2}},
@ -2030,15 +2196,16 @@ func TestPreFilterStateAddRemovePod(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
// getMeta creates predicate meta data given the list of pods.
getState := func(pods []*v1.Pod) (*InterPodAffinity, *framework.CycleState, *preFilterState, *cache.Snapshot) {
snapshot := cache.NewSnapshot(pods, test.nodes)
p := &InterPodAffinity{
sharedLister: snapshot,
n := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return New(plArgs, fh, feature.Features{})
}
p := plugintesting.SetupPlugin(ctx, t, n, &config.InterPodAffinityArgs{}, snapshot, nil)
cycleState := framework.NewCycleState()
preFilterStatus := p.PreFilter(context.Background(), cycleState, test.pendingPod)
preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pendingPod)
if !preFilterStatus.IsSuccess() {
t.Errorf("prefilter failed with status: %v", preFilterStatus)
}
@ -2048,7 +2215,7 @@ func TestPreFilterStateAddRemovePod(t *testing.T) {
t.Errorf("failed to get preFilterState from cycleState: %v", err)
}
return p, cycleState, state, snapshot
return p.(*InterPodAffinity), cycleState, state, snapshot
}
// allPodsState is the state produced when all pods, including test.addedPod are given to prefilter.
@ -2061,7 +2228,7 @@ func TestPreFilterStateAddRemovePod(t *testing.T) {
// Add test.addedPod to state1 and verify it is equal to allPodsState.
nodeInfo := mustGetNodeInfo(t, snapshot, test.addedPod.Spec.NodeName)
if err := ipa.AddPod(context.Background(), cycleState, test.pendingPod, framework.NewPodInfo(test.addedPod), nodeInfo); err != nil {
if err := ipa.AddPod(ctx, cycleState, test.pendingPod, framework.NewPodInfo(test.addedPod), nodeInfo); err != nil {
t.Errorf("error adding pod to meta: %v", err)
}
@ -2070,12 +2237,12 @@ func TestPreFilterStateAddRemovePod(t *testing.T) {
t.Errorf("failed to get preFilterState from cycleState: %v", err)
}
if !reflect.DeepEqual(newState.topologyToMatchedAntiAffinityTerms, test.expectedAntiAffinity) {
t.Errorf("State is not equal, got: %v, want: %v", newState.topologyToMatchedAntiAffinityTerms, test.expectedAntiAffinity)
if !reflect.DeepEqual(newState.antiAffinityCounts, test.expectedAntiAffinity) {
t.Errorf("State is not equal, got: %v, want: %v", newState.antiAffinityCounts, test.expectedAntiAffinity)
}
if !reflect.DeepEqual(newState.topologyToMatchedAffinityTerms, test.expectedAffinity) {
t.Errorf("State is not equal, got: %v, want: %v", newState.topologyToMatchedAffinityTerms, test.expectedAffinity)
if !reflect.DeepEqual(newState.affinityCounts, test.expectedAffinity) {
t.Errorf("State is not equal, got: %v, want: %v", newState.affinityCounts, test.expectedAffinity)
}
if !reflect.DeepEqual(allPodsState, state) {
@ -2095,15 +2262,15 @@ func TestPreFilterStateAddRemovePod(t *testing.T) {
func TestPreFilterStateClone(t *testing.T) {
source := &preFilterState{
topologyToMatchedExistingAntiAffinityTerms: topologyToMatchedTermCount{
existingAntiAffinityCounts: topologyToMatchedTermCount{
{key: "name", value: "machine1"}: 1,
{key: "name", value: "machine2"}: 1,
},
topologyToMatchedAffinityTerms: topologyToMatchedTermCount{
affinityCounts: topologyToMatchedTermCount{
{key: "name", value: "nodeA"}: 1,
{key: "name", value: "nodeC"}: 2,
},
topologyToMatchedAntiAffinityTerms: topologyToMatchedTermCount{
antiAffinityCounts: topologyToMatchedTermCount{
{key: "name", value: "nodeN"}: 3,
{key: "name", value: "nodeM"}: 1,
},
@ -2319,7 +2486,7 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
s := cache.NewSnapshot(tt.existingPods, tt.nodes)
l, _ := s.NodeInfos().List()
gotAffinityPodsMap, gotAntiAffinityPodsMap := getTPMapMatchingIncomingAffinityAntiAffinity(framework.NewPodInfo(tt.pod), l)
gotAffinityPodsMap, gotAntiAffinityPodsMap := getIncomingAffinityAntiAffinityCounts(framework.NewPodInfo(tt.pod), l, true)
if !reflect.DeepEqual(gotAffinityPodsMap, tt.wantAffinityPodsMap) {
t.Errorf("getTPMapMatchingIncomingAffinityAntiAffinity() gotAffinityPodsMap = %#v, want %#v", gotAffinityPodsMap, tt.wantAffinityPodsMap)
}

View File

@ -19,10 +19,14 @@ package interpodaffinity
import (
"fmt"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
)
const (
@ -37,8 +41,10 @@ var _ framework.ScorePlugin = &InterPodAffinity{}
// InterPodAffinity is a plugin that checks inter pod affinity
type InterPodAffinity struct {
args config.InterPodAffinityArgs
sharedLister framework.SharedLister
args config.InterPodAffinityArgs
sharedLister framework.SharedLister
nsLister listersv1.NamespaceLister
enableNamespaceSelector bool
}
// Name returns name of the plugin. It is used in logs, etc.
@ -47,7 +53,7 @@ func (pl *InterPodAffinity) Name() string {
}
// New initializes a new plugin and returns it.
func New(plArgs runtime.Object, h framework.Handle) (framework.Plugin, error) {
func New(plArgs runtime.Object, h framework.Handle, fts feature.Features) (framework.Plugin, error) {
if h.SnapshotSharedLister() == nil {
return nil, fmt.Errorf("SnapshotSharedlister is nil")
}
@ -58,10 +64,16 @@ func New(plArgs runtime.Object, h framework.Handle) (framework.Plugin, error) {
if err := validation.ValidateInterPodAffinityArgs(args); err != nil {
return nil, err
}
return &InterPodAffinity{
args: args,
sharedLister: h.SnapshotSharedLister(),
}, nil
pl := &InterPodAffinity{
args: args,
sharedLister: h.SnapshotSharedLister(),
enableNamespaceSelector: fts.EnablePodAffinityNamespaceSelector,
}
if pl.enableNamespaceSelector {
pl.nsLister = h.SharedInformerFactory().Core().V1().Namespaces().Lister()
}
return pl, nil
}
func getArgs(obj runtime.Object) (config.InterPodAffinityArgs, error) {
@ -71,3 +83,37 @@ func getArgs(obj runtime.Object) (config.InterPodAffinityArgs, error) {
}
return *ptr, nil
}
// Updates Namespaces with the set of namespaces identified by NamespaceSelector.
// If successful, NamespaceSelector is set to nil.
// The assumption is that the term is for an incoming pod, in which case
// namespaceSelector is either unrolled into Namespaces (and so the selector
// is set to Nothing()) or is Empty(), which means match everything. Therefore,
// there when matching against this term, there is no need to lookup the existing
// pod's namespace labels to match them against term's namespaceSelector explicitly.
func (pl *InterPodAffinity) mergeAffinityTermNamespacesIfNotEmpty(at *framework.AffinityTerm) error {
if at.NamespaceSelector.Empty() {
return nil
}
ns, err := pl.nsLister.List(at.NamespaceSelector)
if err != nil {
return err
}
for _, n := range ns {
at.Namespaces.Insert(n.Name)
}
at.NamespaceSelector = labels.Nothing()
return nil
}
// GetNamespaceLabelsSnapshot returns a snapshot of the labels associated with
// the namespace.
func GetNamespaceLabelsSnapshot(ns string, nsLister listersv1.NamespaceLister) (nsLabels labels.Set) {
podNS, err := nsLister.Get(ns)
if err == nil {
// Create and return snapshot of the labels.
return labels.Merge(podNS.Labels, nil)
}
klog.ErrorS(err, "getting namespace, assuming empty set of namespace labels", "namespace", ns)
return
}

View File

@ -22,10 +22,10 @@ import (
"math"
"sync/atomic"
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)
// preScoreStateKey is the key in CycleState to InterPodAffinity pre-computed data for Scoring.
@ -37,6 +37,8 @@ type scoreMap map[string]map[string]int64
type preScoreState struct {
topologyScore scoreMap
podInfo *framework.PodInfo
// A copy of the incoming pod's namespace labels.
namespaceLabels labels.Set
}
// Clone implements the mandatory Clone interface. We don't really copy the data since
@ -45,30 +47,20 @@ func (s *preScoreState) Clone() framework.StateData {
return s
}
func (m scoreMap) processTerm(
term *framework.WeightedAffinityTerm,
podToCheck *v1.Pod,
fixedNode *v1.Node,
multiplier int,
) {
if len(fixedNode.Labels) == 0 {
return
}
match := schedutil.PodMatchesTermsNamespaceAndSelector(podToCheck, term.Namespaces, term.Selector)
tpValue, tpValueExist := fixedNode.Labels[term.TopologyKey]
if match && tpValueExist {
if m[term.TopologyKey] == nil {
m[term.TopologyKey] = make(map[string]int64)
func (m scoreMap) processTerm(term *framework.AffinityTerm, weight int32, pod *v1.Pod, nsLabels labels.Set, node *v1.Node, multiplier int32, enableNamespaceSelector bool) {
if term.Matches(pod, nsLabels, enableNamespaceSelector) {
if tpValue, tpValueExist := node.Labels[term.TopologyKey]; tpValueExist {
if m[term.TopologyKey] == nil {
m[term.TopologyKey] = make(map[string]int64)
}
m[term.TopologyKey][tpValue] += int64(weight * multiplier)
}
m[term.TopologyKey][tpValue] += int64(term.Weight * int32(multiplier))
}
return
}
func (m scoreMap) processTerms(terms []framework.WeightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) {
func (m scoreMap) processTerms(terms []framework.WeightedAffinityTerm, pod *v1.Pod, nsLabels labels.Set, node *v1.Node, multiplier int32, enableNamespaceSelector bool) {
for _, term := range terms {
m.processTerm(&term, podToCheck, fixedNode, multiplier)
m.processTerm(&term.AffinityTerm, term.Weight, pod, nsLabels, node, multiplier, enableNamespaceSelector)
}
}
@ -93,36 +85,42 @@ func (pl *InterPodAffinity) processExistingPod(
topoScore scoreMap,
) {
existingPodNode := existingPodNodeInfo.Node()
if len(existingPodNode.Labels) == 0 {
return
}
// For every soft pod affinity term of <pod>, if <existingPod> matches the term,
// increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPods>`s node by the term`s weight.
topoScore.processTerms(state.podInfo.PreferredAffinityTerms, existingPod.Pod, existingPodNode, 1)
// Note that the incoming pod's terms have the namespaceSelector merged into the namespaces, and so
// here we don't lookup the existing pod's namespace labels, hence passing nil for nsLabels.
topoScore.processTerms(state.podInfo.PreferredAffinityTerms, existingPod.Pod, nil, existingPodNode, 1, pl.enableNamespaceSelector)
// For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
// decrement <p.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>`s node by the term`s weight.
topoScore.processTerms(state.podInfo.PreferredAntiAffinityTerms, existingPod.Pod, existingPodNode, -1)
// Note that the incoming pod's terms have the namespaceSelector merged into the namespaces, and so
// here we don't lookup the existing pod's namespace labels, hence passing nil for nsLabels.
topoScore.processTerms(state.podInfo.PreferredAntiAffinityTerms, existingPod.Pod, nil, existingPodNode, -1, pl.enableNamespaceSelector)
// For every hard pod affinity term of <existingPod>, if <pod> matches the term,
// increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the constant <args.hardPodAffinityWeight>
if pl.args.HardPodAffinityWeight > 0 {
for _, term := range existingPod.RequiredAffinityTerms {
t := framework.WeightedAffinityTerm{AffinityTerm: term, Weight: pl.args.HardPodAffinityWeight}
topoScore.processTerm(&t, incomingPod, existingPodNode, 1)
if pl.args.HardPodAffinityWeight > 0 && len(existingPodNode.Labels) != 0 {
for _, t := range existingPod.RequiredAffinityTerms {
topoScore.processTerm(&t, pl.args.HardPodAffinityWeight, incomingPod, state.namespaceLabels, existingPodNode, 1, pl.enableNamespaceSelector)
}
}
// For every soft pod affinity term of <existingPod>, if <pod> matches the term,
// increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
topoScore.processTerms(existingPod.PreferredAffinityTerms, incomingPod, existingPodNode, 1)
topoScore.processTerms(existingPod.PreferredAffinityTerms, incomingPod, state.namespaceLabels, existingPodNode, 1, pl.enableNamespaceSelector)
// For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
// decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
topoScore.processTerms(existingPod.PreferredAntiAffinityTerms, incomingPod, existingPodNode, -1)
topoScore.processTerms(existingPod.PreferredAntiAffinityTerms, incomingPod, state.namespaceLabels, existingPodNode, -1, pl.enableNamespaceSelector)
}
// PreScore builds and writes cycle state used by Score and NormalizeScore.
@ -161,15 +159,28 @@ func (pl *InterPodAffinity) PreScore(
}
}
podInfo := framework.NewPodInfo(pod)
if podInfo.ParseError != nil {
// Ideally we never reach here, because errors will be caught by PreFilter
return framework.AsStatus(fmt.Errorf("failed to parse pod: %w", podInfo.ParseError))
}
state := &preScoreState{
topologyScore: make(map[string]map[string]int64),
podInfo: podInfo,
}
state.podInfo = framework.NewPodInfo(pod)
if state.podInfo.ParseError != nil {
// Ideally we never reach here, because errors will be caught by PreFilter
return framework.AsStatus(fmt.Errorf("failed to parse pod: %w", state.podInfo.ParseError))
}
if pl.enableNamespaceSelector {
for i := range state.podInfo.PreferredAffinityTerms {
if err := pl.mergeAffinityTermNamespacesIfNotEmpty(&state.podInfo.PreferredAffinityTerms[i].AffinityTerm); err != nil {
return framework.AsStatus(fmt.Errorf("updating PreferredAffinityTerms: %w", err))
}
}
for i := range state.podInfo.PreferredAntiAffinityTerms {
if err := pl.mergeAffinityTermNamespacesIfNotEmpty(&state.podInfo.PreferredAntiAffinityTerms[i].AffinityTerm); err != nil {
return framework.AsStatus(fmt.Errorf("updating PreferredAntiAffinityTerms: %w", err))
}
}
state.namespaceLabels = GetNamespaceLabelsSnapshot(pod.Namespace, pl.nsLister)
}
topoScores := make([]scoreMap, len(allNodes))

View File

@ -22,14 +22,25 @@ import (
"strings"
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
plugintesting "k8s.io/kubernetes/pkg/scheduler/framework/plugins/testing"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
)
var nsLabelT1 = map[string]string{"team": "team1"}
var nsLabelT2 = map[string]string{"team": "team2"}
var namespaces = []runtime.Object{
&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "subteam1.team1", Labels: nsLabelT1}},
&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "subteam2.team1", Labels: nsLabelT1}},
&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "subteam1.team2", Labels: nsLabelT2}},
&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "subteam2.team2", Labels: nsLabelT2}},
}
func TestPreferredAffinity(t *testing.T) {
labelRgChina := map[string]string{
"region": "China",
@ -253,6 +264,68 @@ func TestPreferredAffinity(t *testing.T) {
},
}
affinityNamespaceSelector := &v1.Affinity{
PodAffinity: &v1.PodAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{
{
Weight: 5,
PodAffinityTerm: v1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "security",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"S1"},
},
},
},
TopologyKey: "region",
Namespaces: []string{"subteam2.team2"},
NamespaceSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "team",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"team1"},
},
},
},
},
},
},
},
}
antiAffinityNamespaceSelector := &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{
{
Weight: 5,
PodAffinityTerm: v1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "security",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"S1"},
},
},
},
TopologyKey: "region",
Namespaces: []string{"subteam2.team2"},
NamespaceSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "team",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"team1"},
},
},
},
},
},
},
},
}
invalidAffinityLabels := &v1.Affinity{
PodAffinity: &v1.PodAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{
@ -297,27 +370,30 @@ func TestPreferredAffinity(t *testing.T) {
}
tests := []struct {
pod *v1.Pod
pods []*v1.Pod
nodes []*v1.Node
expectedList framework.NodeScoreList
name string
wantStatus *framework.Status
pod *v1.Pod
pods []*v1.Pod
nodes []*v1.Node
expectedList framework.NodeScoreList
name string
wantStatus *framework.Status
disableNSSelector bool
}{
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
name: "all machines are same priority as Affinity is nil",
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: 0}},
name: "all machines are same priority as Affinity is nil",
},
// the node(machine1) that have the label {"region": "China"} (match the topology key) and that have existing pods that match the labelSelector get high score
// the node(machine3) that don't have the label {"region": "whatever the value is"} (mismatch the topology key) but that have existing pods that match the labelSelector get low score
// the node(machine2) that have the label {"region": "China"} (match the topology key) but that have existing pods that mismatch the labelSelector get low score
{
name: "Affinity: pod that matches topology key & pods in nodes will get high score comparing to others" +
"which doesn't match either pods in nodes or in topology key",
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegion}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
@ -330,15 +406,14 @@ func TestPreferredAffinity(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: 0}},
name: "Affinity: pod that matches topology key & pods in nodes will get high score comparing to others" +
"which doesn't match either pods in nodes or in topology key",
},
// the node1(machine1) that have the label {"region": "China"} (match the topology key) and that have existing pods that match the labelSelector get high score
// the node2(machine2) that have the label {"region": "China"}, match the topology key and have the same label value with node1, get the same high score with node1
// the node3(machine3) that have the label {"region": "India"}, match the topology key but have a different label value, don't have existing pods that match the labelSelector,
// get a low score.
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegion}},
name: "All the nodes that have the same topology key & label value with one of them has an existing pod that match the affinity rules, have the same score",
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegion}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
},
@ -348,14 +423,14 @@ func TestPreferredAffinity(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelRgIndia}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}},
name: "All the nodes that have the same topology key & label value with one of them has an existing pod that match the affinity rules, have the same score",
},
// there are 2 regions, say regionChina(machine1,machine3,machine4) and regionIndia(machine2,machine5), both regions have nodes that match the preference.
// But there are more nodes(actually more existing pods) in regionChina that match the preference than regionIndia.
// Then, nodes in regionChina get higher score than nodes in regionIndia, and all the nodes in regionChina should get a same score(high score),
// while all the nodes in regionIndia should get another same score(low score).
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS2InRegion}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
name: "Affinity: nodes in one region has more matching pods comparing to other region, so the region which has more matches will get high score",
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS2InRegion}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
@ -372,11 +447,11 @@ func TestPreferredAffinity(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "machine5", Labels: labelRgIndia}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: framework.MaxNodeScore}, {Name: "machine4", Score: framework.MaxNodeScore}, {Name: "machine5", Score: 0}},
name: "Affinity: nodes in one region has more matching pods comparing to other region, so the region which has more matches will get high score",
},
// Test with the different operators and values for pod affinity scheduling preference, including some match failures.
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: affinity3}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
name: "Affinity: different Label operators and values for pod affinity scheduling preference, including some match failures ",
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: affinity3}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
@ -388,12 +463,12 @@ func TestPreferredAffinity(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 20}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}},
name: "Affinity: different Label operators and values for pod affinity scheduling preference, including some match failures ",
},
// Test the symmetry cases for affinity, the difference between affinity and symmetry is not the pod wants to run together with some existing pods,
// but the existing pods have the inter pod affinity preference while the pod to schedule satisfy the preference.
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
name: "Affinity symmetry: considered only the preferredDuringSchedulingIgnoredDuringExecution in pod affinity symmetry",
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1", Affinity: stayWithS1InRegion}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2", Affinity: stayWithS2InRegion}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
@ -404,10 +479,38 @@ func TestPreferredAffinity(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}},
name: "Affinity symmetry: considered only the preferredDuringSchedulingIgnoredDuringExecution in pod affinity symmetry",
},
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
name: "Affinity symmetry with namespace selector",
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1", Affinity: affinityNamespaceSelector}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2", Affinity: stayWithS2InRegion}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: 0}},
},
{
name: "AntiAffinity symmetry with namespace selector",
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1", Affinity: antiAffinityNamespaceSelector}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2", Affinity: stayWithS2InRegion}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: framework.MaxNodeScore}},
},
{
name: "Affinity symmetry: considered RequiredDuringSchedulingIgnoredDuringExecution in pod affinity symmetry",
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1", Affinity: hardAffinity}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2", Affinity: hardAffinity}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
@ -418,7 +521,6 @@ func TestPreferredAffinity(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}},
name: "Affinity symmetry: considered RequiredDuringSchedulingIgnoredDuringExecution in pod affinity symmetry",
},
// The pod to schedule prefer to stay away from some existing pods at node level using the pod anti affinity.
@ -428,7 +530,8 @@ func TestPreferredAffinity(t *testing.T) {
// there are 2 nodes, say node1 and node2, both nodes have pods that match the labelSelector and have topology-key in node.Labels.
// But there are more pods on node1 that match the preference than node2. Then, node1 get a lower score than node2.
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: awayFromS1InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
name: "Anti Affinity: pod that does not match existing pods in node will get high score ",
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: awayFromS1InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
@ -438,10 +541,10 @@ func TestPreferredAffinity(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgChina}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}},
name: "Anti Affinity: pod that does not match existing pods in node will get high score ",
},
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: awayFromS1InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
name: "Anti Affinity: pod that does not match topology key & match the pods in nodes will get higher score comparing to others ",
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: awayFromS1InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
@ -451,10 +554,10 @@ func TestPreferredAffinity(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgChina}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}},
name: "Anti Affinity: pod that does not match topology key & match the pods in nodes will get higher score comparing to others ",
},
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: awayFromS1InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
name: "Anti Affinity: one node has more matching pods comparing to other node, so the node which has more unmatches will get high score",
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: awayFromS1InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
@ -465,11 +568,11 @@ func TestPreferredAffinity(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}},
name: "Anti Affinity: one node has more matching pods comparing to other node, so the node which has more unmatches will get high score",
},
// Test the symmetry cases for anti affinity
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
name: "Anti Affinity symmetry: the existing pods in node which has anti affinity match will get high score",
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1", Affinity: awayFromS2InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2", Affinity: awayFromS1InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
@ -479,11 +582,11 @@ func TestPreferredAffinity(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelAzAz2}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}},
name: "Anti Affinity symmetry: the existing pods in node which has anti affinity match will get high score",
},
// Test both affinity and anti-affinity
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegionAwayFromS2InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
name: "Affinity and Anti Affinity: considered only preferredDuringSchedulingIgnoredDuringExecution in both pod affinity & anti affinity",
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegionAwayFromS2InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
@ -493,14 +596,14 @@ func TestPreferredAffinity(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelAzAz1}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}},
name: "Affinity and Anti Affinity: considered only preferredDuringSchedulingIgnoredDuringExecution in both pod affinity & anti affinity",
},
// Combined cases considering both affinity and anti-affinity, the pod to schedule and existing pods have the same labels (they are in the same RC/service),
// the pod prefer to run together with its brother pods in the same region, but wants to stay away from them at node level,
// so that all the pods of a RC/service can stay in a same region but trying to separate with each other
// machine-1,machine-3,machine-4 are in ChinaRegion others machine-2,machine-5 are in IndiaRegion
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegionAwayFromS2InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
name: "Affinity and Anti Affinity: considering both affinity and anti-affinity, the pod to schedule and existing pods have the same labels",
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegionAwayFromS2InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
@ -518,7 +621,6 @@ func TestPreferredAffinity(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "machine5", Labels: labelRgIndia}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: framework.MaxNodeScore}, {Name: "machine4", Score: framework.MaxNodeScore}, {Name: "machine5", Score: 0}},
name: "Affinity and Anti Affinity: considering both affinity and anti-affinity, the pod to schedule and existing pods have the same labels",
},
// Consider Affinity, Anti Affinity and symmetry together.
// for Affinity, the weights are: 8, 0, 0, 0
@ -526,7 +628,8 @@ func TestPreferredAffinity(t *testing.T) {
// for Affinity symmetry, the weights are: 0, 0, 8, 0
// for Anti Affinity symmetry, the weights are: 0, 0, 0, -5
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegionAwayFromS2InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
name: "Affinity and Anti Affinity and symmetry: considered only preferredDuringSchedulingIgnoredDuringExecution in both pod affinity & anti affinity & symmetry",
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: "", Affinity: stayWithS1InRegionAwayFromS2InAz}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS2}},
@ -540,13 +643,13 @@ func TestPreferredAffinity(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "machine4", Labels: labelAzAz2}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: framework.MaxNodeScore}, {Name: "machine4", Score: 0}},
name: "Affinity and Anti Affinity and symmetry: considered only preferredDuringSchedulingIgnoredDuringExecution in both pod affinity & anti affinity & symmetry",
},
// Cover https://github.com/kubernetes/kubernetes/issues/82796 which panics upon:
// 1. Some nodes in a topology don't have pods with affinity, but other nodes in the same topology have.
// 2. The incoming pod doesn't have affinity.
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
name: "Avoid panic when partial nodes in a topology don't have pods with affinity",
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2", Affinity: stayWithS1InRegionAwayFromS2InAz}},
@ -556,7 +659,6 @@ func TestPreferredAffinity(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgChina}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 0}},
name: "Avoid panic when partial nodes in a topology don't have pods with affinity",
},
{
name: "invalid Affinity fails PreScore",
@ -576,19 +678,79 @@ func TestPreferredAffinity(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgChina}},
},
},
{
name: "Affinity with pods matching NamespaceSelector",
pod: &v1.Pod{Spec: v1.PodSpec{Affinity: affinityNamespaceSelector}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team2", Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam2.team1", Labels: podLabelSecurityS1}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}},
},
{
name: "Affinity with pods matching both NamespaceSelector and Namespaces fields",
pod: &v1.Pod{Spec: v1.PodSpec{Affinity: affinityNamespaceSelector}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam2.team2", Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam2.team1", Labels: podLabelSecurityS1}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}},
},
{
name: "Affinity with pods matching NamespaceSelector",
pod: &v1.Pod{Spec: v1.PodSpec{Affinity: antiAffinityNamespaceSelector}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team2", Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam2.team1", Labels: podLabelSecurityS1}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}},
},
{
name: "Affinity with pods matching both NamespaceSelector and Namespaces fields",
pod: &v1.Pod{Spec: v1.PodSpec{Affinity: antiAffinityNamespaceSelector}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam2.team2", Labels: podLabelSecurityS1}},
{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Namespace: "subteam2.team1", Labels: podLabelSecurityS1}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: framework.MaxNodeScore}},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
state := framework.NewCycleState()
snapshot := cache.NewSnapshot(test.pods, test.nodes)
p := &InterPodAffinity{
args: config.InterPodAffinityArgs{
HardPodAffinityWeight: 1,
},
sharedLister: snapshot,
n := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return New(plArgs, fh, feature.Features{
EnablePodAffinityNamespaceSelector: !test.disableNSSelector,
})
}
status := p.PreScore(context.Background(), state, test.pod, test.nodes)
p := plugintesting.SetupPlugin(ctx, t, n, &config.InterPodAffinityArgs{HardPodAffinityWeight: 1}, snapshot, namespaces)
status := p.(framework.PreScorePlugin).PreScore(ctx, state, test.pod, test.nodes)
if !status.IsSuccess() {
if !strings.Contains(status.Message(), test.wantStatus.Message()) {
t.Errorf("unexpected error: %v", status)
@ -597,14 +759,14 @@ func TestPreferredAffinity(t *testing.T) {
var gotList framework.NodeScoreList
for _, n := range test.nodes {
nodeName := n.ObjectMeta.Name
score, status := p.Score(context.Background(), state, test.pod, nodeName)
score, status := p.(framework.ScorePlugin).Score(ctx, state, test.pod, nodeName)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
gotList = append(gotList, framework.NodeScore{Name: nodeName, Score: score})
}
status = p.ScoreExtensions().NormalizeScore(context.Background(), state, test.pod, gotList)
status = p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(ctx, state, test.pod, gotList)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
@ -644,6 +806,16 @@ func TestPreferredAffinityWithHardPodAffinitySymmetricWeight(t *testing.T) {
},
},
},
Namespaces: []string{"", "subteam2.team2"},
NamespaceSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "team",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"team1"},
},
},
},
TopologyKey: "region",
},
},
@ -656,9 +828,11 @@ func TestPreferredAffinityWithHardPodAffinitySymmetricWeight(t *testing.T) {
hardPodAffinityWeight int32
expectedList framework.NodeScoreList
name string
disableNSSelector bool
}{
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelServiceS1}},
name: "with default weight",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: podLabelServiceS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1", Affinity: hardPodAffinity}},
{Spec: v1.PodSpec{NodeName: "machine2", Affinity: hardPodAffinity}},
@ -670,10 +844,10 @@ func TestPreferredAffinityWithHardPodAffinitySymmetricWeight(t *testing.T) {
},
hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight,
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}},
name: "Hard Pod Affinity symmetry: hard pod affinity symmetry weights 1 by default, then nodes that match the hard pod affinity symmetry rules, get a high score",
},
{
pod: &v1.Pod{Spec: v1.PodSpec{NodeName: ""}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelServiceS1}},
name: "with zero weight",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: podLabelServiceS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1", Affinity: hardPodAffinity}},
{Spec: v1.PodSpec{NodeName: "machine2", Affinity: hardPodAffinity}},
@ -685,35 +859,79 @@ func TestPreferredAffinityWithHardPodAffinitySymmetricWeight(t *testing.T) {
},
hardPodAffinityWeight: 0,
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: 0}},
name: "Hard Pod Affinity symmetry: hard pod affinity symmetry is closed(weights 0), then nodes that match the hard pod affinity symmetry rules, get same score with those not match",
},
{
name: "with no matching namespace",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team2", Labels: podLabelServiceS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1", Affinity: hardPodAffinity}},
{Spec: v1.PodSpec{NodeName: "machine2", Affinity: hardPodAffinity}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}},
},
hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight,
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: 0}},
},
{
name: "with matching NamespaceSelector",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "subteam1.team1", Labels: podLabelServiceS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1", Affinity: hardPodAffinity}},
{Spec: v1.PodSpec{NodeName: "machine2", Affinity: hardPodAffinity}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}},
},
hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight,
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}},
},
{
name: "with matching Namespaces",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "subteam2.team2", Labels: podLabelServiceS1}},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1", Affinity: hardPodAffinity}},
{Spec: v1.PodSpec{NodeName: "machine2", Affinity: hardPodAffinity}},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labelRgIndia}},
{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labelAzAz1}},
},
hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight,
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: framework.MaxNodeScore}, {Name: "machine3", Score: 0}},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
state := framework.NewCycleState()
snapshot := cache.NewSnapshot(test.pods, test.nodes)
fh, _ := runtime.NewFramework(nil, nil, nil, runtime.WithSnapshotSharedLister(snapshot))
args := &config.InterPodAffinityArgs{HardPodAffinityWeight: test.hardPodAffinityWeight}
p, err := New(args, fh)
if err != nil {
t.Fatal(err)
n := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return New(plArgs, fh, feature.Features{
EnablePodAffinityNamespaceSelector: !test.disableNSSelector,
})
}
status := p.(framework.PreScorePlugin).PreScore(context.Background(), state, test.pod, test.nodes)
p := plugintesting.SetupPlugin(ctx, t, n, &config.InterPodAffinityArgs{HardPodAffinityWeight: test.hardPodAffinityWeight}, snapshot, namespaces)
status := p.(framework.PreScorePlugin).PreScore(ctx, state, test.pod, test.nodes)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
var gotList framework.NodeScoreList
for _, n := range test.nodes {
nodeName := n.ObjectMeta.Name
score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.pod, nodeName)
score, status := p.(framework.ScorePlugin).Score(ctx, state, test.pod, nodeName)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
gotList = append(gotList, framework.NodeScore{Name: nodeName, Score: score})
}
status = p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(context.Background(), state, test.pod, gotList)
status = p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(ctx, state, test.pod, gotList)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}

View File

@ -17,8 +17,13 @@ limitations under the License.
package plugins
import (
apiruntime "k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption"
plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/imagelocality"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
@ -44,6 +49,10 @@ import (
// A scheduler that runs out of tree plugins can register additional plugins
// through the WithFrameworkOutOfTreeRegistry option.
func NewInTreeRegistry() runtime.Registry {
fts := plfeature.Features{
EnablePodAffinityNamespaceSelector: utilfeature.DefaultFeatureGate.Enabled(features.PodAffinityNamespaceSelector),
}
return runtime.Registry{
selectorspread.Name: selectorspread.New,
imagelocality.Name: imagelocality.New,
@ -67,11 +76,13 @@ func NewInTreeRegistry() runtime.Registry {
nodevolumelimits.GCEPDName: nodevolumelimits.NewGCEPD,
nodevolumelimits.AzureDiskName: nodevolumelimits.NewAzureDisk,
nodevolumelimits.CinderName: nodevolumelimits.NewCinder,
interpodaffinity.Name: interpodaffinity.New,
nodelabel.Name: nodelabel.New,
serviceaffinity.Name: serviceaffinity.New,
queuesort.Name: queuesort.New,
defaultbinder.Name: defaultbinder.New,
defaultpreemption.Name: defaultpreemption.New,
interpodaffinity.Name: func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {
return interpodaffinity.New(plArgs, fh, fts)
},
nodelabel.Name: nodelabel.New,
serviceaffinity.Name: serviceaffinity.New,
queuesort.Name: queuesort.New,
defaultbinder.Name: defaultbinder.New,
defaultpreemption.Name: defaultpreemption.New,
}
}

View File

@ -0,0 +1,59 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"context"
"testing"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/scheduler/framework"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
)
// SetupPlugin creates a plugin using a framework handle that includes
// the provided sharedLister and a SharedInformerFactory with the provided objects.
// The function also creates an empty namespace (since most tests creates pods with
// empty namespace), and start informer factory.
func SetupPlugin(
ctx context.Context,
t *testing.T,
pf frameworkruntime.PluginFactory,
config runtime.Object,
sharedLister framework.SharedLister,
objs []runtime.Object,
) framework.Plugin {
objs = append([]runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}, objs...)
informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(objs...), 0)
fh, err := frameworkruntime.NewFramework(nil, nil, nil,
frameworkruntime.WithSnapshotSharedLister(sharedLister),
frameworkruntime.WithInformerFactory(informerFactory))
if err != nil {
t.Fatalf("Failed creating framework runtime: %v", err)
}
p, err := pf(config, fh)
if err != nil {
t.Fatal(err)
}
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
return p
}

View File

@ -25,7 +25,7 @@ import (
"sync/atomic"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@ -161,12 +161,12 @@ func (pi *PodInfo) Update(pod *v1.Pod) {
// Attempt to parse the affinity terms
var parseErrs []error
requiredAffinityTerms, err := getAffinityTerms(pod, schedutil.GetPodAffinityTerms(pod.Spec.Affinity))
requiredAffinityTerms, err := getAffinityTerms(pod, getPodAffinityTerms(pod.Spec.Affinity))
if err != nil {
parseErrs = append(parseErrs, fmt.Errorf("requiredAffinityTerms: %w", err))
}
requiredAntiAffinityTerms, err := getAffinityTerms(pod,
schedutil.GetPodAntiAffinityTerms(pod.Spec.Affinity))
getPodAntiAffinityTerms(pod.Spec.Affinity))
if err != nil {
parseErrs = append(parseErrs, fmt.Errorf("requiredAntiAffinityTerms: %w", err))
}
@ -189,9 +189,18 @@ func (pi *PodInfo) Update(pod *v1.Pod) {
// AffinityTerm is a processed version of v1.PodAffinityTerm.
type AffinityTerm struct {
Namespaces sets.String
Selector labels.Selector
TopologyKey string
Namespaces sets.String
Selector labels.Selector
TopologyKey string
NamespaceSelector labels.Selector
}
// Matches returns true if the pod matches the label selector and namespaces or namespace selector.
func (at *AffinityTerm) Matches(pod *v1.Pod, nsLabels labels.Set, nsSelectorEnabled bool) bool {
if at.Namespaces.Has(pod.Namespace) || (nsSelectorEnabled && at.NamespaceSelector.Matches(nsLabels)) {
return at.Selector.Matches(labels.Set(pod.Labels))
}
return false
}
// WeightedAffinityTerm is a "processed" representation of v1.WeightedAffinityTerm.
@ -240,12 +249,18 @@ func (f *FitError) Error() string {
}
func newAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm) (*AffinityTerm, error) {
namespaces := schedutil.GetNamespacesFromPodAffinityTerm(pod, term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
return nil, err
}
return &AffinityTerm{Namespaces: namespaces, Selector: selector, TopologyKey: term.TopologyKey}, nil
namespaces := getNamespacesFromPodAffinityTerm(pod, term)
nsSelector, err := metav1.LabelSelectorAsSelector(term.NamespaceSelector)
if err != nil {
return nil, err
}
return &AffinityTerm{Namespaces: namespaces, Selector: selector, TopologyKey: term.TopologyKey, NamespaceSelector: nsSelector}, nil
}
// getAffinityTerms receives a Pod and affinity terms and returns the namespaces and
@ -292,6 +307,44 @@ func NewPodInfo(pod *v1.Pod) *PodInfo {
return pInfo
}
func getPodAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) {
if affinity != nil && affinity.PodAffinity != nil {
if len(affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
terms = affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
}
return terms
}
func getPodAntiAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) {
if affinity != nil && affinity.PodAntiAffinity != nil {
if len(affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
terms = affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
}
return terms
}
// returns a set of names according to the namespaces indicated in podAffinityTerm.
// If namespaces is empty it considers the given pod's namespace.
func getNamespacesFromPodAffinityTerm(pod *v1.Pod, podAffinityTerm *v1.PodAffinityTerm) sets.String {
names := sets.String{}
if len(podAffinityTerm.Namespaces) == 0 && podAffinityTerm.NamespaceSelector == nil {
names.Insert(pod.Namespace)
} else {
names.Insert(podAffinityTerm.Namespaces...)
}
return names
}
// ImageStateSummary provides summarized information about the state of an image.
type ImageStateSummary struct {
// Size of the image

View File

@ -22,10 +22,12 @@ import (
"strings"
"testing"
"github.com/google/go-cmp/cmp"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
)
func TestNewResource(t *testing.T) {
@ -1303,3 +1305,45 @@ func TestHostPortInfo_Check(t *testing.T) {
})
}
}
func TestGetNamespacesFromPodAffinityTerm(t *testing.T) {
tests := []struct {
name string
term *v1.PodAffinityTerm
want sets.String
}{
{
name: "podAffinityTerm_namespace_empty",
term: &v1.PodAffinityTerm{},
want: sets.String{metav1.NamespaceDefault: sets.Empty{}},
},
{
name: "podAffinityTerm_namespace_not_empty",
term: &v1.PodAffinityTerm{
Namespaces: []string{metav1.NamespacePublic, metav1.NamespaceSystem},
},
want: sets.NewString(metav1.NamespacePublic, metav1.NamespaceSystem),
},
{
name: "podAffinityTerm_namespace_selector_not_nil",
term: &v1.PodAffinityTerm{
NamespaceSelector: &metav1.LabelSelector{},
},
want: sets.String{},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got := getNamespacesFromPodAffinityTerm(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "topologies_pod",
Namespace: metav1.NamespaceDefault,
},
}, test.term)
if diff := cmp.Diff(test.want, got); diff != "" {
t.Errorf("unexpected diff (-want, +got):\n%s", diff)
}
})
}
}

View File

@ -29,15 +29,19 @@ import (
"sync"
"time"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
ktypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/internal/heap"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
@ -95,8 +99,11 @@ type SchedulingQueue interface {
}
// NewSchedulingQueue initializes a priority queue as a new scheduling queue.
func NewSchedulingQueue(lessFn framework.LessFunc, opts ...Option) SchedulingQueue {
return NewPriorityQueue(lessFn, opts...)
func NewSchedulingQueue(
lessFn framework.LessFunc,
informerFactory informers.SharedInformerFactory,
opts ...Option) SchedulingQueue {
return NewPriorityQueue(lessFn, informerFactory, opts...)
}
// NominatedNodeName returns nominated node name of a Pod.
@ -148,6 +155,8 @@ type PriorityQueue struct {
// closed indicates that the queue is closed.
// It is mainly used to let Pop() exit its control loop while waiting for an item.
closed bool
nsLister listersv1.NamespaceLister
}
type priorityQueueOptions struct {
@ -218,6 +227,7 @@ func newQueuedPodInfoForLookup(pod *v1.Pod, plugins ...string) *framework.Queued
// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(
lessFn framework.LessFunc,
informerFactory informers.SharedInformerFactory,
opts ...Option,
) *PriorityQueue {
options := defaultPriorityQueueOptions
@ -248,6 +258,9 @@ func NewPriorityQueue(
}
pq.cond.L = &pq.lock
pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
if utilfeature.DefaultFeatureGate.Enabled(features.PodAffinityNamespaceSelector) {
pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()
}
return pq
}
@ -559,17 +572,15 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.
// any affinity term that matches "pod".
// NOTE: this function assumes lock has been acquired in caller.
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.QueuedPodInfo {
nsSelectorEnabled := p.nsLister != nil
var nsLabels labels.Set
if nsSelectorEnabled {
nsLabels = interpodaffinity.GetNamespaceLabelsSnapshot(pod.Namespace, p.nsLister)
}
var podsToMove []*framework.QueuedPodInfo
for _, pInfo := range p.unschedulableQ.podInfoMap {
up := pInfo.Pod
terms := util.GetPodAffinityTerms(up.Spec.Affinity)
for _, term := range terms {
namespaces := util.GetNamespacesFromPodAffinityTerm(up, &term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
klog.ErrorS(err, "Error getting label selectors for pod", "pod", klog.KObj(up))
}
if util.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
for _, term := range pInfo.RequiredAffinityTerms {
if term.Matches(pod, nsLabels, nsSelectorEnabled) {
podsToMove = append(podsToMove, pInfo)
break
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package queue
import (
"context"
"fmt"
"reflect"
"strings"
@ -24,7 +25,7 @@ import (
"testing"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
ktypes "k8s.io/apimachinery/pkg/types"
@ -117,7 +118,7 @@ func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod {
}
func TestPriorityQueue_Add(t *testing.T) {
q := NewPriorityQueue(newDefaultQueueSort())
q := NewTestQueue(context.Background(), newDefaultQueueSort())
if err := q.Add(medPriorityPodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err)
}
@ -159,7 +160,7 @@ func newDefaultQueueSort() framework.LessFunc {
}
func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
q := NewPriorityQueue(newDefaultQueueSort())
q := NewTestQueue(context.Background(), newDefaultQueueSort())
if err := q.Add(medPriorityPodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err)
}
@ -175,7 +176,7 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
}
func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
q := NewPriorityQueue(newDefaultQueueSort())
q := NewTestQueue(context.Background(), newDefaultQueueSort())
q.Add(highPriNominatedPodInfo.Pod)
q.AddUnschedulableIfNotPresent(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod), q.SchedulingCycle()) // Must not add anything.
q.AddUnschedulableIfNotPresent(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod), q.SchedulingCycle())
@ -207,7 +208,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
// Pods in and before current scheduling cycle will be put back to activeQueue
// if we were trying to schedule them when we received move request.
func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
q := NewPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(time.Now())))
q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(clock.NewFakeClock(time.Now())))
totalNum := 10
expectedPods := make([]v1.Pod, 0, totalNum)
for i := 0; i < totalNum; i++ {
@ -272,7 +273,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
}
func TestPriorityQueue_Pop(t *testing.T) {
q := NewPriorityQueue(newDefaultQueueSort())
q := NewTestQueue(context.Background(), newDefaultQueueSort())
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
@ -289,7 +290,7 @@ func TestPriorityQueue_Pop(t *testing.T) {
}
func TestPriorityQueue_Update(t *testing.T) {
q := NewPriorityQueue(newDefaultQueueSort())
q := NewTestQueue(context.Background(), newDefaultQueueSort())
q.Update(nil, highPriorityPodInfo.Pod)
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriorityPodInfo.Pod)); !exists {
t.Errorf("Expected %v to be added to activeQ.", highPriorityPodInfo.Pod.Name)
@ -337,7 +338,7 @@ func TestPriorityQueue_Update(t *testing.T) {
}
func TestPriorityQueue_Delete(t *testing.T) {
q := NewPriorityQueue(newDefaultQueueSort())
q := NewTestQueue(context.Background(), newDefaultQueueSort())
q.Update(highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod)
q.Add(unschedulablePodInfo.Pod)
if err := q.Delete(highPriNominatedPodInfo.Pod); err != nil {
@ -413,7 +414,7 @@ func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) {
}
}
q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
// Init pods in unschedulableQ.
for j := 0; j < podsInUnschedulableQ; j++ {
@ -459,7 +460,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
m := map[framework.ClusterEvent]sets.String{
{Resource: framework.Node, ActionType: framework.Add}: sets.NewString("fooPlugin"),
}
q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
q.Add(medPriorityPodInfo.Pod)
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
@ -548,7 +549,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
m := map[framework.ClusterEvent]sets.String{
{Resource: framework.Pod, ActionType: framework.Add}: sets.NewString("fakePlugin"),
}
q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
q.Add(medPriorityPodInfo.Pod)
// Add a couple of pods to the unschedulableQ.
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fakePlugin"), q.SchedulingCycle())
@ -572,7 +573,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
}
func TestPriorityQueue_NominatedPodsForNode(t *testing.T) {
q := NewPriorityQueue(newDefaultQueueSort())
q := NewTestQueue(context.Background(), newDefaultQueueSort())
q.Add(medPriorityPodInfo.Pod)
q.Add(unschedulablePodInfo.Pod)
q.Add(highPriorityPodInfo.Pod)
@ -597,7 +598,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
return pendingSet
}
q := NewPriorityQueue(newDefaultQueueSort())
q := NewTestQueue(context.Background(), newDefaultQueueSort())
q.Add(medPriorityPodInfo.Pod)
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(unschedulablePodInfo.Pod), q.SchedulingCycle())
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(highPriorityPodInfo.Pod), q.SchedulingCycle())
@ -614,7 +615,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
}
func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
q := NewPriorityQueue(newDefaultQueueSort())
q := NewTestQueue(context.Background(), newDefaultQueueSort())
if err := q.Add(medPriorityPodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err)
}
@ -683,7 +684,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
}
func TestPriorityQueue_NewWithOptions(t *testing.T) {
q := NewPriorityQueue(
q := NewTestQueue(context.Background(),
newDefaultQueueSort(),
WithPodInitialBackoffDuration(2*time.Second),
WithPodMaxBackoffDuration(20*time.Second),
@ -849,7 +850,7 @@ func TestUnschedulablePodsMap(t *testing.T) {
}
func TestSchedulingQueue_Close(t *testing.T) {
q := NewPriorityQueue(newDefaultQueueSort())
q := NewTestQueue(context.Background(), newDefaultQueueSort())
wantErr := fmt.Errorf(queueClosed)
wg := sync.WaitGroup{}
wg.Add(1)
@ -873,7 +874,7 @@ func TestSchedulingQueue_Close(t *testing.T) {
// are frequent events that move pods to the active queue.
func TestRecentlyTriedPodsGoBack(t *testing.T) {
c := clock.NewFakeClock(time.Now())
q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c))
q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c))
// Add a few pods to priority queue.
for i := 0; i < 5; i++ {
p := v1.Pod{
@ -930,7 +931,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
// are frequent events that move pods to the active queue.
func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
c := clock.NewFakeClock(time.Now())
q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c))
q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c))
// Add an unschedulable pod to a priority queue.
// This makes a situation that the pod was tried to schedule
@ -1021,7 +1022,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
// TestHighPriorityBackoff tests that a high priority pod does not block
// other pods if it is unschedulable
func TestHighPriorityBackoff(t *testing.T) {
q := NewPriorityQueue(newDefaultQueueSort())
q := NewTestQueue(context.Background(), newDefaultQueueSort())
midPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -1088,7 +1089,7 @@ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) {
m := map[framework.ClusterEvent]sets.String{
{Resource: framework.Node, ActionType: framework.Add}: sets.NewString("fakePlugin"),
}
q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
midPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-midpod",
@ -1272,7 +1273,7 @@ func TestPodTimestamp(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
queue := NewPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp)))
queue := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp)))
var podInfoList []*framework.QueuedPodInfo
for i, op := range test.operations {
@ -1429,7 +1430,7 @@ scheduler_pending_pods{queue="unschedulable"} 0
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
resetMetrics()
queue := NewPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp)))
queue := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp)))
for i, op := range test.operations {
for _, pInfo := range test.operands[i] {
op(queue, pInfo)
@ -1458,7 +1459,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
// Case 1: A pod is created and scheduled after 1 attempt. The queue operations are
// Add -> Pop.
c := clock.NewFakeClock(timestamp)
queue := NewPriorityQueue(newDefaultQueueSort(), WithClock(c))
queue := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c))
queue.Add(pod)
pInfo, err := queue.Pop()
if err != nil {
@ -1469,7 +1470,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
// Case 2: A pod is created and scheduled after 2 attempts. The queue operations are
// Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Pop.
c = clock.NewFakeClock(timestamp)
queue = NewPriorityQueue(newDefaultQueueSort(), WithClock(c))
queue = NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c))
queue.Add(pod)
pInfo, err = queue.Pop()
if err != nil {
@ -1489,7 +1490,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
// Case 3: Similar to case 2, but before the second pop, call update, the queue operations are
// Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Update -> Pop.
c = clock.NewFakeClock(timestamp)
queue = NewPriorityQueue(newDefaultQueueSort(), WithClock(c))
queue = NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c))
queue.Add(pod)
pInfo, err = queue.Pop()
if err != nil {
@ -1587,7 +1588,7 @@ func TestIncomingPodsMetrics(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
metrics.SchedulerQueueIncomingPods.Reset()
queue := NewPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp)))
queue := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp)))
for _, op := range test.operations {
for _, pInfo := range pInfos {
op(queue, pInfo)
@ -1613,7 +1614,7 @@ func checkPerPodSchedulingMetrics(name string, t *testing.T, pInfo *framework.Qu
func TestBackOffFlow(t *testing.T) {
cl := clock.NewFakeClock(time.Now())
q := NewPriorityQueue(newDefaultQueueSort(), WithClock(cl))
q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(cl))
steps := []struct {
wantBackoff time.Duration
}{
@ -1772,7 +1773,7 @@ func TestPodMatchesEvent(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
q := NewPriorityQueue(newDefaultQueueSort())
q := NewTestQueue(context.Background(), newDefaultQueueSort())
q.clusterEventMap = tt.clusterEventMap
if got := q.podMatchesEvent(tt.podInfo, tt.event); got != tt.want {
t.Errorf("Want %v, but got %v", tt.want, got)

View File

@ -0,0 +1,46 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
"context"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/scheduler/framework"
)
// NewTestQueueWithObjects creates a priority queue with an informer factory
// populated with the provided objects.
func NewTestQueueWithObjects(
ctx context.Context,
lessFn framework.LessFunc,
objs []runtime.Object,
opts ...Option,
) *PriorityQueue {
informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(objs...), 0)
pq := NewPriorityQueue(lessFn, informerFactory, opts...)
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
return pq
}
// NewTestQueue creates a priority queue with an empty informer factory.
func NewTestQueue(ctx context.Context, lessFn framework.LessFunc, opts ...Option) *PriorityQueue {
return NewTestQueueWithObjects(ctx, lessFn, nil, opts...)
}

View File

@ -448,7 +448,9 @@ func TestSchedulerMultipleProfilesScheduling(t *testing.T) {
// Set up scheduler for the 3 nodes.
// We use a fake filter that only allows one particular node. We create two
// profiles, each with a different node in the filter configuration.
client := clientsetfake.NewSimpleClientset(nodes...)
objs := append([]runtime.Object{
&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}, nodes...)
client := clientsetfake.NewSimpleClientset(objs...)
broadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

View File

@ -1,81 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
)
// GetNamespacesFromPodAffinityTerm returns a set of names
// according to the namespaces indicated in podAffinityTerm.
// If namespaces is empty it considers the given pod's namespace.
func GetNamespacesFromPodAffinityTerm(pod *v1.Pod, podAffinityTerm *v1.PodAffinityTerm) sets.String {
names := sets.String{}
if len(podAffinityTerm.Namespaces) == 0 {
names.Insert(pod.Namespace)
} else {
names.Insert(podAffinityTerm.Namespaces...)
}
return names
}
// PodMatchesTermsNamespaceAndSelector returns true if the given <pod>
// matches the namespace and selector defined by <affinityPod>`s <term>.
func PodMatchesTermsNamespaceAndSelector(pod *v1.Pod, namespaces sets.String, selector labels.Selector) bool {
if !namespaces.Has(pod.Namespace) {
return false
}
if !selector.Matches(labels.Set(pod.Labels)) {
return false
}
return true
}
// NodesHaveSameTopologyKey checks if nodeA and nodeB have same label value with given topologyKey as label key.
// Returns false if topologyKey is empty.
func NodesHaveSameTopologyKey(nodeA, nodeB *v1.Node, topologyKey string) bool {
if len(topologyKey) == 0 {
return false
}
if nodeA.Labels == nil || nodeB.Labels == nil {
return false
}
nodeALabel, okA := nodeA.Labels[topologyKey]
nodeBLabel, okB := nodeB.Labels[topologyKey]
// If found label in both nodes, check the label
if okB && okA {
return nodeALabel == nodeBLabel
}
return false
}
// Topologies contains topologies information of nodes.
type Topologies struct {
DefaultKeys []string
}
// NodesHaveSameTopologyKey checks if nodeA and nodeB have same label value with given topologyKey as label key.
func (tps *Topologies) NodesHaveSameTopologyKey(nodeA, nodeB *v1.Node, topologyKey string) bool {
return NodesHaveSameTopologyKey(nodeA, nodeB, topologyKey)
}

View File

@ -1,260 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/util/sets"
)
func fakePod() *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "topologies_pod",
Namespace: metav1.NamespaceDefault,
UID: "551f5a43-9f2f-11e7-a589-fa163e148d75",
},
}
}
func TestGetNamespacesFromPodAffinityTerm(t *testing.T) {
tests := []struct {
name string
podAffinityTerm *v1.PodAffinityTerm
expectedValue sets.String
}{
{
"podAffinityTerm_namespace_empty",
&v1.PodAffinityTerm{},
sets.String{metav1.NamespaceDefault: sets.Empty{}},
},
{
"podAffinityTerm_namespace_not_empty",
&v1.PodAffinityTerm{
Namespaces: []string{metav1.NamespacePublic, metav1.NamespaceSystem},
},
sets.String{metav1.NamespacePublic: sets.Empty{}, metav1.NamespaceSystem: sets.Empty{}},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
realValue := GetNamespacesFromPodAffinityTerm(fakePod(), test.podAffinityTerm)
assert.EqualValuesf(t, test.expectedValue, realValue, "Failed to test: %s", test.name)
})
}
}
func TestPodMatchesTermsNamespaceAndSelector(t *testing.T) {
fakeNamespaces := sets.String{metav1.NamespacePublic: sets.Empty{}, metav1.NamespaceSystem: sets.Empty{}}
fakeRequirement, _ := labels.NewRequirement("service", selection.In, []string{"topologies_service1", "topologies_service2"})
fakeSelector := labels.NewSelector().Add(*fakeRequirement)
tests := []struct {
name string
podNamespaces string
podLabels map[string]string
expectedResult bool
}{
{
"namespace_not_in",
metav1.NamespaceDefault,
map[string]string{"service": "topologies_service1"},
false,
},
{
"label_not_match",
metav1.NamespacePublic,
map[string]string{"service": "topologies_service3"},
false,
},
{
"normal_case",
metav1.NamespacePublic,
map[string]string{"service": "topologies_service1"},
true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeTestPod := fakePod()
fakeTestPod.Namespace = test.podNamespaces
fakeTestPod.Labels = test.podLabels
realValue := PodMatchesTermsNamespaceAndSelector(fakeTestPod, fakeNamespaces, fakeSelector)
assert.EqualValuesf(t, test.expectedResult, realValue, "Failed to test: %s", test.name)
})
}
}
func TestNodesHaveSameTopologyKey(t *testing.T) {
tests := []struct {
name string
nodeA, nodeB *v1.Node
topologyKey string
expected bool
}{
{
name: "nodeA{'a':'a'} vs. empty label in nodeB",
nodeA: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"a": "a",
},
},
},
nodeB: &v1.Node{},
expected: false,
topologyKey: "a",
},
{
name: "nodeA{'a':'a'} vs. nodeB{'a':'a'}",
nodeA: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"a": "a",
},
},
},
nodeB: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"a": "a",
},
},
},
expected: true,
topologyKey: "a",
},
{
name: "nodeA{'a':''} vs. empty label in nodeB",
nodeA: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"a": "",
},
},
},
nodeB: &v1.Node{},
expected: false,
topologyKey: "a",
},
{
name: "nodeA{'a':''} vs. nodeB{'a':''}",
nodeA: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"a": "",
},
},
},
nodeB: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"a": "",
},
},
},
expected: true,
topologyKey: "a",
},
{
name: "nodeA{'a':'a'} vs. nodeB{'a':'a'} by key{'b'}",
nodeA: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"a": "a",
},
},
},
nodeB: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"a": "a",
},
},
},
expected: false,
topologyKey: "b",
},
{
name: "topologyKey empty",
nodeA: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"a": "",
},
},
},
nodeB: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"a": "",
},
},
},
expected: false,
topologyKey: "",
},
{
name: "nodeA label nil vs. nodeB{'a':''} by key('a')",
nodeA: &v1.Node{
ObjectMeta: metav1.ObjectMeta{},
},
nodeB: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"a": "",
},
},
},
expected: false,
topologyKey: "a",
},
{
name: "nodeA{'a':''} vs. nodeB label is nil by key('a')",
nodeA: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"a": "",
},
},
},
nodeB: &v1.Node{
ObjectMeta: metav1.ObjectMeta{},
},
expected: false,
topologyKey: "a",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got := NodesHaveSameTopologyKey(test.nodeA, test.nodeB, test.topologyKey)
assert.Equalf(t, test.expected, got, "Failed to test: %s", test.name)
})
}
}

View File

@ -90,34 +90,6 @@ func MoreImportantPod(pod1, pod2 *v1.Pod) bool {
return GetPodStartTime(pod1).Before(GetPodStartTime(pod2))
}
// GetPodAffinityTerms gets pod affinity terms by a pod affinity object.
func GetPodAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) {
if affinity != nil && affinity.PodAffinity != nil {
if len(affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
terms = affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
}
return terms
}
// GetPodAntiAffinityTerms gets pod affinity terms by a pod anti-affinity.
func GetPodAntiAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) {
if affinity != nil && affinity.PodAntiAffinity != nil {
if len(affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
terms = affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
}
return terms
}
// PatchPod calculates the delta bytes change from <old> to <new>,
// and then submit a request to API server to patch the pod changes.
func PatchPod(cs kubernetes.Interface, old *v1.Pod, new *v1.Pod) error {