From d66f8f94135093651765c37a88b6e1e4755113e4 Mon Sep 17 00:00:00 2001 From: AxeZhan Date: Fri, 8 Dec 2023 16:12:13 +0800 Subject: [PATCH] schedulingQueue update pod by queueHint --- .../framework/plugins/feature/feature.go | 1 + .../framework/plugins/noderesources/fit.go | 16 +- .../plugins/noderesources/fit_test.go | 57 ++-- pkg/scheduler/framework/plugins/registry.go | 5 +- .../schedulinggates/scheduling_gates.go | 47 +++- .../schedulinggates/scheduling_gates_test.go | 61 ++++- .../tainttoleration/taint_toleration.go | 46 +++- .../tainttoleration/taint_toleration_test.go | 133 ++++++++- pkg/scheduler/framework/types.go | 11 + pkg/scheduler/internal/queue/events.go | 2 + .../internal/queue/scheduling_queue.go | 40 ++- .../internal/queue/scheduling_queue_test.go | 119 +++++--- .../scheduler/plugins/plugins_test.go | 258 ++++++++++-------- test/integration/scheduler/queue_test.go | 89 +++++- 14 files changed, 671 insertions(+), 214 deletions(-) diff --git a/pkg/scheduler/framework/plugins/feature/feature.go b/pkg/scheduler/framework/plugins/feature/feature.go index ace53c7b34e..77158c12e05 100644 --- a/pkg/scheduler/framework/plugins/feature/feature.go +++ b/pkg/scheduler/framework/plugins/feature/feature.go @@ -27,4 +27,5 @@ type Features struct { EnablePodDisruptionConditions bool EnableInPlacePodVerticalScaling bool EnableSidecarContainers bool + EnableSchedulingQueueHint bool } diff --git a/pkg/scheduler/framework/plugins/noderesources/fit.go b/pkg/scheduler/framework/plugins/noderesources/fit.go index e3461c9cfec..8e4c3c34c99 100644 --- a/pkg/scheduler/framework/plugins/noderesources/fit.go +++ b/pkg/scheduler/framework/plugins/noderesources/fit.go @@ -300,18 +300,20 @@ func (f *Fit) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldOb return framework.Queue, nil } -// isResourceScaleDown checks whether the resource request of the modified pod is less than the original pod -// for the resources requested by the pod we are trying to schedule. -func (f *Fit) isResourceScaleDown(targetPod, originalOtherPod, modifiedOtherPod *v1.Pod) bool { - if modifiedOtherPod.Spec.NodeName == "" { - // no resource is freed up whatever the pod is modified. +// isResourceScaleDown checks whether an update event may make the pod schedulable. Specifically: +// - Returns true when an update event shows a scheduled pod's resource request got reduced. +// - Returns true when an update event is for the unscheduled pod itself, and it shows the pod's resource request got reduced. +func (f *Fit) isResourceScaleDown(targetPod, originalPod, modifiedPod *v1.Pod) bool { + if modifiedPod.UID != targetPod.UID && modifiedPod.Spec.NodeName == "" { + // If the update event is not for targetPod and a scheduled Pod, + // it wouldn't make targetPod schedulable. return false } // the other pod was scheduled, so modification or deletion may free up some resources. 
originalMaxResourceReq, modifiedMaxResourceReq := &framework.Resource{}, &framework.Resource{} - originalMaxResourceReq.SetMaxResource(resource.PodRequests(originalOtherPod, resource.PodResourcesOptions{InPlacePodVerticalScalingEnabled: f.enableInPlacePodVerticalScaling})) - modifiedMaxResourceReq.SetMaxResource(resource.PodRequests(modifiedOtherPod, resource.PodResourcesOptions{InPlacePodVerticalScalingEnabled: f.enableInPlacePodVerticalScaling})) + originalMaxResourceReq.SetMaxResource(resource.PodRequests(originalPod, resource.PodResourcesOptions{InPlacePodVerticalScalingEnabled: f.enableInPlacePodVerticalScaling})) + modifiedMaxResourceReq.SetMaxResource(resource.PodRequests(modifiedPod, resource.PodResourcesOptions{InPlacePodVerticalScalingEnabled: f.enableInPlacePodVerticalScaling})) // check whether the resource request of the modified pod is less than the original pod. podRequests := resource.PodRequests(targetPod, resource.PodResourcesOptions{InPlacePodVerticalScalingEnabled: f.enableInPlacePodVerticalScaling}) diff --git a/pkg/scheduler/framework/plugins/noderesources/fit_test.go b/pkg/scheduler/framework/plugins/noderesources/fit_test.go index 0c551577b43..c3b5a332988 100644 --- a/pkg/scheduler/framework/plugins/noderesources/fit_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/fit_test.go @@ -1145,9 +1145,9 @@ func Test_isSchedulableAfterPodChange(t *testing.T) { expectedHint: framework.Queue, expectedErr: true, }, - "queue-on-deleted": { - pod: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), - oldObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(), + "queue-on-other-pod-deleted": { + pod: st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + oldObj: st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(), enableInPlacePodVerticalScaling: true, expectedHint: framework.Queue, }, @@ -1158,44 +1158,51 @@ func Test_isSchedulableAfterPodChange(t *testing.T) { expectedHint: framework.QueueSkip, }, "skip-queue-on-disable-inplace-pod-vertical-scaling": { - pod: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), - oldObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Node("fake").Obj(), - newObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(), + pod: st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + oldObj: st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Node("fake").Obj(), + newObj: st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(), enableInPlacePodVerticalScaling: false, expectedHint: framework.QueueSkip, }, - "skip-queue-on-unscheduled-pod": { - pod: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), - oldObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(), - newObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + "skip-queue-on-other-unscheduled-pod": { + pod: st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).UID("uid0").Obj(), + oldObj: st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).UID("uid1").Obj(), + newObj: st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).UID("uid1").Obj(), enableInPlacePodVerticalScaling: true, expectedHint: framework.QueueSkip, }, - 
"skip-queue-on-non-resource-changes": { + "skip-queue-on-other-pod-non-resource-changes": { pod: &v1.Pod{}, - oldObj: st.MakePod().Label("k", "v").Node("fake").Obj(), - newObj: st.MakePod().Label("foo", "bar").Node("fake").Obj(), + oldObj: st.MakePod().Name("pod2").Label("k", "v").Node("fake").Obj(), + newObj: st.MakePod().Name("pod2").Label("foo", "bar").Node("fake").Obj(), enableInPlacePodVerticalScaling: true, expectedHint: framework.QueueSkip, }, - "skip-queue-on-unrelated-resource-changes": { - pod: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), - oldObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceMemory: "2"}).Node("fake").Obj(), - newObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceMemory: "1"}).Node("fake").Obj(), + "skip-queue-on-other-pod-unrelated-resource-changes": { + pod: st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + oldObj: st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceMemory: "2"}).Node("fake").Obj(), + newObj: st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceMemory: "1"}).Node("fake").Obj(), enableInPlacePodVerticalScaling: true, expectedHint: framework.QueueSkip, }, - "skip-queue-on-resource-scale-up": { - pod: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), - oldObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(), - newObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Node("fake").Obj(), + "skip-queue-on-other-pod-resource-scale-up": { + pod: st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + oldObj: st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(), + newObj: st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Node("fake").Obj(), enableInPlacePodVerticalScaling: true, expectedHint: framework.QueueSkip, }, - "queue-on-some-resource-scale-down": { - pod: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), - oldObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Node("fake").Obj(), - newObj: st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(), + "queue-on-other-pod-some-resource-scale-down": { + pod: st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + oldObj: st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Node("fake").Obj(), + newObj: st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(), + enableInPlacePodVerticalScaling: true, + expectedHint: framework.Queue, + }, + "queue-on-target-pod-some-resource-scale-down": { + pod: st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + oldObj: st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(), + newObj: st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), enableInPlacePodVerticalScaling: true, expectedHint: framework.Queue, }, diff --git a/pkg/scheduler/framework/plugins/registry.go b/pkg/scheduler/framework/plugins/registry.go index a62f823f5d3..27df67b1e75 100644 --- a/pkg/scheduler/framework/plugins/registry.go +++ b/pkg/scheduler/framework/plugins/registry.go @@ -53,12 +53,13 @@ func NewInTreeRegistry() runtime.Registry { EnablePodDisruptionConditions: feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions), 
 EnableInPlacePodVerticalScaling: feature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling),
 EnableSidecarContainers: feature.DefaultFeatureGate.Enabled(features.SidecarContainers),
+ EnableSchedulingQueueHint: feature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints),
 }
 registry := runtime.Registry{
 dynamicresources.Name: runtime.FactoryAdapter(fts, dynamicresources.New),
 imagelocality.Name: imagelocality.New,
- tainttoleration.Name: tainttoleration.New,
+ tainttoleration.Name: runtime.FactoryAdapter(fts, tainttoleration.New),
 nodename.Name: nodename.New,
 nodeports.Name: nodeports.New,
 nodeaffinity.Name: nodeaffinity.New,
@@ -78,7 +79,7 @@ func NewInTreeRegistry() runtime.Registry {
 queuesort.Name: queuesort.New,
 defaultbinder.Name: defaultbinder.New,
 defaultpreemption.Name: runtime.FactoryAdapter(fts, defaultpreemption.New),
- schedulinggates.Name: schedulinggates.New,
+ schedulinggates.Name: runtime.FactoryAdapter(fts, schedulinggates.New),
 }
 return registry
diff --git a/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go
index a729c60d43c..2b8c4564f0e 100644
--- a/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go
+++ b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go
@@ -22,15 +22,21 @@ import (
 v1 "k8s.io/api/core/v1"
 "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/klog/v2"
+
 "k8s.io/kubernetes/pkg/scheduler/framework"
+ "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
+ "k8s.io/kubernetes/pkg/scheduler/util"
 )
 // Name of the plugin used in the plugin registry and configurations.
 const Name = names.SchedulingGates
 // SchedulingGates checks if a Pod carries .spec.schedulingGates.
-type SchedulingGates struct{}
+type SchedulingGates struct {
+ enableSchedulingQueueHint bool
+}
 var _ framework.PreEnqueuePlugin = &SchedulingGates{}
 var _ framework.EnqueueExtensions = &SchedulingGates{}
@@ -50,13 +56,42 @@ func (pl *SchedulingGates) PreEnqueue(ctx context.Context, p *v1.Pod) *framework
 return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("waiting for scheduling gates: %v", gates))
 }
-// EventsToRegister returns nil here to indicate that schedulingGates plugin is not
-// interested in any event but its own update.
+// EventsToRegister returns the possible events that may make a Pod
+// failed by this plugin schedulable.
 func (pl *SchedulingGates) EventsToRegister() []framework.ClusterEventWithHint {
- return nil
+ if !pl.enableSchedulingQueueHint {
+ return nil
+ }
+ // When the QueueingHint feature is enabled,
+ // the scheduling queue uses Pod/Update Queueing Hint
+ // to determine whether a Pod's update makes the Pod schedulable or not.
+ // https://github.com/kubernetes/kubernetes/pull/122234
+ return []framework.ClusterEventWithHint{
+ // Pods can become schedulable once their scheduling gates are removed.
+ {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange},
+ }
 }
 // New initializes a new plugin and returns it.
-func New(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { - return &SchedulingGates{}, nil +func New(_ context.Context, _ runtime.Object, _ framework.Handle, fts feature.Features) (framework.Plugin, error) { + return &SchedulingGates{ + enableSchedulingQueueHint: fts.EnableSchedulingQueueHint, + }, nil +} + +func (pl *SchedulingGates) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + _, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj) + if err != nil { + return framework.Queue, err + } + + if modifiedPod.UID != pod.UID { + // If the update event is not for targetPod, it wouldn't make targetPod schedulable. + return framework.QueueSkip, nil + } + + if len(modifiedPod.Spec.SchedulingGates) == 0 { + return framework.Queue, nil + } + return framework.QueueSkip, nil } diff --git a/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates_test.go b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates_test.go index 3c6ceea470f..aa0d50d1e47 100644 --- a/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates_test.go +++ b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates_test.go @@ -20,9 +20,11 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" st "k8s.io/kubernetes/pkg/scheduler/testing" "k8s.io/kubernetes/test/utils/ktesting" ) @@ -48,7 +50,7 @@ func TestPreEnqueue(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { _, ctx := ktesting.NewTestContext(t) - p, err := New(ctx, nil, nil) + p, err := New(ctx, nil, nil, feature.Features{}) if err != nil { t.Fatalf("Creating plugin: %v", err) } @@ -60,3 +62,60 @@ func TestPreEnqueue(t *testing.T) { }) } } + +func Test_isSchedulableAfterPodChange(t *testing.T) { + testcases := map[string]struct { + pod *v1.Pod + oldObj, newObj interface{} + expectedHint framework.QueueingHint + expectedErr bool + }{ + "backoff-wrong-old-object": { + pod: &v1.Pod{}, + oldObj: "not-a-pod", + expectedHint: framework.Queue, + expectedErr: true, + }, + "backoff-wrong-new-object": { + pod: &v1.Pod{}, + newObj: "not-a-pod", + expectedHint: framework.Queue, + expectedErr: true, + }, + "skip-queue-on-other-pod-updated": { + pod: st.MakePod().Name("p").SchedulingGates([]string{"foo", "bar"}).UID("uid0").Obj(), + oldObj: st.MakePod().Name("p1").SchedulingGates([]string{"foo", "bar"}).UID("uid1").Obj(), + newObj: st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).UID("uid1").Obj(), + expectedHint: framework.QueueSkip, + }, + "skip-queue-on-gates-not-empty": { + pod: st.MakePod().Name("p").SchedulingGates([]string{"foo", "bar"}).Obj(), + oldObj: st.MakePod().Name("p").SchedulingGates([]string{"foo", "bar"}).Obj(), + newObj: st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(), + expectedHint: framework.QueueSkip, + }, + "queue-on-gates-become-empty": { + pod: st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(), + oldObj: st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(), + newObj: st.MakePod().Name("p").SchedulingGates([]string{}).Obj(), + expectedHint: framework.Queue, + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + p, err := New(ctx, nil, nil, feature.Features{}) + if err != nil { + t.Fatalf("Creating plugin: %v", 
err) + } + actualHint, err := p.(*SchedulingGates).isSchedulableAfterPodChange(logger, tc.pod, tc.oldObj, tc.newObj) + if tc.expectedErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, tc.expectedHint, actualHint) + }) + } +} diff --git a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go index 3e95a06d156..ca7a1d88565 100644 --- a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go +++ b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go @@ -25,6 +25,7 @@ import ( v1helper "k8s.io/component-helpers/scheduling/corev1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" "k8s.io/kubernetes/pkg/scheduler/util" @@ -32,7 +33,8 @@ import ( // TaintToleration is a plugin that checks if a pod tolerates a node's taints. type TaintToleration struct { - handle framework.Handle + handle framework.Handle + enableSchedulingQueueHint bool } var _ framework.FilterPlugin = &TaintToleration{} @@ -57,9 +59,19 @@ func (pl *TaintToleration) Name() string { // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. func (pl *TaintToleration) EventsToRegister() []framework.ClusterEventWithHint { - return []framework.ClusterEventWithHint{ + clusterEventWithHint := []framework.ClusterEventWithHint{ {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, } + if !pl.enableSchedulingQueueHint { + return clusterEventWithHint + } + // When the QueueingHint feature is enabled, + // the scheduling queue uses Pod/Update Queueing Hint + // to determine whether a Pod's update makes the Pod schedulable or not. + // https://github.com/kubernetes/kubernetes/pull/122234 + clusterEventWithHint = append(clusterEventWithHint, + framework.ClusterEventWithHint{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange}) + return clusterEventWithHint } // isSchedulableAfterNodeChange is invoked for all node events reported by @@ -191,6 +203,32 @@ func (pl *TaintToleration) ScoreExtensions() framework.ScoreExtensions { } // New initializes a new plugin and returns it. -func New(_ context.Context, _ runtime.Object, h framework.Handle) (framework.Plugin, error) { - return &TaintToleration{handle: h}, nil +func New(_ context.Context, _ runtime.Object, h framework.Handle, fts feature.Features) (framework.Plugin, error) { + return &TaintToleration{ + handle: h, + enableSchedulingQueueHint: fts.EnableSchedulingQueueHint, + }, nil +} + +// isSchedulableAfterPodChange is invoked whenever a pod changed. It checks whether +// that change made a previously unschedulable pod schedulable. +// When an unscheduled Pod, which was rejected by TaintToleration, is updated to have a new toleration, +// it may make the Pod schedulable. 
+func (pl *TaintToleration) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + originalPod, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj) + if err != nil { + return framework.Queue, err + } + + if pod.UID == modifiedPod.UID && + len(originalPod.Spec.Tolerations) != len(modifiedPod.Spec.Tolerations) { + // An unscheduled Pod got a new toleration. + // Due to API validation, the user can add, but cannot modify or remove tolerations. + // So, it's enough to just check the length of tolerations to notice the update. + // And, any updates in tolerations could make Pod schedulable. + logger.V(5).Info("a new toleration is added for the Pod, and it may make it schedulable", "pod", klog.KObj(modifiedPod)) + return framework.Queue, nil + } + + return framework.QueueSkip, nil } diff --git a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go index 1f31f675277..ac542b94b46 100644 --- a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go +++ b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go @@ -21,10 +21,12 @@ import ( "reflect" "testing" + "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" "k8s.io/kubernetes/pkg/scheduler/internal/cache" tf "k8s.io/kubernetes/pkg/scheduler/testing/framework" @@ -238,7 +240,7 @@ func TestTaintTolerationScore(t *testing.T) { snapshot := cache.NewSnapshot(nil, test.nodes) fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot)) - p, err := New(ctx, nil, fh) + p, err := New(ctx, nil, fh, feature.Features{}) if err != nil { t.Fatalf("creating plugin: %v", err) } @@ -342,7 +344,7 @@ func TestTaintTolerationFilter(t *testing.T) { _, ctx := ktesting.NewTestContext(t) nodeInfo := framework.NewNodeInfo() nodeInfo.SetNode(test.node) - p, err := New(ctx, nil, nil) + p, err := New(ctx, nil, nil, feature.Features{}) if err != nil { t.Fatalf("creating plugin: %v", err) } @@ -418,3 +420,130 @@ func TestIsSchedulableAfterNodeChange(t *testing.T) { }) } } + +func Test_isSchedulableAfterPodChange(t *testing.T) { + testcases := map[string]struct { + pod *v1.Pod + oldObj, newObj interface{} + expectedHint framework.QueueingHint + expectedErr bool + }{ + "backoff-wrong-new-object": { + pod: &v1.Pod{}, + newObj: "not-a-pod", + expectedHint: framework.Queue, + expectedErr: true, + }, + "backoff-wrong-old-object": { + pod: &v1.Pod{}, + oldObj: "not-a-pod", + newObj: &v1.Pod{}, + expectedHint: framework.Queue, + expectedErr: true, + }, + "skip-updates-other-pod": { + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "ns-1", + UID: "uid0", + }}, + oldObj: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-2", + Namespace: "ns-1", + UID: "uid1", + }}, + newObj: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-2", + Namespace: "ns-1", + UID: "uid1", + }, + Spec: v1.PodSpec{ + Tolerations: []v1.Toleration{ + { + Key: "foo", + Effect: v1.TaintEffectNoSchedule, + }, + }, + }, + }, + expectedHint: framework.QueueSkip, + expectedErr: false, + }, + "skip-updates-not-toleration": { + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "ns-1", + }}, + oldObj: 
&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "ns-1", + }}, + newObj: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "ns-1", + Labels: map[string]string{"foo": "bar"}, + }, + }, + expectedHint: framework.QueueSkip, + expectedErr: false, + }, + "queue-on-toleration-added": { + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "ns-1", + }}, + oldObj: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "ns-1", + }}, + newObj: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "ns-1", + }, + Spec: v1.PodSpec{ + Tolerations: []v1.Toleration{ + { + Key: "foo", + Effect: v1.TaintEffectNoSchedule, + }, + }, + }, + }, + expectedHint: framework.Queue, + expectedErr: false, + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + p, err := New(ctx, nil, nil, feature.Features{}) + if err != nil { + t.Fatalf("creating plugin: %v", err) + } + actualHint, err := p.(*TaintToleration).isSchedulableAfterPodChange(logger, tc.pod, tc.oldObj, tc.newObj) + if tc.expectedErr { + if err == nil { + t.Errorf("unexpected success") + } + return + } + if err != nil { + t.Errorf("unexpected error") + return + } + if diff := cmp.Diff(tc.expectedHint, actualHint); diff != "" { + t.Errorf("Unexpected hint (-want, +got):\n%s", diff) + } + }) + } +} diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index 164e6aa4095..4910851e5f0 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -72,6 +72,17 @@ const ( // - a Pod that is deleted // - a Pod that was assumed, but gets un-assumed due to some errors in the binding cycle. // - an existing Pod that was unscheduled but gets scheduled to a Node. + // + // Note that the Pod event type includes the events for the unscheduled Pod itself. + // i.e., when unscheduled Pods are updated, the scheduling queue checks with Pod/Update QueueingHint(s) whether the update may make the pods schedulable, + // and requeues them to activeQ/backoffQ when at least one QueueingHint(s) return Queue. + // Plugins **have to** implement a QueueingHint for Pod/Update event + // if the rejection from them could be resolved by updating unscheduled Pods themselves. + // Example: Pods that require excessive resources may be rejected by the noderesources plugin, + // if this unscheduled pod is updated to require fewer resources, + // the previous rejection from noderesources plugin can be resolved. + // this plugin would implement QueueingHint for Pod/Update event + // that returns Queue when such label changes are made in unscheduled Pods. Pod GVK = "Pod" // A note about NodeAdd event and UpdateNodeTaint event: // NodeAdd QueueingHint isn't always called because of the internal feature called preCheck. diff --git a/pkg/scheduler/internal/queue/events.go b/pkg/scheduler/internal/queue/events.go index 1d6e8a907e3..22b409a9ec1 100644 --- a/pkg/scheduler/internal/queue/events.go +++ b/pkg/scheduler/internal/queue/events.go @@ -43,6 +43,8 @@ var ( // AssignedPodUpdate is the event when a pod is updated that causes pods with matching affinity // terms to be more schedulable. AssignedPodUpdate = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update, Label: "AssignedPodUpdate"} + // UnscheduledPodUpdate is the event when an unscheduled pod is updated. 
+ UnscheduledPodUpdate = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update, Label: "UnschedulablePodUpdate"}
 // AssignedPodDelete is the event when a pod is deleted that causes pods with matching affinity
 // terms to be more schedulable.
 AssignedPodDelete = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete, Label: "AssignedPodDelete"}
diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go
index 93a8333a4a5..292e72916f6 100644
--- a/pkg/scheduler/internal/queue/scheduling_queue.go
+++ b/pkg/scheduler/internal/queue/scheduling_queue.go
@@ -1007,27 +1007,45 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
 if usPodInfo := p.unschedulablePods.get(newPod); usPodInfo != nil {
 pInfo := updatePod(usPodInfo, newPod)
 p.updateNominatedPodUnlocked(logger, oldPod, pInfo.PodInfo)
+ gated := usPodInfo.Gated
+ if p.isSchedulingQueueHintEnabled {
+ // When unscheduled Pods are updated, we check with QueueingHint
+ // whether the update may make the pods schedulable.
+ // Plugins have to implement a QueueingHint for Pod/Update event
+ // if the rejection from them could be resolved by updating unscheduled Pods themselves.
+ hint := p.isPodWorthRequeuing(logger, pInfo, UnscheduledPodUpdate, oldPod, newPod)
+ queue := p.requeuePodViaQueueingHint(logger, pInfo, hint, UnscheduledPodUpdate.Label)
+ if queue != unschedulablePods {
+ logger.V(5).Info("Pod moved to an internal scheduling queue because the Pod is updated", "pod", klog.KObj(newPod), "event", PodUpdate, "queue", queue)
+ p.unschedulablePods.delete(usPodInfo.Pod, gated)
+ }
+ if queue == activeQ {
+ p.cond.Broadcast()
+ }
+ return nil
+ }
 if isPodUpdated(oldPod, newPod) {
- gated := usPodInfo.Gated
+
 if p.isPodBackingoff(usPodInfo) {
 if err := p.podBackoffQ.Add(pInfo); err != nil {
 return err
 }
 p.unschedulablePods.delete(usPodInfo.Pod, gated)
 logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", PodUpdate, "queue", backoffQ)
- } else {
- if added, err := p.addToActiveQ(logger, pInfo); !added {
- return err
- }
- p.unschedulablePods.delete(usPodInfo.Pod, gated)
- logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", BackoffComplete, "queue", activeQ)
- p.cond.Broadcast()
+ return nil
 }
- } else {
- // Pod update didn't make it schedulable, keep it in the unschedulable queue.
- p.unschedulablePods.addOrUpdate(pInfo)
+
+ if added, err := p.addToActiveQ(logger, pInfo); !added {
+ return err
+ }
+ p.unschedulablePods.delete(usPodInfo.Pod, gated)
+ logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", BackoffComplete, "queue", activeQ)
+ p.cond.Broadcast()
+ return nil
 }
+ // Pod update didn't make it schedulable, keep it in the unschedulable queue.
+ p.unschedulablePods.addOrUpdate(pInfo)
 return nil
 }
 // If pod is not in any of the queues, we put it in the active queue.
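The Update() hunk above is the heart of this change: when a Pod sitting in unschedulablePods is updated and SchedulerQueueingHints is enabled, the queue asks each registered Pod/Update QueueingHint whether the update could resolve that plugin's earlier rejection, instead of requeueing unconditionally. Any plugin whose rejection can be cleared by an update to the unscheduled Pod itself therefore needs to register a Pod/Update event, as SchedulingGates and TaintToleration do in this patch. The following is a minimal sketch of that pattern, not part of the patch itself; the package, plugin name, and the "scheduling-ready" annotation check are illustrative assumptions.

// Package exampleplugin is a hypothetical plugin used only to illustrate the
// Pod/Update QueueingHint pattern; it is not part of this patch.
package exampleplugin

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"

	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/util"
)

// ExamplePlugin stands in for any plugin that rejects Pods based on a field of
// the Pod itself (here, an assumed "scheduling-ready" annotation).
type ExamplePlugin struct{}

// EventsToRegister registers a Pod/Update event so that the scheduling queue
// calls isSchedulableAfterPodChange whenever an unscheduled Pod is updated.
func (pl *ExamplePlugin) EventsToRegister() []framework.ClusterEventWithHint {
	return []framework.ClusterEventWithHint{
		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange},
	}
}

// isSchedulableAfterPodChange returns Queue only when the update is for the
// rejected Pod itself and could resolve the earlier rejection.
func (pl *ExamplePlugin) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
	_, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj)
	if err != nil {
		return framework.Queue, err
	}
	if modifiedPod.UID != pod.UID {
		// An update to a different Pod cannot fix a rejection that depends
		// only on this Pod's own spec, so don't requeue.
		return framework.QueueSkip, nil
	}
	if _, ok := modifiedPod.Annotations["scheduling-ready"]; ok {
		logger.V(5).Info("unscheduled Pod updated in a way that may resolve the rejection", "pod", klog.KObj(modifiedPod))
		return framework.Queue, nil
	}
	return framework.QueueSkip, nil
}

For a rejection that also depends on other Pods (as in NodeResourcesFit above), the hint additionally accepts updates to scheduled Pods, which is exactly what the fit.go change does by checking modifiedPod.UID against the target Pod before requiring Spec.NodeName to be set.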
diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 1b2a29da921..c2ad3d8ce67 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -43,6 +43,7 @@ import ( podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework" + plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/schedulinggates" "k8s.io/kubernetes/pkg/scheduler/metrics" @@ -975,6 +976,23 @@ func TestPriorityQueue_Pop(t *testing.T) { func TestPriorityQueue_Update(t *testing.T) { c := testingclock.NewFakeClock(time.Now()) + queuePlugin := "queuePlugin" + skipPlugin := "skipPlugin" + queueingHintMap := QueueingHintMapPerProfile{ + "": { + UnscheduledPodUpdate: { + { + PluginName: queuePlugin, + QueueingHintFn: queueHintReturnQueue, + }, + { + PluginName: skipPlugin, + QueueingHintFn: queueHintReturnSkip, + }, + }, + }, + } + tests := []struct { name string wantQ string @@ -984,6 +1002,8 @@ func TestPriorityQueue_Update(t *testing.T) { // This function returns three values; // - oldPod/newPod: each test will call Update() with these oldPod and newPod. prepareFunc func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) + // schedulingHintsEnablement shows which value of QHint feature gate we test a test case with. + schedulingHintsEnablement []bool }{ { name: "add highPriorityPodInfo to activeQ", @@ -991,6 +1011,7 @@ func TestPriorityQueue_Update(t *testing.T) { prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { return nil, highPriorityPodInfo.Pod }, + schedulingHintsEnablement: []bool{false, true}, }, { name: "Update pod that didn't exist in the queue", @@ -1000,6 +1021,7 @@ func TestPriorityQueue_Update(t *testing.T) { updatedPod.Annotations["foo"] = "test" return medPriorityPodInfo.Pod, updatedPod }, + schedulingHintsEnablement: []bool{false, true}, }, { name: "Update highPriorityPodInfo and add a nominatedNodeName to it", @@ -1008,6 +1030,7 @@ func TestPriorityQueue_Update(t *testing.T) { prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { return highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod }, + schedulingHintsEnablement: []bool{false, true}, }, { name: "When updating a pod that is already in activeQ, the pod should remain in activeQ after Update()", @@ -1019,6 +1042,7 @@ func TestPriorityQueue_Update(t *testing.T) { } return highPriorityPodInfo.Pod, highPriorityPodInfo.Pod }, + schedulingHintsEnablement: []bool{false, true}, }, { name: "When updating a pod that is in backoff queue and is still backing off, it will be updated in backoff queue", @@ -1030,22 +1054,24 @@ func TestPriorityQueue_Update(t *testing.T) { } return podInfo.Pod, podInfo.Pod }, + schedulingHintsEnablement: []bool{false, true}, }, { name: "when updating a pod which is in unschedulable queue and is backing off, it will be moved to backoff queue", wantQ: backoffQ, prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { - q.unschedulablePods.addOrUpdate(q.newQueuedPodInfo(medPriorityPodInfo.Pod, "plugin")) + q.unschedulablePods.addOrUpdate(q.newQueuedPodInfo(medPriorityPodInfo.Pod, queuePlugin)) updatedPod := medPriorityPodInfo.Pod.DeepCopy() 
updatedPod.Annotations["foo"] = "test" return medPriorityPodInfo.Pod, updatedPod }, + schedulingHintsEnablement: []bool{false, true}, }, { name: "when updating a pod which is in unschedulable queue and is not backing off, it will be moved to active queue", wantQ: activeQ, prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { - q.unschedulablePods.addOrUpdate(q.newQueuedPodInfo(medPriorityPodInfo.Pod, "plugin")) + q.unschedulablePods.addOrUpdate(q.newQueuedPodInfo(medPriorityPodInfo.Pod, queuePlugin)) updatedPod := medPriorityPodInfo.Pod.DeepCopy() updatedPod.Annotations["foo"] = "test1" // Move clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off, @@ -1053,56 +1079,71 @@ func TestPriorityQueue_Update(t *testing.T) { c.Step(q.podInitialBackoffDuration) return medPriorityPodInfo.Pod, updatedPod }, + schedulingHintsEnablement: []bool{false, true}, + }, + { + name: "when updating a pod which is in unschedulable pods but the plugin returns skip, it will remain in unschedulablePods", + wantQ: unschedulablePods, + prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { + q.unschedulablePods.addOrUpdate(q.newQueuedPodInfo(medPriorityPodInfo.Pod, skipPlugin)) + updatedPod := medPriorityPodInfo.Pod.DeepCopy() + updatedPod.Annotations["foo"] = "test1" + return medPriorityPodInfo.Pod, updatedPod + }, + schedulingHintsEnablement: []bool{true}, }, } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - logger, ctx := ktesting.NewTestContext(t) - objs := []runtime.Object{highPriorityPodInfo.Pod, unschedulablePodInfo.Pod, medPriorityPodInfo.Pod} - ctx, cancel := context.WithCancel(ctx) - defer cancel() - q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs, WithClock(c)) + for _, qHintEnabled := range tt.schedulingHintsEnablement { + t.Run(fmt.Sprintf("%s, with queuehint(%v)", tt.name, qHintEnabled), func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, qHintEnabled) + logger, ctx := ktesting.NewTestContext(t) + objs := []runtime.Object{highPriorityPodInfo.Pod, unschedulablePodInfo.Pod, medPriorityPodInfo.Pod} + ctx, cancel := context.WithCancel(ctx) + defer cancel() + q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs, WithClock(c), WithQueueingHintMapPerProfile(queueingHintMap)) - oldPod, newPod := tt.prepareFunc(t, logger, q) + oldPod, newPod := tt.prepareFunc(t, logger, q) - if err := q.Update(logger, oldPod, newPod); err != nil { - t.Fatalf("unexpected error from Update: %v", err) - } - - var pInfo *framework.QueuedPodInfo - - // validate expected queue - if obj, exists, _ := q.podBackoffQ.Get(newQueuedPodInfoForLookup(newPod)); exists { - if tt.wantQ != backoffQ { - t.Errorf("expected pod %s not to be queued to backoffQ, but it was", newPod.Name) + if err := q.Update(logger, oldPod, newPod); err != nil { + t.Fatalf("unexpected error from Update: %v", err) } - pInfo = obj.(*framework.QueuedPodInfo) - } - if obj, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(newPod)); exists { - if tt.wantQ != activeQ { - t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name) + var pInfo *framework.QueuedPodInfo + + // validate expected queue + if obj, exists, _ := q.podBackoffQ.Get(newQueuedPodInfoForLookup(newPod)); exists { + if tt.wantQ != backoffQ { + t.Errorf("expected pod %s not to be queued to backoffQ, but it was", newPod.Name) + } + 
pInfo = obj.(*framework.QueuedPodInfo) } - pInfo = obj.(*framework.QueuedPodInfo) - } - if pInfoFromUnsched := q.unschedulablePods.get(newPod); pInfoFromUnsched != nil { - if tt.wantQ != unschedulablePods { - t.Errorf("expected pod %s to not be queued to unschedulablePods, but it was", newPod.Name) + if obj, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(newPod)); exists { + if tt.wantQ != activeQ { + t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name) + } + pInfo = obj.(*framework.QueuedPodInfo) } - pInfo = pInfoFromUnsched - } - if diff := cmp.Diff(newPod, pInfo.PodInfo.Pod); diff != "" { - t.Errorf("Unexpected updated pod diff (-want, +got): %s", diff) - } + if pInfoFromUnsched := q.unschedulablePods.get(newPod); pInfoFromUnsched != nil { + if tt.wantQ != unschedulablePods { + t.Errorf("expected pod %s to not be queued to unschedulablePods, but it was", newPod.Name) + } + pInfo = pInfoFromUnsched + } - if tt.wantAddedToNominated && len(q.nominator.nominatedPods) != 1 { - t.Errorf("Expected one item in nomindatePods map: %v", q.nominator) - } + if diff := cmp.Diff(newPod, pInfo.PodInfo.Pod); diff != "" { + t.Errorf("Unexpected updated pod diff (-want, +got): %s", diff) + } - }) + if tt.wantAddedToNominated && len(q.nominator.nominatedPods) != 1 { + t.Errorf("Expected one item in nomindatePods map: %v", q.nominator) + } + + }) + } } } @@ -3746,7 +3787,7 @@ func Test_isPodWorthRequeuing(t *testing.T) { func Test_queuedPodInfo_gatedSetUponCreationAndUnsetUponUpdate(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) - plugin, _ := schedulinggates.New(ctx, nil, nil) + plugin, _ := schedulinggates.New(ctx, nil, nil, plfeature.Features{}) m := map[string][]framework.PreEnqueuePlugin{"": {plugin.(framework.PreEnqueuePlugin)}} q := NewTestQueue(ctx, newDefaultQueueSort(), WithPreEnqueuePluginMap(m)) diff --git a/test/integration/scheduler/plugins/plugins_test.go b/test/integration/scheduler/plugins/plugins_test.go index 88f9f7117a5..7f2c3d53fc0 100644 --- a/test/integration/scheduler/plugins/plugins_test.go +++ b/test/integration/scheduler/plugins/plugins_test.go @@ -34,9 +34,12 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" listersv1 "k8s.io/client-go/listers/core/v1" + featuregatetesting "k8s.io/component-base/featuregate/testing" configv1 "k8s.io/kube-scheduler/config/v1" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler" schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" @@ -2665,129 +2668,160 @@ func TestSchedulingGatesPluginEventsToRegister(t *testing.T) { } tests := []struct { - name string - enqueuePlugin framework.PreEnqueuePlugin - count int + name string + withEvents bool + count int + queueHintEnabled []bool + expectedScheduled []bool }{ { - name: "preEnqueue plugin without event registered", - enqueuePlugin: &SchedulingGatesPluginWOEvents{SchedulingGates: schedulinggates.SchedulingGates{}}, - count: 2, + name: "preEnqueue plugin without event registered", + withEvents: false, + count: 2, + // This test case doesn't expect that the pod is scheduled again after the pod is updated + // when queuehint is enabled, because it doesn't register any events in EventsToRegister. 
+ queueHintEnabled: []bool{false, true}, + expectedScheduled: []bool{true, false}, }, { - name: "preEnqueue plugin with event registered", - enqueuePlugin: &SchedulingGatesPluginWithEvents{SchedulingGates: schedulinggates.SchedulingGates{}}, - count: 2, + name: "preEnqueue plugin with event registered", + withEvents: true, + count: 2, + queueHintEnabled: []bool{false, true}, + expectedScheduled: []bool{true, true}, }, } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - registry := frameworkruntime.Registry{ - tt.enqueuePlugin.Name(): newPlugin(tt.enqueuePlugin), - } + for i := 0; i < len(tt.queueHintEnabled); i++ { + queueHintEnabled := tt.queueHintEnabled[i] + expectedScheduled := tt.expectedScheduled[i] - // Setup plugins for testing. - cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{ - Profiles: []configv1.KubeSchedulerProfile{{ - SchedulerName: pointer.String(v1.DefaultSchedulerName), - Plugins: &configv1.Plugins{ - PreEnqueue: configv1.PluginSet{ - Enabled: []configv1.Plugin{ - {Name: tt.enqueuePlugin.Name()}, - }, - Disabled: []configv1.Plugin{ - {Name: "*"}, + t.Run(tt.name+fmt.Sprintf(" queueHint(%v)", queueHintEnabled), func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, queueHintEnabled) + + // new plugin every time to clear counts + var plugin framework.PreEnqueuePlugin + if tt.withEvents { + plugin = &SchedulingGatesPluginWithEvents{SchedulingGates: schedulinggates.SchedulingGates{}} + } else { + plugin = &SchedulingGatesPluginWOEvents{SchedulingGates: schedulinggates.SchedulingGates{}} + } + + registry := frameworkruntime.Registry{ + plugin.Name(): newPlugin(plugin), + } + + // Setup plugins for testing. + cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{ + Profiles: []configv1.KubeSchedulerProfile{{ + SchedulerName: pointer.String(v1.DefaultSchedulerName), + Plugins: &configv1.Plugins{ + PreEnqueue: configv1.PluginSet{ + Enabled: []configv1.Plugin{ + {Name: plugin.Name()}, + }, + Disabled: []configv1.Plugin{ + {Name: "*"}, + }, }, }, - }, - }}, + }}, + }) + + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, + scheduler.WithProfiles(cfg.Profiles...), + scheduler.WithFrameworkOutOfTreeRegistry(registry), + ) + defer teardown() + + // Create a pod with schedulingGates. + gatedPod := st.MakePod().Name("p").Namespace(testContext.NS.Name). + SchedulingGates([]string{"foo"}). + PodAffinity("kubernetes.io/hostname", &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, st.PodAffinityWithRequiredReq). + Container("pause").Obj() + gatedPod, err := testutils.CreatePausePod(testCtx.ClientSet, gatedPod) + if err != nil { + t.Errorf("Error while creating a gated pod: %v", err) + return + } + + if err := testutils.WaitForPodSchedulingGated(testCtx.Ctx, testCtx.ClientSet, gatedPod, 10*time.Second); err != nil { + t.Errorf("Expected the pod to be gated, but got: %v", err) + return + } + if num(plugin) != 1 { + t.Errorf("Expected the preEnqueue plugin to be called once, but got %v", num(plugin)) + return + } + + // Create a best effort pod. + pausePod, err := testutils.CreatePausePod(testCtx.ClientSet, testutils.InitPausePod(&testutils.PausePodConfig{ + Name: "pause-pod", + Namespace: testCtx.NS.Name, + Labels: map[string]string{"foo": "bar"}, + })) + if err != nil { + t.Errorf("Error while creating a pod: %v", err) + return + } + + // Wait for the pod schedulabled. 
+ if err := testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, pausePod, 10*time.Second); err != nil { + t.Errorf("Expected the pod to be schedulable, but got: %v", err) + return + } + + // Update the pod which will trigger the requeue logic if plugin registers the events. + pausePod, err = testCtx.ClientSet.CoreV1().Pods(pausePod.Namespace).Get(testCtx.Ctx, pausePod.Name, metav1.GetOptions{}) + if err != nil { + t.Errorf("Error while getting a pod: %v", err) + return + } + pausePod.Annotations = map[string]string{"foo": "bar"} + _, err = testCtx.ClientSet.CoreV1().Pods(pausePod.Namespace).Update(testCtx.Ctx, pausePod, metav1.UpdateOptions{}) + if err != nil { + t.Errorf("Error while updating a pod: %v", err) + return + } + + // Pod should still be unschedulable because scheduling gates still exist, theoretically, it's a waste rescheduling. + if err := testutils.WaitForPodSchedulingGated(testCtx.Ctx, testCtx.ClientSet, gatedPod, 10*time.Second); err != nil { + t.Errorf("Expected the pod to be gated, but got: %v", err) + return + } + if num(plugin) != tt.count { + t.Errorf("Expected the preEnqueue plugin to be called %v, but got %v", tt.count, num(plugin)) + return + } + + // Remove gated pod's scheduling gates. + gatedPod, err = testCtx.ClientSet.CoreV1().Pods(gatedPod.Namespace).Get(testCtx.Ctx, gatedPod.Name, metav1.GetOptions{}) + if err != nil { + t.Errorf("Error while getting a pod: %v", err) + return + } + gatedPod.Spec.SchedulingGates = nil + _, err = testCtx.ClientSet.CoreV1().Pods(gatedPod.Namespace).Update(testCtx.Ctx, gatedPod, metav1.UpdateOptions{}) + if err != nil { + t.Errorf("Error while updating a pod: %v", err) + return + } + + if expectedScheduled { + if err := testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, gatedPod, 10*time.Second); err != nil { + t.Errorf("Expected the pod to be schedulable, but got: %v", err) + } + return + } + // wait for some time to ensure that the schedulerQueue has completed processing the podUpdate event. + time.Sleep(time.Second) + // pod shouldn't be scheduled if we didn't register podUpdate event for schedulingGates plugin + if err := testutils.WaitForPodSchedulingGated(testCtx.Ctx, testCtx.ClientSet, gatedPod, 10*time.Second); err != nil { + t.Errorf("Expected the pod to be gated, but got: %v", err) + return + } }) - - testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, - scheduler.WithProfiles(cfg.Profiles...), - scheduler.WithFrameworkOutOfTreeRegistry(registry), - ) - defer teardown() - - // Create a pod with schedulingGates. - gatedPod := st.MakePod().Name("p").Namespace(testContext.NS.Name). - SchedulingGates([]string{"foo"}). - PodAffinity("kubernetes.io/hostname", &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, st.PodAffinityWithRequiredReq). - Container("pause").Obj() - gatedPod, err := testutils.CreatePausePod(testCtx.ClientSet, gatedPod) - if err != nil { - t.Errorf("Error while creating a gated pod: %v", err) - return - } - - if err := testutils.WaitForPodSchedulingGated(testCtx.Ctx, testCtx.ClientSet, gatedPod, 10*time.Second); err != nil { - t.Errorf("Expected the pod to be gated, but got: %v", err) - return - } - if num(tt.enqueuePlugin) != 1 { - t.Errorf("Expected the preEnqueue plugin to be called once, but got %v", num(tt.enqueuePlugin)) - return - } - - // Create a best effort pod. 
- pausePod, err := testutils.CreatePausePod(testCtx.ClientSet, testutils.InitPausePod(&testutils.PausePodConfig{ - Name: "pause-pod", - Namespace: testCtx.NS.Name, - Labels: map[string]string{"foo": "bar"}, - })) - if err != nil { - t.Errorf("Error while creating a pod: %v", err) - return - } - - // Wait for the pod schedulabled. - if err := testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, pausePod, 10*time.Second); err != nil { - t.Errorf("Expected the pod to be schedulable, but got: %v", err) - return - } - - // Update the pod which will trigger the requeue logic if plugin registers the events. - pausePod, err = testCtx.ClientSet.CoreV1().Pods(pausePod.Namespace).Get(testCtx.Ctx, pausePod.Name, metav1.GetOptions{}) - if err != nil { - t.Errorf("Error while getting a pod: %v", err) - return - } - pausePod.Annotations = map[string]string{"foo": "bar"} - _, err = testCtx.ClientSet.CoreV1().Pods(pausePod.Namespace).Update(testCtx.Ctx, pausePod, metav1.UpdateOptions{}) - if err != nil { - t.Errorf("Error while updating a pod: %v", err) - return - } - - // Pod should still be unschedulable because scheduling gates still exist, theoretically, it's a waste rescheduling. - if err := testutils.WaitForPodSchedulingGated(testCtx.Ctx, testCtx.ClientSet, gatedPod, 10*time.Second); err != nil { - t.Errorf("Expected the pod to be gated, but got: %v", err) - return - } - if num(tt.enqueuePlugin) != tt.count { - t.Errorf("Expected the preEnqueue plugin to be called %v, but got %v", tt.count, num(tt.enqueuePlugin)) - return - } - - // Remove gated pod's scheduling gates. - gatedPod, err = testCtx.ClientSet.CoreV1().Pods(gatedPod.Namespace).Get(testCtx.Ctx, gatedPod.Name, metav1.GetOptions{}) - if err != nil { - t.Errorf("Error while getting a pod: %v", err) - return - } - gatedPod.Spec.SchedulingGates = nil - _, err = testCtx.ClientSet.CoreV1().Pods(gatedPod.Namespace).Update(testCtx.Ctx, gatedPod, metav1.UpdateOptions{}) - if err != nil { - t.Errorf("Error while updating a pod: %v", err) - return - } - - // Ungated pod should be schedulable now. - if err := testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, gatedPod, 10*time.Second); err != nil { - t.Errorf("Expected the pod to be schedulable, but got: %v", err) - return - } - }) + } } } diff --git a/test/integration/scheduler/queue_test.go b/test/integration/scheduler/queue_test.go index 46914c97eee..b32f12384ca 100644 --- a/test/integration/scheduler/queue_test.go +++ b/test/integration/scheduler/queue_test.go @@ -198,6 +198,8 @@ func TestCoreResourceEnqueue(t *testing.T) { triggerFn func(testCtx *testutils.TestContext) error // wantRequeuedPods is the map of Pods that are expected to be requeued after triggerFn. wantRequeuedPods sets.Set[string] + // enableSchedulingQueueHint indicates which feature gate value(s) the test case should run with. 
+ enableSchedulingQueueHint []bool }{ { name: "Pod without a required toleration to a node isn't requeued to activeQ", @@ -218,7 +220,8 @@ func TestCoreResourceEnqueue(t *testing.T) { } return nil }, - wantRequeuedPods: sets.New("pod2"), + wantRequeuedPods: sets.New("pod2"), + enableSchedulingQueueHint: []bool{false, true}, }, { name: "Pod rejected by the PodAffinity plugin is requeued when a new Node is created and turned to ready", @@ -247,14 +250,90 @@ func TestCoreResourceEnqueue(t *testing.T) { } return nil }, - wantRequeuedPods: sets.New("pod2"), + wantRequeuedPods: sets.New("pod2"), + enableSchedulingQueueHint: []bool{false, true}, + }, + { + name: "Pod updated with toleration requeued to activeQ", + initialNode: st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: "taint-key", Effect: v1.TaintEffectNoSchedule}}).Obj(), + pods: []*v1.Pod{ + // - Pod1 doesn't have the required toleration and will be rejected by the TaintToleration plugin. + st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").Obj(), + }, + triggerFn: func(testCtx *testutils.TestContext) error { + // Trigger a PodUpdate event by adding a toleration to Pod1. + // It makes Pod1 schedulable. + if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Update(testCtx.Ctx, st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").Toleration("taint-key").Obj(), metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to update the pod: %w", err) + } + return nil + }, + wantRequeuedPods: sets.New("pod1"), + enableSchedulingQueueHint: []bool{false, true}, + }, + { + name: "Pod got resource scaled down requeued to activeQ", + initialNode: st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(), + pods: []*v1.Pod{ + // - Pod1 requests a large amount of CPU and will be rejected by the NodeResourcesFit plugin. + st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Obj(), + }, + triggerFn: func(testCtx *testutils.TestContext) error { + // Trigger a PodUpdate event by reducing cpu requested by pod1. + // It makes Pod1 schedulable. + if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Update(testCtx.Ctx, st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").Obj(), metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to update the pod: %w", err) + } + return nil + }, + wantRequeuedPods: sets.New("pod1"), + enableSchedulingQueueHint: []bool{false, true}, + }, + { + name: "Updating pod condition doesn't retry scheduling if the Pod was rejected by TaintToleration", + initialNode: st.MakeNode().Name("fake-node").Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj(), + pods: []*v1.Pod{ + // - Pod1 doesn't have the required toleration and will be rejected by the TaintToleration plugin. + st.MakePod().Name("pod1").Container("image").Obj(), + }, + // Simulate a Pod update by directly calling `SchedulingQueue.Update` instead of actually updating a Pod + // because we don't have a way to confirm the scheduler has handled a Pod update event at the moment. + // TODO: actually update a Pod update and confirm the scheduler has handled a Pod update event with a metric. 
+ // https://github.com/kubernetes/kubernetes/pull/122234#discussion_r1597456808 + triggerFn: func(testCtx *testutils.TestContext) (err error) { + // Trigger a Pod Condition update event. + // It will not make pod1 schedulable + var ( + oldPod *v1.Pod + newPod *v1.Pod + ) + if oldPod, err = testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Get(testCtx.Ctx, "pod1", metav1.GetOptions{}); err != nil { + return fmt.Errorf("failed to get the pod: %w", err) + } + newPod = oldPod.DeepCopy() + newPod.Status.Conditions[0].Message = "injected message" + + if err := testCtx.Scheduler.SchedulingQueue.Update( + klog.FromContext(testCtx.Ctx), + oldPod, + newPod, + ); err != nil { + return fmt.Errorf("failed to update the pod: %w", err) + } + return nil + }, + wantRequeuedPods: sets.Set[string]{}, + // This behaviour is only true when enabling QHint + // because QHint of TaintToleration would decide to ignore a Pod update. + enableSchedulingQueueHint: []bool{true}, }, } - for _, featureEnabled := range []bool{false, true} { - for _, tt := range tests { + for _, tt := range tests { + for _, featureEnabled := range tt.enableSchedulingQueueHint { t.Run(fmt.Sprintf("%s [SchedulerQueueingHints enabled: %v]", tt.name, featureEnabled), func(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, featureEnabled) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true) // Use zero backoff seconds to bypass backoffQ. // It's intended to not start the scheduler's queue, and hence to @@ -271,7 +350,7 @@ func TestCoreResourceEnqueue(t *testing.T) { defer testCtx.Scheduler.SchedulingQueue.Close() cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx - // Create one Node with a taint. + // Create initialNode. if _, err := cs.CoreV1().Nodes().Create(ctx, tt.initialNode, metav1.CreateOptions{}); err != nil { t.Fatalf("Failed to create an initial Node %q: %v", tt.initialNode.Name, err) }