From 278a8376e178d1622076c3da5af7757da0f1217b Mon Sep 17 00:00:00 2001
From: kerthcet
Date: Fri, 7 Jul 2023 10:35:59 +0800
Subject: [PATCH] Fix: fitError in permit plugin not handled properly

We only set the failed plugin on the status, but this does not work
unless we wrap the status in a fitError, because the failed plugins
are only copied to podInfo when the error is a fitError.

Signed-off-by: kerthcet
---
 pkg/scheduler/framework/runtime/framework.go |   3 +-
 .../framework/runtime/framework_test.go      |   1 -
 pkg/scheduler/framework/types.go             |   1 +
 pkg/scheduler/framework/types_test.go        |  11 ++
 pkg/scheduler/schedule_one.go                |  25 ++-
 .../scheduler/plugins/plugins_test.go        | 139 +++-----------
 .../scheduler/rescheduling_test.go           | 171 ++++++++++++++++++
 test/integration/scheduler/util.go           | 120 ++++++++++++
 8 files changed, 344 insertions(+), 127 deletions(-)
 create mode 100644 test/integration/scheduler/rescheduling_test.go

diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go
index 9986abf3850..88e31ebd3d0 100644
--- a/pkg/scheduler/framework/runtime/framework.go
+++ b/pkg/scheduler/framework/runtime/framework.go
@@ -1333,8 +1333,7 @@ func (f *frameworkImpl) RunPermitPlugins(ctx context.Context, state *framework.C
 		if !status.IsSuccess() {
 			if status.IsUnschedulable() {
 				logger.V(4).Info("Pod rejected by plugin", "pod", klog.KObj(pod), "plugin", pl.Name(), "status", status.Message())
-				status.SetFailedPlugin(pl.Name())
-				return status
+				return status.WithFailedPlugin(pl.Name())
 			}
 			if status.IsWait() {
 				// Not allowed to be greater than maxTimeout.
diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go
index a0e4cb36dab..f2c7966a671 100644
--- a/pkg/scheduler/framework/runtime/framework_test.go
+++ b/pkg/scheduler/framework/runtime/framework_test.go
@@ -2669,7 +2669,6 @@ func TestPermitPlugins(t *testing.T) {
 			}
 
 			status := f.RunPermitPlugins(ctx, nil, pod, "")
-
 			if !reflect.DeepEqual(status, tt.want) {
 				t.Errorf("wrong status code. got %v, want %v", status, tt.want)
 			}
diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go
index 2d95fd1d54e..78572957f93 100644
--- a/pkg/scheduler/framework/types.go
+++ b/pkg/scheduler/framework/types.go
@@ -330,6 +330,7 @@ func (f *FitError) Error() string {
 	if postFilterMsg != "" {
 		reasonMsg += fmt.Sprintf(SeparatorFormat, postFilterMsg)
 	}
+
 	return reasonMsg
 }
 
diff --git a/pkg/scheduler/framework/types_test.go b/pkg/scheduler/framework/types_test.go
index baec51bb76e..47fab9d8dfb 100644
--- a/pkg/scheduler/framework/types_test.go
+++ b/pkg/scheduler/framework/types_test.go
@@ -1447,6 +1447,17 @@ func TestFitError_Error(t *testing.T) {
 			},
 			wantReasonMsg: "0/3 nodes are available: 1 Node(s) failed Filter plugin FalseFilter-2, 2 Node(s) failed Filter plugin FalseFilter-1. Error running PostFilter plugin FailedPostFilter.",
 		},
+		{
+			name: "failed to Permit on node",
+			numAllNodes: 1,
+			diagnosis: Diagnosis{
+				NodeToStatusMap: NodeToStatusMap{
+					// There should be only one node here.
+ "node1": NewStatus(Unschedulable, "Node failed Permit plugin Permit-1"), + }, + }, + wantReasonMsg: "0/1 nodes are available: 1 Node failed Permit plugin Permit-1.", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go index 5add3218256..1fcb36f2a9f 100644 --- a/pkg/scheduler/schedule_one.go +++ b/pkg/scheduler/schedule_one.go @@ -184,9 +184,7 @@ func (sched *Scheduler) schedulingCycle( // This relies on the fact that Error will check if the pod has been bound // to a node and if so will not add it back to the unscheduled pods queue // (otherwise this would cause an infinite loop). - return ScheduleResult{nominatingInfo: clearNominatedNode}, - assumedPodInfo, - framework.AsStatus(err) + return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.AsStatus(err) } // Run the Reserve method of reserve plugins. @@ -197,9 +195,7 @@ func (sched *Scheduler) schedulingCycle( logger.Error(forgetErr, "Scheduler cache ForgetPod failed") } - return ScheduleResult{nominatingInfo: clearNominatedNode}, - assumedPodInfo, - sts + return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, sts } // Run "permit" plugins. @@ -211,9 +207,19 @@ func (sched *Scheduler) schedulingCycle( logger.Error(forgetErr, "Scheduler cache ForgetPod failed") } - return ScheduleResult{nominatingInfo: clearNominatedNode}, - assumedPodInfo, - runPermitStatus + if runPermitStatus.IsUnschedulable() { + fitErr := &framework.FitError{ + NumAllNodes: 1, + Pod: pod, + Diagnosis: framework.Diagnosis{ + NodeToStatusMap: framework.NodeToStatusMap{scheduleResult.SuggestedHost: runPermitStatus}, + UnschedulablePlugins: sets.New(runPermitStatus.FailedPlugin()), + }, + } + return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.NewStatus(runPermitStatus.Code()).WithError(fitErr) + } + + return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, runPermitStatus } // At the end of a successful scheduling cycle, pop and move up Pods if needed. @@ -923,7 +929,6 @@ func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framewo errMsg := status.Message() if err == ErrNoNodesAvailable { - logger.V(2).Info("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod)) } else if fitError, ok := err.(*framework.FitError); ok { // Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently. 
podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins diff --git a/test/integration/scheduler/plugins/plugins_test.go b/test/integration/scheduler/plugins/plugins_test.go index 94bf6d715c3..d0f8f5ac1ea 100644 --- a/test/integration/scheduler/plugins/plugins_test.go +++ b/test/integration/scheduler/plugins/plugins_test.go @@ -47,6 +47,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" st "k8s.io/kubernetes/pkg/scheduler/testing" + schedulerutils "k8s.io/kubernetes/test/integration/scheduler" testutils "k8s.io/kubernetes/test/integration/util" imageutils "k8s.io/kubernetes/test/utils/image" "k8s.io/utils/pointer" @@ -64,6 +65,9 @@ var ( waitForPodUnschedulable = testutils.WaitForPodUnschedulable waitForPodSchedulingGated = testutils.WaitForPodSchedulingGated waitForPodToScheduleWithTimeout = testutils.WaitForPodToScheduleWithTimeout + initRegistryAndConfig = func(t *testing.T, plugins ...framework.Plugin) (frameworkruntime.Registry, schedulerconfig.KubeSchedulerProfile) { + return schedulerutils.InitRegistryAndConfig(t, newPlugin, plugins...) + } ) type PreEnqueuePlugin struct { @@ -659,7 +663,7 @@ func TestPreFilterPlugin(t *testing.T) { preFilterPlugin := &PreFilterPlugin{} registry, prof := initRegistryAndConfig(t, preFilterPlugin) - testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -837,7 +841,7 @@ func TestPostFilterPlugin(t *testing.T) { }, }}}) - testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, int(tt.numNodes), + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, int(tt.numNodes), scheduler.WithProfiles(cfg.Profiles...), scheduler.WithFrameworkOutOfTreeRegistry(registry), ) @@ -904,7 +908,7 @@ func TestScorePlugin(t *testing.T) { scorePlugin := &ScorePlugin{} registry, prof := initRegistryAndConfig(t, scorePlugin) - testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 10, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 10, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -947,7 +951,7 @@ func TestNormalizeScorePlugin(t *testing.T) { scoreWithNormalizePlugin := &ScoreWithNormalizePlugin{} registry, prof := initRegistryAndConfig(t, scoreWithNormalizePlugin) - testCtx, _ := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "score-plugin", nil), 10, + testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "score-plugin", nil), 10, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) @@ -995,7 +999,7 @@ func TestReservePluginReserve(t *testing.T) { reservePlugin := &ReservePlugin{} registry, prof := initRegistryAndConfig(t, reservePlugin) - testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -1110,7 +1114,7 @@ func TestPrebindPlugin(t *testing.T) { }, }) - testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, nodesNum, + testCtx, teardown := 
schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, nodesNum, scheduler.WithProfiles(cfg.Profiles...), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -1265,7 +1269,7 @@ func TestUnReserveReservePlugins(t *testing.T) { } registry, prof := initRegistryAndConfig(t, pls...) - testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -1358,7 +1362,7 @@ func TestUnReservePermitPlugins(t *testing.T) { } registry, profile := initRegistryAndConfig(t, []framework.Plugin{test.plugin, reservePlugin}...) - testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, scheduler.WithProfiles(profile), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -1430,7 +1434,7 @@ func TestUnReservePreBindPlugins(t *testing.T) { } registry, profile := initRegistryAndConfig(t, []framework.Plugin{test.plugin, reservePlugin}...) - testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, scheduler.WithProfiles(profile), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -1501,7 +1505,7 @@ func TestUnReserveBindPlugins(t *testing.T) { test.plugin.client = testContext.ClientSet - testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, scheduler.WithProfiles(profile), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -1634,7 +1638,7 @@ func TestBindPlugin(t *testing.T) { }}, }) - testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, scheduler.WithProfiles(cfg.Profiles...), scheduler.WithFrameworkOutOfTreeRegistry(registry), ) @@ -1755,7 +1759,7 @@ func TestPostBindPlugin(t *testing.T) { } registry, prof := initRegistryAndConfig(t, preBindPlugin, postBindPlugin) - testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -1846,7 +1850,7 @@ func TestPermitPlugin(t *testing.T) { perPlugin := &PermitPlugin{name: permitPluginName} registry, prof := initRegistryAndConfig(t, perPlugin) - testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -1895,7 +1899,7 @@ func TestMultiplePermitPlugins(t *testing.T) { registry, prof := initRegistryAndConfig(t, perPlugin1, perPlugin2) // Create the API server and the scheduler with the test plugin set. 
- testCtx, _ := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "multi-permit-plugin", nil), 2, + testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "multi-permit-plugin", nil), 2, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) @@ -1947,7 +1951,7 @@ func TestPermitPluginsCancelled(t *testing.T) { registry, prof := initRegistryAndConfig(t, perPlugin1, perPlugin2) // Create the API server and the scheduler with the test plugin set. - testCtx, _ := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "permit-plugins", nil), 2, + testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "permit-plugins", nil), 2, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) @@ -2010,7 +2014,7 @@ func TestCoSchedulingWithPermitPlugin(t *testing.T) { permitPlugin := &PermitPlugin{name: permitPluginName} registry, prof := initRegistryAndConfig(t, permitPlugin) - testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -2092,7 +2096,7 @@ func TestFilterPlugin(t *testing.T) { filterPlugin := &FilterPlugin{} registry, prof := initRegistryAndConfig(t, filterPlugin) - testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 1, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 1, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -2148,7 +2152,7 @@ func TestPreScorePlugin(t *testing.T) { preScorePlugin := &PreScorePlugin{} registry, prof := initRegistryAndConfig(t, preScorePlugin) - testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -2209,7 +2213,7 @@ func TestPreEnqueuePlugin(t *testing.T) { preFilterPlugin := &PreFilterPlugin{} registry, prof := initRegistryAndConfig(t, enqueuePlugin, preFilterPlugin) - testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 1, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 1, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -2338,7 +2342,7 @@ func TestPreemptWithPermitPlugin(t *testing.T) { }, }) - testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 0, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 0, scheduler.WithProfiles(cfg.Profiles...), scheduler.WithFrameworkOutOfTreeRegistry(registry), ) @@ -2518,7 +2522,7 @@ func TestActivatePods(t *testing.T) { }) // Create the API server and the scheduler with the test plugin set. 
- testCtx, _ := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "job-plugin", nil), 1, + testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "job-plugin", nil), 1, scheduler.WithProfiles(cfg.Profiles...), scheduler.WithFrameworkOutOfTreeRegistry(registry)) @@ -2563,96 +2567,3 @@ func TestActivatePods(t *testing.T) { t.Errorf("JobPlugin's pods activation logic is not called") } } - -// The returned shutdown func will delete created resources and scheduler, resources should be those -// that will affect the scheduling result, like nodes, pods, etc.. Namespaces should not be -// deleted here because it's created together with the apiserver, they should be deleted -// simultaneously or we'll have no namespace. -// This should only be called when you want to kill the scheduler alone, away from apiserver. -// For example, in scheduler integration tests, recreating apiserver is performance consuming, -// then shutdown the scheduler and recreate it between each test case is a better approach. -func initTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext, nodeCount int, opts ...scheduler.Option) (*testutils.TestContext, testutils.ShutdownFunc) { - testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0, opts...) - testutils.SyncSchedulerInformerFactory(testCtx) - go testCtx.Scheduler.Run(testCtx.SchedulerCtx) - - if nodeCount > 0 { - if _, err := createAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode(), nodeCount); err != nil { - // Make sure to cleanup the resources when initializing error. - testutils.CleanupTest(t, testCtx) - t.Fatal(err) - } - } - - teardown := func() { - err := testCtx.ClientSet.CoreV1().Nodes().DeleteCollection(testCtx.SchedulerCtx, *metav1.NewDeleteOptions(0), metav1.ListOptions{}) - if err != nil { - t.Errorf("error while deleting all nodes: %v", err) - } - err = testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).DeleteCollection(testCtx.SchedulerCtx, *metav1.NewDeleteOptions(0), metav1.ListOptions{}) - if err != nil { - t.Errorf("error while deleting pod: %v", err) - } - // Wait for all pods to be deleted, or will failed to create same name pods - // required in other test cases. - err = wait.PollUntilContextTimeout(testCtx.SchedulerCtx, time.Millisecond, wait.ForeverTestTimeout, true, - testutils.PodsCleanedUp(testCtx.SchedulerCtx, testCtx.ClientSet, testCtx.NS.Name)) - if err != nil { - t.Errorf("error while waiting for all pods to be deleted: %v", err) - } - // Kill the scheduler. - testCtx.SchedulerCloseFn() - } - - return testCtx, teardown -} - -// initRegistryAndConfig returns registry and plugins config based on give plugins. 
-func initRegistryAndConfig(t *testing.T, plugins ...framework.Plugin) (frameworkruntime.Registry, schedulerconfig.KubeSchedulerProfile) { - if len(plugins) == 0 { - return frameworkruntime.Registry{}, schedulerconfig.KubeSchedulerProfile{} - } - - registry := frameworkruntime.Registry{} - pls := &configv1.Plugins{} - - for _, p := range plugins { - registry.Register(p.Name(), newPlugin(p)) - plugin := configv1.Plugin{Name: p.Name()} - - switch p.(type) { - case *PreEnqueuePlugin: - pls.PreEnqueue.Enabled = append(pls.PreEnqueue.Enabled, plugin) - case *PreFilterPlugin: - pls.PreFilter.Enabled = append(pls.PreFilter.Enabled, plugin) - case *FilterPlugin: - pls.Filter.Enabled = append(pls.Filter.Enabled, plugin) - case *PreScorePlugin: - pls.PreScore.Enabled = append(pls.PreScore.Enabled, plugin) - case *ScorePlugin, *ScoreWithNormalizePlugin: - pls.Score.Enabled = append(pls.Score.Enabled, plugin) - case *ReservePlugin: - pls.Reserve.Enabled = append(pls.Reserve.Enabled, plugin) - case *PreBindPlugin: - pls.PreBind.Enabled = append(pls.PreBind.Enabled, plugin) - case *BindPlugin: - pls.Bind.Enabled = append(pls.Bind.Enabled, plugin) - // It's intentional to disable the DefaultBind plugin. Otherwise, DefaultBinder's failure would fail - // a pod's scheduling, as well as the test BindPlugin's execution. - pls.Bind.Disabled = []configv1.Plugin{{Name: defaultbinder.Name}} - case *PostBindPlugin: - pls.PostBind.Enabled = append(pls.PostBind.Enabled, plugin) - case *PermitPlugin: - pls.Permit.Enabled = append(pls.Permit.Enabled, plugin) - } - } - - versionedCfg := configv1.KubeSchedulerConfiguration{ - Profiles: []configv1.KubeSchedulerProfile{{ - SchedulerName: pointer.String(v1.DefaultSchedulerName), - Plugins: pls, - }}, - } - cfg := configtesting.V1ToInternalWithDefaults(t, versionedCfg) - return registry, cfg.Profiles[0] -} diff --git a/test/integration/scheduler/rescheduling_test.go b/test/integration/scheduler/rescheduling_test.go new file mode 100644 index 00000000000..5c48ce00a05 --- /dev/null +++ b/test/integration/scheduler/rescheduling_test.go @@ -0,0 +1,171 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package scheduler
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/kubernetes/pkg/scheduler"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+	st "k8s.io/kubernetes/pkg/scheduler/testing"
+	testutils "k8s.io/kubernetes/test/integration/util"
+)
+
+var _ framework.PermitPlugin = &PermitPlugin{}
+var _ framework.EnqueueExtensions = &PermitPlugin{}
+
+type PermitPlugin struct {
+	name            string
+	statusCode      framework.Code
+	numPermitCalled int
+}
+
+func (pp *PermitPlugin) Name() string {
+	return pp.name
+}
+
+func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
+	pp.numPermitCalled += 1
+
+	if pp.statusCode == framework.Error {
+		return framework.NewStatus(framework.Error, "failed to permit"), 0
+	}
+
+	if pp.statusCode == framework.Unschedulable {
+		if pp.numPermitCalled <= 1 {
+			return framework.NewStatus(framework.Unschedulable, "reject to permit"), 0
+		}
+	}
+
+	return nil, 0
+}
+
+func (pp *PermitPlugin) EventsToRegister() []framework.ClusterEventWithHint {
+	return []framework.ClusterEventWithHint{
+		{
+			Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add},
+			QueueingHintFn: func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
+				return framework.QueueImmediately
+			},
+		},
+	}
+}
+
+func TestReScheduling(t *testing.T) {
+	testContext := testutils.InitTestAPIServer(t, "permit-plugin", nil)
+	tests := []struct {
+		name    string
+		plugins []framework.Plugin
+		action  func() error
+		// On the first scheduling attempt, we deliberately make the pod fail with a scheduling error or be unschedulable.
+		// This is controlled by wantFirstSchedulingError; by default, the pod is unschedulable.
+		wantFirstSchedulingError bool
+
+		// wantScheduled/wantError indicate the final expected scheduling result.
+		wantScheduled bool
+		wantError     bool
+	}{
+		{
+			name: "Rescheduling pod rejected by Permit Plugin",
+			plugins: []framework.Plugin{
+				&PermitPlugin{name: "permit", statusCode: framework.Unschedulable},
+			},
+			action: func() error {
+				_, err := createNode(testContext.ClientSet, st.MakeNode().Name("fake-node").Obj())
+				return err
+			},
+			wantScheduled: true,
+		},
+		{
+			name: "Rescheduling pod rejected by Permit Plugin with unrelated event",
+			plugins: []framework.Plugin{
+				&PermitPlugin{name: "permit", statusCode: framework.Unschedulable},
+			},
+			action: func() error {
+				_, err := createPausePod(testContext.ClientSet,
+					initPausePod(&testutils.PausePodConfig{Name: "test-pod-2", Namespace: testContext.NS.Name}))
+				return err
+			},
+			wantScheduled: false,
+		},
+		{
+			name: "Rescheduling pod failed by Permit Plugin",
+			plugins: []framework.Plugin{
+				&PermitPlugin{name: "permit", statusCode: framework.Error},
+			},
+			action: func() error {
+				_, err := createNode(testContext.ClientSet, st.MakeNode().Name("fake-node").Obj())
+				return err
+			},
+			wantFirstSchedulingError: true,
+			wantError:                true,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			// Create a plugin registry for testing. Register only a permit plugin.
+			registry, prof := InitRegistryAndConfig(t, nil, test.plugins...)
+
+			testCtx, teardown := InitTestSchedulerForFrameworkTest(t, testContext, 2,
+				scheduler.WithProfiles(prof),
+				scheduler.WithFrameworkOutOfTreeRegistry(registry))
+			defer teardown()
+
+			pod, err := createPausePod(testCtx.ClientSet,
+				initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
+			if err != nil {
+				t.Errorf("Error while creating a test pod: %v", err)
+			}
+
+			// On the first scheduling attempt, the pod either hits a scheduling error or is unschedulable, as controlled by wantFirstSchedulingError.
+			if test.wantFirstSchedulingError {
+				if err = wait.Poll(10*time.Millisecond, 30*time.Second, testutils.PodSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
+					t.Errorf("Expected a scheduling error, but got: %v", err)
+				}
+			} else {
+				if err = waitForPodUnschedulable(testCtx.ClientSet, pod); err != nil {
+					t.Errorf("Didn't expect the pod to be scheduled. error: %v", err)
+				}
+			}
+
+			// Trigger the event that is expected to change the pod's schedulability.
+			// Call action() only once; its side effects (e.g. creating a node) must not be repeated.
+			if err = test.action(); err != nil {
+				t.Errorf("Perform action() error: %v", err)
+			}
+
+			if test.wantScheduled {
+				if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
+					t.Errorf("Didn't expect the pod to be unschedulable. error: %v", err)
+				}
+			} else if test.wantError {
+				if err = wait.Poll(10*time.Millisecond, 30*time.Second, testutils.PodSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
+					t.Errorf("Expected a scheduling error, but got: %v", err)
+				}
+			} else {
+				if err = waitForPodUnschedulable(testCtx.ClientSet, pod); err != nil {
+					t.Errorf("Didn't expect the pod to be scheduled. error: %v", err)
+				}
+			}
+		})
+	}
+}
diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go
index e4e18833aa9..f5dde3e4f16 100644
--- a/test/integration/scheduler/util.go
+++ b/test/integration/scheduler/util.go
@@ -17,7 +17,23 @@ limitations under the License.
 package scheduler
 
 import (
+	"testing"
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	configv1 "k8s.io/kube-scheduler/config/v1"
+	"k8s.io/kubernetes/pkg/scheduler"
+	schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
+	configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
+	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
+	st "k8s.io/kubernetes/pkg/scheduler/testing"
 	testutils "k8s.io/kubernetes/test/integration/util"
+	"k8s.io/utils/pointer"
 )
 
 var (
@@ -35,3 +51,107 @@ var (
 	waitForPodUnschedulable = testutils.WaitForPodUnschedulable
 	waitForReflection       = testutils.WaitForReflection
 )
+
+// InitTestSchedulerForFrameworkTest initializes a test scheduler. The returned shutdown func deletes the
+// created resources and the scheduler. Those resources should be the ones that affect the scheduling
+// result, such as nodes and pods. Namespaces should not be deleted here because they are created
+// together with the apiserver and must be deleted at the same time, or we would be left without one.
+// This should only be called when you want to kill the scheduler alone, keeping the apiserver running.
+// For example, in scheduler integration tests, recreating the apiserver is expensive, so shutting down
+// the scheduler and recreating it between test cases is the better approach.
+func InitTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext, nodeCount int, opts ...scheduler.Option) (*testutils.TestContext, testutils.ShutdownFunc) {
+	testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0, opts...)
+	testutils.SyncSchedulerInformerFactory(testCtx)
+	go testCtx.Scheduler.Run(testCtx.SchedulerCtx)
+
+	if nodeCount > 0 {
+		if _, err := testutils.CreateAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode(), nodeCount); err != nil {
+			// Make sure to clean up the resources if initialization fails.
+			testutils.CleanupTest(t, testCtx)
+			t.Fatal(err)
+		}
+	}
+
+	teardown := func() {
+		err := testCtx.ClientSet.CoreV1().Nodes().DeleteCollection(testCtx.SchedulerCtx, *metav1.NewDeleteOptions(0), metav1.ListOptions{})
+		if err != nil {
+			t.Errorf("error while deleting all nodes: %v", err)
+		}
+		err = testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).DeleteCollection(testCtx.SchedulerCtx, *metav1.NewDeleteOptions(0), metav1.ListOptions{})
+		if err != nil {
+			t.Errorf("error while deleting pod: %v", err)
+		}
+		// Wait for all pods to be deleted; otherwise creating pods with the same
+		// names required in other test cases will fail.
+		err = wait.PollUntilContextTimeout(testCtx.SchedulerCtx, time.Millisecond, wait.ForeverTestTimeout, true,
+			testutils.PodsCleanedUp(testCtx.SchedulerCtx, testCtx.ClientSet, testCtx.NS.Name))
+		if err != nil {
+			t.Errorf("error while waiting for all pods to be deleted: %v", err)
+		}
+		// Kill the scheduler.
+		testCtx.SchedulerCloseFn()
+	}
+
+	return testCtx, teardown
+}
+
+// NewPlugin returns a plugin factory for the specified Plugin.
+func NewPlugin(plugin framework.Plugin) frameworkruntime.PluginFactory {
+	return func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
+		return plugin, nil
+	}
+}
+
+// InitRegistryAndConfig returns the registry and plugins config based on the given plugins.
+func InitRegistryAndConfig(t *testing.T, factory func(plugin framework.Plugin) frameworkruntime.PluginFactory, plugins ...framework.Plugin) (frameworkruntime.Registry, schedulerconfig.KubeSchedulerProfile) {
+	if len(plugins) == 0 {
+		return frameworkruntime.Registry{}, schedulerconfig.KubeSchedulerProfile{}
+	}
+
+	if factory == nil {
+		factory = NewPlugin
+	}
+
+	registry := frameworkruntime.Registry{}
+	pls := &configv1.Plugins{}
+
+	for _, p := range plugins {
+		registry.Register(p.Name(), factory(p))
+		plugin := configv1.Plugin{Name: p.Name()}
+
+		switch p.(type) {
+		case framework.PreEnqueuePlugin:
+			pls.PreEnqueue.Enabled = append(pls.PreEnqueue.Enabled, plugin)
+		case framework.PreFilterPlugin:
+			pls.PreFilter.Enabled = append(pls.PreFilter.Enabled, plugin)
+		case framework.FilterPlugin:
+			pls.Filter.Enabled = append(pls.Filter.Enabled, plugin)
+		case framework.PreScorePlugin:
+			pls.PreScore.Enabled = append(pls.PreScore.Enabled, plugin)
+		case framework.ScorePlugin:
+			pls.Score.Enabled = append(pls.Score.Enabled, plugin)
+		case framework.ReservePlugin:
+			pls.Reserve.Enabled = append(pls.Reserve.Enabled, plugin)
+		case framework.PreBindPlugin:
+			pls.PreBind.Enabled = append(pls.PreBind.Enabled, plugin)
+		case framework.BindPlugin:
+			pls.Bind.Enabled = append(pls.Bind.Enabled, plugin)
+			// It's intentional to disable the DefaultBind plugin. Otherwise, DefaultBinder's failure would fail
+			// a pod's scheduling, as well as the test BindPlugin's execution.
+ pls.Bind.Disabled = []configv1.Plugin{{Name: defaultbinder.Name}} + case framework.PostBindPlugin: + pls.PostBind.Enabled = append(pls.PostBind.Enabled, plugin) + case framework.PermitPlugin: + pls.Permit.Enabled = append(pls.Permit.Enabled, plugin) + } + } + + versionedCfg := configv1.KubeSchedulerConfiguration{ + Profiles: []configv1.KubeSchedulerProfile{{ + SchedulerName: pointer.String(v1.DefaultSchedulerName), + Plugins: pls, + }}, + } + cfg := configtesting.V1ToInternalWithDefaults(t, versionedCfg) + return registry, cfg.Profiles[0] +}
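
Note (illustration only, not part of the patch): below is a minimal, hypothetical Go sketch of the
behaviour the schedule_one.go change relies on. It uses only identifiers that already appear in the
diff (NewStatus, WithFailedPlugin, FitError, Diagnosis, NodeToStatusMap, sets.New, FailedPlugin);
the "fake-node" and "permit" names are illustrative.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

func main() {
	// What RunPermitPlugins returns after this patch when a Permit plugin rejects a pod:
	// an Unschedulable status carrying the rejecting plugin's name.
	permitStatus := framework.NewStatus(framework.Unschedulable, "reject to permit").
		WithFailedPlugin("permit") // illustrative plugin name

	// What schedulingCycle now wraps that status into. handleSchedulingFailure copies
	// Diagnosis.UnschedulablePlugins into podInfo only when the error is a *framework.FitError,
	// so without this wrapping the failed plugin name would be dropped and the pod could not be
	// moved between queues efficiently on the events the plugin registered for.
	fitErr := &framework.FitError{
		NumAllNodes: 1,
		Diagnosis: framework.Diagnosis{
			NodeToStatusMap:      framework.NodeToStatusMap{"fake-node": permitStatus},
			UnschedulablePlugins: sets.New(permitStatus.FailedPlugin()),
		},
	}

	fmt.Println(fitErr.Diagnosis.UnschedulablePlugins.UnsortedList()) // [permit]
}

The new TestReScheduling integration test exercises this path: the test PermitPlugin rejects the pod
once, and the pod is expected to be scheduled after the Node/Add event the plugin registered for.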