From f77a4543d13ddba85403cdc9ec3487d7a404187a Mon Sep 17 00:00:00 2001 From: kerthcet Date: Fri, 3 Nov 2023 13:48:58 +0800 Subject: [PATCH] Unregister events in schedulingGates plugin Signed-off-by: kerthcet --- pkg/scheduler/framework/interface.go | 4 + .../schedulinggates/scheduling_gates.go | 8 +- pkg/scheduler/scheduler.go | 7 + pkg/scheduler/scheduler_test.go | 30 +++ .../scheduler/plugins/plugins_test.go | 195 ++++++++++++++++++ 5 files changed, 239 insertions(+), 5 deletions(-) diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index 48029ae7226..10017d91db8 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -370,6 +370,10 @@ type EnqueueExtensions interface { // and leveraged to build event handlers dynamically. // Note: the returned list needs to be static (not depend on configuration parameters); // otherwise it would lead to undefined behavior. + // + // The returned events could be nil to indicate that no events other than the pod's own update + // can make the pod re-schedulable. An example is SchedulingGates plugin. + // Appropriate implementation of this function will make Pod's re-scheduling accurate and performant. EventsToRegister() []ClusterEventWithHint } diff --git a/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go index 1ed5476076f..4d6c33e988e 100644 --- a/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go +++ b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go @@ -53,12 +53,10 @@ func (pl *SchedulingGates) PreEnqueue(ctx context.Context, p *v1.Pod) *framework return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("waiting for scheduling gates: %v", gates)) } -// EventsToRegister returns the possible events that may make a Pod -// failed by this plugin schedulable. +// EventsToRegister returns nil here to indicate that schedulingGates plugin is not +// interested in any event but its own update. func (pl *SchedulingGates) EventsToRegister() []framework.ClusterEventWithHint { - return []framework.ClusterEventWithHint{ - {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}}, - } + return nil } // New initializes a new plugin and returns it. diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 11e80c99b5c..84b73c44954 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -373,6 +373,13 @@ func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.Queuei for _, e := range es { events := e.EventsToRegister() + // This will happen when plugin registers with empty events, it's usually the case a pod + // will become reschedulable only for self-update, e.g. schedulingGates plugin, the pod + // will enter into the activeQ via priorityQueue.Update(). + if len(events) == 0 { + continue + } + // Note: Rarely, a plugin implements EnqueueExtensions but returns nil. // We treat it as: the plugin is not interested in any event, and hence pod failed by that plugin // cannot be moved by any regular cluster event. diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 63481da74db..b346046610b 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -662,6 +662,7 @@ const ( emptyEventsToRegister = "emptyEventsToRegister" queueSort = "no-op-queue-sort-plugin" fakeBind = "bind-plugin" + emptyEventExtensions = "emptyEventExtensions" ) func Test_buildQueueingHintMap(t *testing.T) { @@ -729,6 +730,23 @@ func Test_buildQueueingHintMap(t *testing.T) { }, }, }, + { + name: "register plugin with empty event", + plugins: []framework.Plugin{&emptyEventPlugin{}}, + want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{}, + }, + { + name: "register plugins including emptyEventPlugin", + plugins: []framework.Plugin{&emptyEventPlugin{}, &fakeNodePlugin{}}, + want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{ + {Resource: framework.Pod, ActionType: framework.Add}: { + {PluginName: fakePod, QueueingHintFn: fakePodPluginQueueingFn}, + }, + {Resource: framework.Node, ActionType: framework.Add}: { + {PluginName: fakeNode, QueueingHintFn: fakeNodePluginQueueingFn}, + }, + }, + }, } for _, tt := range tests { @@ -1009,6 +1027,18 @@ func (pl *fakePodPlugin) EventsToRegister() []framework.ClusterEventWithHint { } } +type emptyEventPlugin struct{} + +func (*emptyEventPlugin) Name() string { return emptyEventExtensions } + +func (*emptyEventPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status { + return nil +} + +func (pl *emptyEventPlugin) EventsToRegister() []framework.ClusterEventWithHint { + return nil +} + // emptyEventsToRegisterPlugin implement interface framework.EnqueueExtensions, but returns nil from EventsToRegister. // This can simulate a plugin registered at scheduler setup, but does nothing // due to some disabled feature gate. diff --git a/test/integration/scheduler/plugins/plugins_test.go b/test/integration/scheduler/plugins/plugins_test.go index 31caf0d5927..791d1bdd725 100644 --- a/test/integration/scheduler/plugins/plugins_test.go +++ b/test/integration/scheduler/plugins/plugins_test.go @@ -45,6 +45,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/schedulinggates" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" st "k8s.io/kubernetes/pkg/scheduler/testing" schedulerutils "k8s.io/kubernetes/test/integration/scheduler" @@ -2588,3 +2589,197 @@ func TestActivatePods(t *testing.T) { t.Errorf("JobPlugin's pods activation logic is not called") } } + +var _ framework.PreEnqueuePlugin = &SchedulingGatesPluginWithEvents{} +var _ framework.EnqueueExtensions = &SchedulingGatesPluginWithEvents{} +var _ framework.PreEnqueuePlugin = &SchedulingGatesPluginWOEvents{} +var _ framework.EnqueueExtensions = &SchedulingGatesPluginWOEvents{} + +const ( + schedulingGatesPluginWithEvents = "scheduling-gates-with-events" + schedulingGatesPluginWOEvents = "scheduling-gates-without-events" +) + +type SchedulingGatesPluginWithEvents struct { + called int + schedulinggates.SchedulingGates +} + +func (pl *SchedulingGatesPluginWithEvents) Name() string { + return schedulingGatesPluginWithEvents +} + +func (pl *SchedulingGatesPluginWithEvents) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status { + pl.called++ + return pl.SchedulingGates.PreEnqueue(ctx, p) +} + +func (pl *SchedulingGatesPluginWithEvents) EventsToRegister() []framework.ClusterEventWithHint { + return []framework.ClusterEventWithHint{ + {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}}, + } +} + +type SchedulingGatesPluginWOEvents struct { + called int + schedulinggates.SchedulingGates +} + +func (pl *SchedulingGatesPluginWOEvents) Name() string { + return schedulingGatesPluginWOEvents +} + +func (pl *SchedulingGatesPluginWOEvents) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status { + pl.called++ + return pl.SchedulingGates.PreEnqueue(ctx, p) +} + +func (pl *SchedulingGatesPluginWOEvents) EventsToRegister() []framework.ClusterEventWithHint { + return nil +} + +// This test helps to verify registering nil events for schedulingGates plugin works as expected. +func TestSchedulingGatesPluginEventsToRegister(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodSchedulingReadiness, true)() + + testContext := testutils.InitTestAPIServer(t, "preenqueue-plugin", nil) + + num := func(pl framework.Plugin) int { + switch item := pl.(type) { + case *SchedulingGatesPluginWithEvents: + return item.called + case *SchedulingGatesPluginWOEvents: + return item.called + default: + t.Error("unsupported plugin") + } + return 0 + } + + tests := []struct { + name string + enqueuePlugin framework.PreEnqueuePlugin + count int + }{ + { + name: "preEnqueue plugin without event registered", + enqueuePlugin: &SchedulingGatesPluginWOEvents{SchedulingGates: schedulinggates.SchedulingGates{EnablePodSchedulingReadiness: true}}, + count: 2, + }, + { + name: "preEnqueue plugin with event registered", + enqueuePlugin: &SchedulingGatesPluginWithEvents{SchedulingGates: schedulinggates.SchedulingGates{EnablePodSchedulingReadiness: true}}, + count: 3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + registry := frameworkruntime.Registry{ + tt.enqueuePlugin.Name(): newPlugin(tt.enqueuePlugin), + } + + // Setup plugins for testing. + cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{ + Profiles: []configv1.KubeSchedulerProfile{{ + SchedulerName: pointer.String(v1.DefaultSchedulerName), + Plugins: &configv1.Plugins{ + PreEnqueue: configv1.PluginSet{ + Enabled: []configv1.Plugin{ + {Name: tt.enqueuePlugin.Name()}, + }, + Disabled: []configv1.Plugin{ + {Name: "*"}, + }, + }, + }, + }}, + }) + + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, + scheduler.WithProfiles(cfg.Profiles...), + scheduler.WithFrameworkOutOfTreeRegistry(registry), + ) + defer teardown() + + // Create a pod with schedulingGates. + gatedPod := st.MakePod().Name("p").Namespace(testContext.NS.Name). + SchedulingGates([]string{"foo"}). + PodAffinity("kubernetes.io/hostname", &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, st.PodAffinityWithRequiredReq). + Container("pause").Obj() + gatedPod, err := testutils.CreatePausePod(testCtx.ClientSet, gatedPod) + if err != nil { + t.Errorf("Error while creating a gated pod: %v", err) + return + } + + if err := testutils.WaitForPodSchedulingGated(testCtx.ClientSet, gatedPod, 10*time.Second); err != nil { + t.Errorf("Expected the pod to be gated, but got: %v", err) + return + } + if num(tt.enqueuePlugin) != 1 { + t.Errorf("Expected the preEnqueue plugin to be called once, but got %v", num(tt.enqueuePlugin)) + return + } + + // Create a best effort pod. + pausePod, err := testutils.CreatePausePod(testCtx.ClientSet, testutils.InitPausePod(&testutils.PausePodConfig{ + Name: "pause-pod", + Namespace: testCtx.NS.Name, + Labels: map[string]string{"foo": "bar"}, + })) + if err != nil { + t.Errorf("Error while creating a pod: %v", err) + return + } + + // Wait for the pod schedulabled. + if err := testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, pausePod, 10*time.Second); err != nil { + t.Errorf("Expected the pod to be schedulable, but got: %v", err) + return + } + + // Update the pod which will trigger the requeue logic if plugin registers the events. + pausePod, err = testCtx.ClientSet.CoreV1().Pods(pausePod.Namespace).Get(testCtx.Ctx, pausePod.Name, metav1.GetOptions{}) + if err != nil { + t.Errorf("Error while getting a pod: %v", err) + return + } + pausePod.Annotations = map[string]string{"foo": "bar"} + _, err = testCtx.ClientSet.CoreV1().Pods(pausePod.Namespace).Update(testCtx.Ctx, pausePod, metav1.UpdateOptions{}) + if err != nil { + t.Errorf("Error while updating a pod: %v", err) + return + } + + // Pod should still be unschedulable because scheduling gates still exist, theoretically, it's a waste rescheduling. + if err := testutils.WaitForPodSchedulingGated(testCtx.ClientSet, gatedPod, 10*time.Second); err != nil { + t.Errorf("Expected the pod to be gated, but got: %v", err) + return + } + if num(tt.enqueuePlugin) != tt.count { + t.Errorf("Expected the preEnqueue plugin to be called %v, but got %v", tt.count, num(tt.enqueuePlugin)) + return + } + + // Remove gated pod's scheduling gates. + gatedPod, err = testCtx.ClientSet.CoreV1().Pods(gatedPod.Namespace).Get(testCtx.Ctx, gatedPod.Name, metav1.GetOptions{}) + if err != nil { + t.Errorf("Error while getting a pod: %v", err) + return + } + gatedPod.Spec.SchedulingGates = nil + _, err = testCtx.ClientSet.CoreV1().Pods(gatedPod.Namespace).Update(testCtx.Ctx, gatedPod, metav1.UpdateOptions{}) + if err != nil { + t.Errorf("Error while updating a pod: %v", err) + return + } + + // Ungated pod should be schedulable now. + if err := testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, gatedPod, 10*time.Second); err != nil { + t.Errorf("Expected the pod to be schedulable, but got: %v", err) + return + } + }) + } +}