From 0e71facefebf7d3a646169cc5c4d4c85cff3a66c Mon Sep 17 00:00:00 2001 From: Wei Huang Date: Wed, 29 Jul 2020 11:27:18 -0700 Subject: [PATCH] Hold Pod in cache until all other cleanup work is completed - Move "ForgetPod" after "RunReservePluginsUnreserve", so that the cache would hold the pod to avoid it's being retried simutaneously until Unreserve is completed. - Move "assume" ahead of "RunReservePluginsReserve". This is based on the fact that "ForgetPod" is the last step of failure path, so "assume" should be reversly treated as the first step. The current failure path is like this: assume -> reserve -> unreserve -> forgetPod -> recordingFailure - Make subtests of TestReservePluginUnreserve stateless --- pkg/scheduler/scheduler.go | 35 +++--- test/integration/scheduler/framework_test.go | 107 +++++++++---------- 2 files changed, 66 insertions(+), 76 deletions(-) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 6ec68fbb549..a701e4633a1 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -501,16 +501,6 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { // This allows us to keep scheduling without waiting on binding to occur. assumedPodInfo := podInfo.DeepCopy() assumedPod := assumedPodInfo.Pod - - // Run the Reserve method of reserve plugins. - if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { - sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "") - metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start)) - // trigger un-reserve to clean up state associated with the reserved Pod - prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) - return - } - // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost err = sched.assume(assumedPod, scheduleResult.SuggestedHost) if err != nil { @@ -521,7 +511,14 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { // (otherwise this would cause an infinite loop). sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, "") metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start)) - // trigger un-reserve plugins to clean up state associated with the reserved Pod + return + } + + // Run the Reserve method of reserve plugins. + if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { + sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "") + metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start)) + // trigger un-reserve to clean up state associated with the reserved Pod prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) return } @@ -537,11 +534,11 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start)) reason = SchedulerError } + // One of the plugins returned status different than success or wait. + prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil { klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) } - // One of the plugins returned status different than success or wait. - prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) sched.recordSchedulingFailure(prof, assumedPodInfo, runPermitStatus.AsError(), reason, "") return } @@ -563,11 +560,11 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start)) reason = SchedulerError } + // trigger un-reserve plugins to clean up state associated with the reserved Pod + prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil { klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) } - // trigger un-reserve plugins to clean up state associated with the reserved Pod - prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) sched.recordSchedulingFailure(prof, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "") return } @@ -575,15 +572,13 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { // Run "prebind" plugins. preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) if !preBindStatus.IsSuccess() { - var reason string metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start)) - reason = SchedulerError + // trigger un-reserve plugins to clean up state associated with the reserved Pod + prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil { klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) } - // trigger un-reserve plugins to clean up state associated with the reserved Pod - prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) - sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), reason, "") + sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "") return } diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go index ddbf6196041..9c0d29452d2 100644 --- a/test/integration/scheduler/framework_test.go +++ b/test/integration/scheduler/framework_test.go @@ -971,57 +971,6 @@ func TestPrebindPlugin(t *testing.T) { // tests that the order of invocation of Unreserve operation is executed in the // reverse order of invocation of the Reserve operation. func TestReservePluginUnreserve(t *testing.T) { - numReservePlugins := 3 - pluginInvokeEventChan := make(chan pluginInvokeEvent, numReservePlugins) - - preBindPlugin := &PreBindPlugin{ - failPreBind: true, - } - var reservePlugins []*ReservePlugin - for i := 0; i < numReservePlugins; i++ { - reservePlugins = append(reservePlugins, &ReservePlugin{ - name: fmt.Sprintf("%s-%d", reservePluginName, i), - pluginInvokeEventChan: pluginInvokeEventChan, - }) - } - - registry := frameworkruntime.Registry{ - // TODO(#92229): test more failure points that would trigger Unreserve in - // reserve plugins than just one pre-bind plugin. - preBindPluginName: newPlugin(preBindPlugin), - } - for _, pl := range reservePlugins { - registry[pl.Name()] = newPlugin(pl) - } - - // Setup initial reserve and prebind plugin for testing. - prof := schedulerconfig.KubeSchedulerProfile{ - SchedulerName: v1.DefaultSchedulerName, - Plugins: &schedulerconfig.Plugins{ - Reserve: &schedulerconfig.PluginSet{ - // filled by looping over reservePlugins - }, - PreBind: &schedulerconfig.PluginSet{ - Enabled: []schedulerconfig.Plugin{ - { - Name: preBindPluginName, - }, - }, - }, - }, - } - for _, pl := range reservePlugins { - prof.Plugins.Reserve.Enabled = append(prof.Plugins.Reserve.Enabled, schedulerconfig.Plugin{ - Name: pl.Name(), - }) - } - - // Create the master and the scheduler with the test plugin set. - testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestMaster(t, "reserve-plugin-unreserve", nil), 2, - scheduler.WithProfiles(prof), - scheduler.WithFrameworkOutOfTreeRegistry(registry)) - defer testutils.CleanupTest(t, testCtx) - tests := []struct { name string failReserve bool @@ -1044,6 +993,57 @@ func TestReservePluginUnreserve(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + numReservePlugins := 3 + pluginInvokeEventChan := make(chan pluginInvokeEvent, numReservePlugins) + + preBindPlugin := &PreBindPlugin{ + failPreBind: true, + } + var reservePlugins []*ReservePlugin + for i := 0; i < numReservePlugins; i++ { + reservePlugins = append(reservePlugins, &ReservePlugin{ + name: fmt.Sprintf("%s-%d", reservePluginName, i), + pluginInvokeEventChan: pluginInvokeEventChan, + }) + } + + registry := frameworkruntime.Registry{ + // TODO(#92229): test more failure points that would trigger Unreserve in + // reserve plugins than just one pre-bind plugin. + preBindPluginName: newPlugin(preBindPlugin), + } + for _, pl := range reservePlugins { + registry[pl.Name()] = newPlugin(pl) + } + + // Setup initial reserve and prebind plugin for testing. + prof := schedulerconfig.KubeSchedulerProfile{ + SchedulerName: v1.DefaultSchedulerName, + Plugins: &schedulerconfig.Plugins{ + Reserve: &schedulerconfig.PluginSet{ + // filled by looping over reservePlugins + }, + PreBind: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + { + Name: preBindPluginName, + }, + }, + }, + }, + } + for _, pl := range reservePlugins { + prof.Plugins.Reserve.Enabled = append(prof.Plugins.Reserve.Enabled, schedulerconfig.Plugin{ + Name: pl.Name(), + }) + } + + // Create the master and the scheduler with the test plugin set. + testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestMaster(t, "reserve-plugin-unreserve", nil), 2, + scheduler.WithProfiles(prof), + scheduler.WithFrameworkOutOfTreeRegistry(registry)) + defer testutils.CleanupTest(t, testCtx) + preBindPlugin.failPreBind = test.failPreBind if test.failReserve { reservePlugins[test.failReserveIndex].failReserve = true @@ -1080,11 +1080,6 @@ func TestReservePluginUnreserve(t *testing.T) { } } } - - preBindPlugin.reset() - for _, pl := range reservePlugins { - pl.reset() - } testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) }) }