diff --git a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go index ca8263d0685..41527583a9e 100644 --- a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go +++ b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go @@ -25,11 +25,13 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/component-helpers/scheduling/corev1/nodeaffinity" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config/validation" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" + "k8s.io/kubernetes/pkg/scheduler/util" ) // NodeAffinity is a plugin that checks if a pod node selector matches the node label. @@ -83,10 +85,51 @@ func (s *preFilterState) Clone() framework.StateData { // failed by this plugin schedulable. func (pl *NodeAffinity) EventsToRegister() []framework.ClusterEventWithHint { return []framework.ClusterEventWithHint{ - {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, } } +// isSchedulableAfterNodeChange is invoked whenever a node changed. It checks whether +// that change made a previously unschedulable pod schedulable. +func (pl *NodeAffinity) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + originalNode, modifiedNode, err := util.As[*v1.Node](oldObj, newObj) + if err != nil { + return framework.QueueAfterBackoff, err + } + + if pl.addedNodeSelector != nil && !pl.addedNodeSelector.Match(modifiedNode) { + logger.V(4).Info("added or modified node didn't match scheduler-enforced node affinity and this event won't make the Pod schedulable", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode)) + return framework.QueueSkip, nil + } + + requiredNodeAffinity := nodeaffinity.GetRequiredNodeAffinity(pod) + isMatched, err := requiredNodeAffinity.Match(modifiedNode) + if err != nil { + return framework.QueueAfterBackoff, err + } + if !isMatched { + logger.V(4).Info("node was created or updated, but doesn't matches with the pod's NodeAffinity", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode)) + return framework.QueueSkip, nil + } + + wasMatched := false + if originalNode != nil { + wasMatched, err = requiredNodeAffinity.Match(originalNode) + if err != nil { + return framework.QueueAfterBackoff, err + } + } + + if !wasMatched { + // This modification makes this Node match with Pod's NodeAffinity. + logger.V(4).Info("node was created or updated, and matches with the pod's NodeAffinity", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode)) + return framework.QueueAfterBackoff, nil + } + + logger.V(4).Info("node was created or updated, but it doesn't make this pod schedulable", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode)) + return framework.QueueSkip, nil +} + // PreFilter builds and writes cycle state used by Filter. func (pl *NodeAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) { affinity := pod.Spec.Affinity diff --git a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go index 18fbf91630f..d9f3d1b9ac1 100644 --- a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go +++ b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" @@ -1174,3 +1175,139 @@ func TestNodeAffinityPriority(t *testing.T) { }) } } + +func Test_isSchedulableAfterNodeChange(t *testing.T) { + podWithNodeAffinity := st.MakePod().NodeAffinityIn("foo", []string{"bar"}) + testcases := map[string]struct { + args *config.NodeAffinityArgs + pod *v1.Pod + oldObj, newObj interface{} + expectedHint framework.QueueingHint + expectedErr bool + }{ + "backoff-wrong-new-object": { + args: &config.NodeAffinityArgs{}, + pod: podWithNodeAffinity.Obj(), + newObj: "not-a-node", + expectedHint: framework.QueueAfterBackoff, + expectedErr: true, + }, + "backoff-wrong-old-object": { + args: &config.NodeAffinityArgs{}, + pod: podWithNodeAffinity.Obj(), + oldObj: "not-a-node", + newObj: st.MakeNode().Obj(), + expectedHint: framework.QueueAfterBackoff, + expectedErr: true, + }, + "skip-queue-on-add": { + args: &config.NodeAffinityArgs{}, + pod: podWithNodeAffinity.Obj(), + newObj: st.MakeNode().Obj(), + expectedHint: framework.QueueSkip, + }, + "queue-on-add": { + args: &config.NodeAffinityArgs{}, + pod: podWithNodeAffinity.Obj(), + newObj: st.MakeNode().Label("foo", "bar").Obj(), + expectedHint: framework.QueueAfterBackoff, + }, + "skip-unrelated-changes": { + args: &config.NodeAffinityArgs{}, + pod: podWithNodeAffinity.Obj(), + oldObj: st.MakeNode().Obj(), + newObj: st.MakeNode().Capacity(nil).Obj(), + expectedHint: framework.QueueSkip, + }, + "skip-unrelated-changes-on-labels": { + args: &config.NodeAffinityArgs{}, + pod: podWithNodeAffinity.DeepCopy(), + oldObj: st.MakeNode().Obj(), + newObj: st.MakeNode().Label("k", "v").Obj(), + expectedHint: framework.QueueSkip, + }, + "skip-labels-changes-on-suitable-node": { + args: &config.NodeAffinityArgs{}, + pod: podWithNodeAffinity.DeepCopy(), + oldObj: st.MakeNode().Label("foo", "bar").Obj(), + newObj: st.MakeNode().Label("foo", "bar").Label("k", "v").Obj(), + expectedHint: framework.QueueSkip, + }, + "skip-labels-changes-on-node-from-suitable-to-unsuitable": { + args: &config.NodeAffinityArgs{}, + pod: podWithNodeAffinity.DeepCopy(), + oldObj: st.MakeNode().Label("foo", "bar").Obj(), + newObj: st.MakeNode().Label("k", "v").Obj(), + expectedHint: framework.QueueSkip, + }, + "queue-on-labels-change-makes-pod-schedulable": { + args: &config.NodeAffinityArgs{}, + pod: podWithNodeAffinity.Obj(), + oldObj: st.MakeNode().Obj(), + newObj: st.MakeNode().Label("foo", "bar").Obj(), + expectedHint: framework.QueueAfterBackoff, + }, + "skip-queue-on-add-scheduler-enforced-node-affinity": { + args: &config.NodeAffinityArgs{ + AddedAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "foo", + Operator: v1.NodeSelectorOpIn, + Values: []string{"bar"}, + }, + }, + }, + }, + }, + }, + }, + pod: podWithNodeAffinity.Obj(), + newObj: st.MakeNode().Obj(), + expectedHint: framework.QueueSkip, + }, + "queue-on-add-scheduler-enforced-node-affinity": { + args: &config.NodeAffinityArgs{ + AddedAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "foo", + Operator: v1.NodeSelectorOpIn, + Values: []string{"bar"}, + }, + }, + }, + }, + }, + }, + }, + pod: podWithNodeAffinity.Obj(), + newObj: st.MakeNode().Label("foo", "bar").Obj(), + expectedHint: framework.QueueAfterBackoff, + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + p, err := New(ctx, tc.args, nil) + if err != nil { + t.Fatalf("Creating plugin: %v", err) + } + + actualHint, err := p.(*NodeAffinity).isSchedulableAfterNodeChange(logger, tc.pod, tc.oldObj, tc.newObj) + if tc.expectedErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, tc.expectedHint, actualHint) + }) + } +}