Hold Pod in cache until all other cleanup work is completed

- Move "ForgetPod" after "RunReservePluginsUnreserve", so that the cache would hold the pod to
  avoid it's being retried simutaneously until Unreserve is completed.
- Move "assume" ahead of "RunReservePluginsReserve". This is based on the fact that "ForgetPod" is
  the last step of failure path, so "assume" should be reversly treated as the first step. The
  current failure path is like this:
  assume -> reserve -> unreserve -> forgetPod -> recordingFailure
- Make subtests of TestReservePluginUnreserve stateless
This commit is contained in:
Wei Huang 2020-07-29 11:27:18 -07:00
parent d3edcb7924
commit 0e71facefe
No known key found for this signature in database
GPG Key ID: BE5E9752F8B6E005
2 changed files with 66 additions and 76 deletions

View File

@ -501,16 +501,6 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// This allows us to keep scheduling without waiting on binding to occur. // This allows us to keep scheduling without waiting on binding to occur.
assumedPodInfo := podInfo.DeepCopy() assumedPodInfo := podInfo.DeepCopy()
assumedPod := assumedPodInfo.Pod assumedPod := assumedPodInfo.Pod
// Run the Reserve method of reserve plugins.
if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "")
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
// trigger un-reserve to clean up state associated with the reserved Pod
prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
return
}
// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
err = sched.assume(assumedPod, scheduleResult.SuggestedHost) err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
if err != nil { if err != nil {
@ -521,7 +511,14 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// (otherwise this would cause an infinite loop). // (otherwise this would cause an infinite loop).
sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, "") sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, "")
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start)) metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
// trigger un-reserve plugins to clean up state associated with the reserved Pod return
}
// Run the Reserve method of reserve plugins.
if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "")
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
// trigger un-reserve to clean up state associated with the reserved Pod
prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
return return
} }
@ -537,11 +534,11 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start)) metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
reason = SchedulerError reason = SchedulerError
} }
// One of the plugins returned status different than success or wait.
prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil { if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
} }
// One of the plugins returned status different than success or wait.
prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(prof, assumedPodInfo, runPermitStatus.AsError(), reason, "") sched.recordSchedulingFailure(prof, assumedPodInfo, runPermitStatus.AsError(), reason, "")
return return
} }
@ -563,11 +560,11 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start)) metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
reason = SchedulerError reason = SchedulerError
} }
// trigger un-reserve plugins to clean up state associated with the reserved Pod
prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil { if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
} }
// trigger un-reserve plugins to clean up state associated with the reserved Pod
prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(prof, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "") sched.recordSchedulingFailure(prof, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "")
return return
} }
@ -575,15 +572,13 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// Run "prebind" plugins. // Run "prebind" plugins.
preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if !preBindStatus.IsSuccess() { if !preBindStatus.IsSuccess() {
var reason string
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start)) metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
reason = SchedulerError // trigger un-reserve plugins to clean up state associated with the reserved Pod
prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil { if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
} }
// trigger un-reserve plugins to clean up state associated with the reserved Pod sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")
prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), reason, "")
return return
} }

View File

@ -971,57 +971,6 @@ func TestPrebindPlugin(t *testing.T) {
// tests that the order of invocation of Unreserve operation is executed in the // tests that the order of invocation of Unreserve operation is executed in the
// reverse order of invocation of the Reserve operation. // reverse order of invocation of the Reserve operation.
func TestReservePluginUnreserve(t *testing.T) { func TestReservePluginUnreserve(t *testing.T) {
numReservePlugins := 3
pluginInvokeEventChan := make(chan pluginInvokeEvent, numReservePlugins)
preBindPlugin := &PreBindPlugin{
failPreBind: true,
}
var reservePlugins []*ReservePlugin
for i := 0; i < numReservePlugins; i++ {
reservePlugins = append(reservePlugins, &ReservePlugin{
name: fmt.Sprintf("%s-%d", reservePluginName, i),
pluginInvokeEventChan: pluginInvokeEventChan,
})
}
registry := frameworkruntime.Registry{
// TODO(#92229): test more failure points that would trigger Unreserve in
// reserve plugins than just one pre-bind plugin.
preBindPluginName: newPlugin(preBindPlugin),
}
for _, pl := range reservePlugins {
registry[pl.Name()] = newPlugin(pl)
}
// Setup initial reserve and prebind plugin for testing.
prof := schedulerconfig.KubeSchedulerProfile{
SchedulerName: v1.DefaultSchedulerName,
Plugins: &schedulerconfig.Plugins{
Reserve: &schedulerconfig.PluginSet{
// filled by looping over reservePlugins
},
PreBind: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: preBindPluginName,
},
},
},
},
}
for _, pl := range reservePlugins {
prof.Plugins.Reserve.Enabled = append(prof.Plugins.Reserve.Enabled, schedulerconfig.Plugin{
Name: pl.Name(),
})
}
// Create the master and the scheduler with the test plugin set.
testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestMaster(t, "reserve-plugin-unreserve", nil), 2,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer testutils.CleanupTest(t, testCtx)
tests := []struct { tests := []struct {
name string name string
failReserve bool failReserve bool
@ -1044,6 +993,57 @@ func TestReservePluginUnreserve(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
numReservePlugins := 3
pluginInvokeEventChan := make(chan pluginInvokeEvent, numReservePlugins)
preBindPlugin := &PreBindPlugin{
failPreBind: true,
}
var reservePlugins []*ReservePlugin
for i := 0; i < numReservePlugins; i++ {
reservePlugins = append(reservePlugins, &ReservePlugin{
name: fmt.Sprintf("%s-%d", reservePluginName, i),
pluginInvokeEventChan: pluginInvokeEventChan,
})
}
registry := frameworkruntime.Registry{
// TODO(#92229): test more failure points that would trigger Unreserve in
// reserve plugins than just one pre-bind plugin.
preBindPluginName: newPlugin(preBindPlugin),
}
for _, pl := range reservePlugins {
registry[pl.Name()] = newPlugin(pl)
}
// Setup initial reserve and prebind plugin for testing.
prof := schedulerconfig.KubeSchedulerProfile{
SchedulerName: v1.DefaultSchedulerName,
Plugins: &schedulerconfig.Plugins{
Reserve: &schedulerconfig.PluginSet{
// filled by looping over reservePlugins
},
PreBind: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: preBindPluginName,
},
},
},
},
}
for _, pl := range reservePlugins {
prof.Plugins.Reserve.Enabled = append(prof.Plugins.Reserve.Enabled, schedulerconfig.Plugin{
Name: pl.Name(),
})
}
// Create the master and the scheduler with the test plugin set.
testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestMaster(t, "reserve-plugin-unreserve", nil), 2,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer testutils.CleanupTest(t, testCtx)
preBindPlugin.failPreBind = test.failPreBind preBindPlugin.failPreBind = test.failPreBind
if test.failReserve { if test.failReserve {
reservePlugins[test.failReserveIndex].failReserve = true reservePlugins[test.failReserveIndex].failReserve = true
@ -1080,11 +1080,6 @@ func TestReservePluginUnreserve(t *testing.T) {
} }
} }
} }
preBindPlugin.reset()
for _, pl := range reservePlugins {
pl.reset()
}
testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
}) })
} }