From 0cfe4438e9188e94d35d59255ac75e7dad93b024 Mon Sep 17 00:00:00 2001 From: nayihz Date: Mon, 25 Dec 2023 10:09:05 +0800 Subject: [PATCH] interpodaffinity: scheduler queueing hints --- .../plugins/interpodaffinity/plugin.go | 88 +++++++- .../plugins/interpodaffinity/plugin_test.go | 201 ++++++++++++++++++ pkg/scheduler/framework/types.go | 12 +- 3 files changed, 293 insertions(+), 8 deletions(-) create mode 100644 pkg/scheduler/framework/plugins/interpodaffinity/plugin_test.go diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go b/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go index 98b16e89cfd..e89afc0f7dc 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go @@ -20,6 +20,7 @@ import ( "context" "fmt" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" listersv1 "k8s.io/client-go/listers/core/v1" @@ -29,6 +30,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" + "k8s.io/kubernetes/pkg/scheduler/util" ) // Name is the name of the plugin used in the plugin registry and configurations. @@ -73,8 +75,8 @@ func (pl *InterPodAffinity) EventsToRegister() []framework.ClusterEventWithHint // As a workaround, we add UpdateNodeTaint event to catch the case. // We can remove UpdateNodeTaint when we remove the preCheck feature. // See: https://github.com/kubernetes/kubernetes/issues/110175 - {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}}, - {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}}, + {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}, QueueingHintFn: pl.isSchedulableAfterPodChange}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, } } @@ -141,3 +143,85 @@ func GetNamespaceLabelsSnapshot(logger klog.Logger, ns string, nsLister listersv logger.V(3).Info("getting namespace, assuming empty set of namespace labels", "namespace", ns, "err", err) return } + +func (pl *InterPodAffinity) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + originalPod, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj) + if err != nil { + return framework.Queue, err + } + if (modifiedPod != nil && modifiedPod.Spec.NodeName == "") || (originalPod != nil && originalPod.Spec.NodeName == "") { + logger.V(5).Info("the added/updated/deleted pod is unscheduled, so it doesn't make the target pod schedulable", + "pod", klog.KObj(pod), "originalPod", klog.KObj(originalPod), "modifiedPod", klog.KObj(modifiedPod)) + return framework.QueueSkip, nil + } + + terms, err := framework.GetAffinityTerms(pod, framework.GetPodAffinityTerms(pod.Spec.Affinity)) + if err != nil { + return framework.Queue, err + } + + antiTerms, err := framework.GetAffinityTerms(pod, framework.GetPodAntiAffinityTerms(pod.Spec.Affinity)) + if err != nil { + return framework.Queue, err + } + + // Pod is updated. Return Queue when the updated pod matching the target pod's affinity or not matching anti-affinity. + if modifiedPod != nil && originalPod != nil { + if !podMatchesAllAffinityTerms(terms, originalPod) && podMatchesAllAffinityTerms(terms, modifiedPod) { + logger.V(5).Info("a scheduled pod was updated to match the target pod's affinity, and the pod may be schedulable now", + "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod)) + return framework.Queue, nil + } + if podMatchesAllAffinityTerms(antiTerms, originalPod) && !podMatchesAllAffinityTerms(antiTerms, modifiedPod) { + logger.V(5).Info("a scheduled pod was updated not to match the target pod's anti affinity, and the pod may be schedulable now", + "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod)) + return framework.Queue, nil + } + logger.V(5).Info("a scheduled pod was updated but it doesn't match the target pod's affinity or does match the target pod's anti-affinity", + "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod)) + return framework.QueueSkip, nil + } + + // Pod is added. Return Queue when the added pod matching the target pod's affinity. + if modifiedPod != nil { + if podMatchesAllAffinityTerms(terms, modifiedPod) { + logger.V(5).Info("a scheduled pod was added and it matches the target pod's affinity", + "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod)) + return framework.Queue, nil + } + logger.V(5).Info("a scheduled pod was added and it doesn't match the target pod's affinity", + "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod)) + return framework.QueueSkip, nil + } + + // Pod is deleted. Return Queue when the deleted pod matching the target pod's anti-affinity. + if podMatchesAllAffinityTerms(antiTerms, originalPod) { + logger.V(5).Info("a scheduled pod was deteled but it matches the target pod's anti-affinity", + "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod)) + return framework.QueueSkip, nil + } + logger.V(5).Info("a scheduled pod was deleted but it doesn't match the target pod's anti-affinity, and the pod may be schedulable now", + "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod)) + return framework.Queue, nil +} + +func (pl *InterPodAffinity) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + _, modifiedNode, err := util.As[*v1.Node](oldObj, newObj) + if err != nil { + return framework.Queue, err + } + + terms, err := framework.GetAffinityTerms(pod, framework.GetPodAffinityTerms(pod.Spec.Affinity)) + if err != nil { + return framework.Queue, err + } + + for _, term := range terms { + if _, ok := modifiedNode.Labels[term.TopologyKey]; ok { + logger.V(5).Info("a node with topologyKey was added/updated and it may make pod schedulable", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode)) + return framework.Queue, err + } + } + logger.V(5).Info("a node is added/updated but doesn't have any topologyKey of pod affinity", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode)) + return framework.QueueSkip, nil +} diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/plugin_test.go b/pkg/scheduler/framework/plugins/interpodaffinity/plugin_test.go new file mode 100644 index 00000000000..d71fddc0dd2 --- /dev/null +++ b/pkg/scheduler/framework/plugins/interpodaffinity/plugin_test.go @@ -0,0 +1,201 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package interpodaffinity + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2/ktesting" + "k8s.io/kubernetes/pkg/scheduler/apis/config" + "k8s.io/kubernetes/pkg/scheduler/framework" + plugintesting "k8s.io/kubernetes/pkg/scheduler/framework/plugins/testing" + "k8s.io/kubernetes/pkg/scheduler/internal/cache" + st "k8s.io/kubernetes/pkg/scheduler/testing" +) + +func Test_isSchedulableAfterPodChange(t *testing.T) { + tests := []struct { + name string + pod *v1.Pod + oldPod, newPod *v1.Pod + expectedHint framework.QueueingHint + }{ + { + name: "add a pod which matches the pod affinity", + pod: st.MakePod().Name("p").PodAffinityIn("service", "region", []string{"securityscan", "value2"}, st.PodAffinityWithRequiredReq).Obj(), + newPod: st.MakePod().Node("fake-node").Label("service", "securityscan").Obj(), + expectedHint: framework.Queue, + }, + { + name: "add an un-scheduled pod", + pod: st.MakePod().Name("p").PodAffinityIn("service", "region", []string{"securityscan", "value2"}, st.PodAffinityWithRequiredReq).Obj(), + newPod: st.MakePod().Label("service", "securityscan").Obj(), + expectedHint: framework.QueueSkip, + }, + { + name: "add a pod which doesn't match the pod affinity", + pod: st.MakePod().Name("p").PodAffinityIn("service", "region", []string{"securityscan", "value2"}, st.PodAffinityWithRequiredReq).Obj(), + newPod: st.MakePod().Node("fake-node").Label("aaa", "a").Obj(), + expectedHint: framework.QueueSkip, + }, + { + name: "update a pod from non-match to match pod affinity", + pod: st.MakePod().Name("p").PodAffinityIn("service", "region", []string{"securityscan", "value2"}, st.PodAffinityWithRequiredReq).Obj(), + newPod: st.MakePod().Node("fake-node").Label("service", "securityscan").Obj(), + oldPod: st.MakePod().Node("fake-node").Label("aaa", "a").Obj(), + expectedHint: framework.Queue, + }, + { + name: "the updating pod matches target pod's affinity both before and after label changes", + pod: st.MakePod().Name("p").PodAffinityIn("service", "region", []string{"securityscan", "value2"}, st.PodAffinityWithRequiredReq).Obj(), + newPod: st.MakePod().Node("fake-node").Label("service", "securityscan").Obj(), + oldPod: st.MakePod().Node("fake-node").Label("service", "value2").Obj(), + expectedHint: framework.QueueSkip, + }, + { + name: "update an un-scheduled pod", + pod: st.MakePod().Name("p").PodAffinityIn("service", "region", []string{"securityscan", "value2"}, st.PodAffinityWithRequiredReq).Obj(), + newPod: st.MakePod().Label("service", "securityscan").Obj(), + oldPod: st.MakePod().Label("service", "securityscan").Obj(), + expectedHint: framework.QueueSkip, + }, + { + name: "update a pod from match to non-match the pod affinity", + pod: st.MakePod().Name("p").PodAffinityIn("service", "region", []string{"securityscan", "value2"}, st.PodAffinityWithRequiredReq).Obj(), + newPod: st.MakePod().Node("fake-node").Label("aaa", "a").Obj(), + oldPod: st.MakePod().Node("fake-node").Label("service", "securityscan").Obj(), + expectedHint: framework.QueueSkip, + }, + { + name: "update a pod from match to non-match pod's affinity - multiple terms case", + pod: st.MakePod().Name("p").PodAffinityExists("aaa", "hostname", st.PodAffinityWithRequiredReq). + PodAffinityExists("bbb", "hostname", st.PodAffinityWithRequiredReq).Obj(), + newPod: st.MakePod().Node("fake-node").Label("service", "securityscan").Obj(), + oldPod: st.MakePod().Node("fake-node").Label("aaa", "a").Obj(), + expectedHint: framework.QueueSkip, + }, + { + name: "update a pod from non-match to match pod's affinity - multiple terms case", + pod: st.MakePod().Name("p").PodAffinityExists("aaa", "hostname", st.PodAffinityWithRequiredReq). + PodAffinityExists("bbb", "hostname", st.PodAffinityWithRequiredReq).Obj(), + newPod: st.MakePod().Node("fake-node").Label("aaa", "").Label("bbb", "").Obj(), + oldPod: st.MakePod().Node("fake-node").Label("aaa", "a").Obj(), + expectedHint: framework.Queue, + }, + { + name: "modify pod label to change it from satisfying pod anti-affinity to not satisfying anti-affinity", + pod: st.MakePod().Name("p").PodAntiAffinityIn("service", "region", []string{"securityscan", "value2"}, st.PodAntiAffinityWithRequiredReq).Obj(), + newPod: st.MakePod().Node("fake-node").Label("service", "aaaa").Obj(), + oldPod: st.MakePod().Node("fake-node").Label("service", "securityscan").Obj(), + expectedHint: framework.Queue, + }, + { + name: "modify pod label to change it from not satisfying pod anti-affinity to satisfying anti-affinity", + pod: st.MakePod().Name("p").PodAntiAffinityIn("service", "region", []string{"securityscan", "value2"}, st.PodAntiAffinityWithRequiredReq).Obj(), + newPod: st.MakePod().Node("fake-node").Label("service", "securityscan").Obj(), + oldPod: st.MakePod().Node("fake-node").Label("service", "bbb").Obj(), + expectedHint: framework.QueueSkip, + }, + { + name: "delete a pod which doesn't match pod's anti-affinity", + pod: st.MakePod().Name("p").PodAntiAffinityIn("service", "region", []string{"securityscan", "value2"}, st.PodAntiAffinityWithRequiredReq).Obj(), + oldPod: st.MakePod().Node("fake-node").Label("service", "securityscan").Obj(), + expectedHint: framework.QueueSkip, + }, + { + name: "delete a pod which matches pod's anti-affinity", + pod: st.MakePod().Name("p").PodAntiAffinityIn("service", "region", []string{"securityscan", "value2"}, st.PodAntiAffinityWithRequiredReq).Obj(), + oldPod: st.MakePod().Node("fake-node").Label("aaa", "a").Obj(), + expectedHint: framework.Queue, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + snapshot := cache.NewSnapshot(nil, nil) + pl := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{}, snapshot, namespaces) + p := pl.(*InterPodAffinity) + actualHint, err := p.isSchedulableAfterPodChange(logger, tc.pod, tc.oldPod, tc.newPod) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if diff := cmp.Diff(tc.expectedHint, actualHint); diff != "" { + t.Errorf("expected QueuingHint doesn't match (-want,+got): \n %s", diff) + } + }) + } +} + +func Test_isSchedulableAfterNodeChange(t *testing.T) { + testcases := []struct { + name string + pod *v1.Pod + oldNode, newNode *v1.Node + expectedHint framework.QueueingHint + }{ + { + name: "add a new node with matched topologyKey", + pod: st.MakePod().Name("p").PodAffinityIn("service", "zone", []string{"securityscan", "value2"}, st.PodAffinityWithRequiredReq).Obj(), + newNode: st.MakeNode().Name("node-a").Label("zone", "zone1").Obj(), + expectedHint: framework.Queue, + }, + { + name: "add a new node without matched topologyKey", + pod: st.MakePod().Name("p").PodAffinityIn("service", "region", []string{"securityscan", "value2"}, st.PodAffinityWithRequiredReq).Obj(), + newNode: st.MakeNode().Name("node-a").Label("zone", "zone1").Obj(), + expectedHint: framework.QueueSkip, + }, + { + name: "update node topologyKey", + pod: st.MakePod().Name("p").PodAffinityIn("service", "zone", []string{"securityscan", "value2"}, st.PodAffinityWithRequiredReq).Obj(), + oldNode: st.MakeNode().Name("node-a").Label("zone", "zone1").Obj(), + newNode: st.MakeNode().Name("node-a").Label("zone", "zone2").Obj(), + expectedHint: framework.Queue, + }, + { + name: "update node lable but not topologyKey", + pod: st.MakePod().Name("p").PodAffinityIn("service", "zone", []string{"securityscan", "value2"}, st.PodAffinityWithRequiredReq).Obj(), + oldNode: st.MakeNode().Name("node-a").Label("aaa", "a").Obj(), + newNode: st.MakeNode().Name("node-a").Label("aaa", "b").Obj(), + expectedHint: framework.QueueSkip, + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + snapshot := cache.NewSnapshot(nil, nil) + pl := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{}, snapshot, namespaces) + p := pl.(*InterPodAffinity) + actualHint, err := p.isSchedulableAfterNodeChange(logger, tc.pod, tc.oldNode, tc.newNode) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if diff := cmp.Diff(tc.expectedHint, actualHint); diff != "" { + t.Errorf("expected QueuingHint doesn't match (-want,+got): \n %s", diff) + } + }) + } +} diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index 16421d86a8e..bdedd76f0c9 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -272,12 +272,12 @@ func (pi *PodInfo) Update(pod *v1.Pod) error { // Attempt to parse the affinity terms var parseErrs []error - requiredAffinityTerms, err := getAffinityTerms(pod, getPodAffinityTerms(pod.Spec.Affinity)) + requiredAffinityTerms, err := GetAffinityTerms(pod, GetPodAffinityTerms(pod.Spec.Affinity)) if err != nil { parseErrs = append(parseErrs, fmt.Errorf("requiredAffinityTerms: %w", err)) } - requiredAntiAffinityTerms, err := getAffinityTerms(pod, - getPodAntiAffinityTerms(pod.Spec.Affinity)) + requiredAntiAffinityTerms, err := GetAffinityTerms(pod, + GetPodAntiAffinityTerms(pod.Spec.Affinity)) if err != nil { parseErrs = append(parseErrs, fmt.Errorf("requiredAntiAffinityTerms: %w", err)) } @@ -435,7 +435,7 @@ func newAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm) (*AffinityTerm, erro // getAffinityTerms receives a Pod and affinity terms and returns the namespaces and // selectors of the terms. -func getAffinityTerms(pod *v1.Pod, v1Terms []v1.PodAffinityTerm) ([]AffinityTerm, error) { +func GetAffinityTerms(pod *v1.Pod, v1Terms []v1.PodAffinityTerm) ([]AffinityTerm, error) { if v1Terms == nil { return nil, nil } @@ -477,7 +477,7 @@ func NewPodInfo(pod *v1.Pod) (*PodInfo, error) { return pInfo, err } -func getPodAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) { +func GetPodAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) { if affinity != nil && affinity.PodAffinity != nil { if len(affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 { terms = affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution @@ -490,7 +490,7 @@ func getPodAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) { return terms } -func getPodAntiAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) { +func GetPodAntiAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) { if affinity != nil && affinity.PodAntiAffinity != nil { if len(affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 { terms = affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution