mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 10:51:29 +00:00
Merge pull request #93511 from Huang-Wei/flake-reserve-plugin
Hold Pod in cache until all other cleanup work is completed
This commit is contained in:
commit
db28b0239a
@ -501,16 +501,6 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
|
|||||||
// This allows us to keep scheduling without waiting on binding to occur.
|
// This allows us to keep scheduling without waiting on binding to occur.
|
||||||
assumedPodInfo := podInfo.DeepCopy()
|
assumedPodInfo := podInfo.DeepCopy()
|
||||||
assumedPod := assumedPodInfo.Pod
|
assumedPod := assumedPodInfo.Pod
|
||||||
|
|
||||||
// Run the Reserve method of reserve plugins.
|
|
||||||
if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
|
|
||||||
sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "")
|
|
||||||
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
|
|
||||||
// trigger un-reserve to clean up state associated with the reserved Pod
|
|
||||||
prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
|
// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
|
||||||
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
|
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -521,7 +511,14 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
|
|||||||
// (otherwise this would cause an infinite loop).
|
// (otherwise this would cause an infinite loop).
|
||||||
sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, "")
|
sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, "")
|
||||||
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
|
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
|
||||||
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run the Reserve method of reserve plugins.
|
||||||
|
if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
|
||||||
|
sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "")
|
||||||
|
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
|
||||||
|
// trigger un-reserve to clean up state associated with the reserved Pod
|
||||||
prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -537,11 +534,11 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
|
|||||||
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
|
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
|
||||||
reason = SchedulerError
|
reason = SchedulerError
|
||||||
}
|
}
|
||||||
|
// One of the plugins returned status different than success or wait.
|
||||||
|
prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
||||||
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
|
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
|
||||||
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
|
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
|
||||||
}
|
}
|
||||||
// One of the plugins returned status different than success or wait.
|
|
||||||
prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
|
||||||
sched.recordSchedulingFailure(prof, assumedPodInfo, runPermitStatus.AsError(), reason, "")
|
sched.recordSchedulingFailure(prof, assumedPodInfo, runPermitStatus.AsError(), reason, "")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -563,11 +560,11 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
|
|||||||
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
|
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
|
||||||
reason = SchedulerError
|
reason = SchedulerError
|
||||||
}
|
}
|
||||||
|
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
||||||
|
prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
||||||
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
|
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
|
||||||
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
|
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
|
||||||
}
|
}
|
||||||
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
|
||||||
prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
|
||||||
sched.recordSchedulingFailure(prof, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "")
|
sched.recordSchedulingFailure(prof, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -575,15 +572,13 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
|
|||||||
// Run "prebind" plugins.
|
// Run "prebind" plugins.
|
||||||
preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
||||||
if !preBindStatus.IsSuccess() {
|
if !preBindStatus.IsSuccess() {
|
||||||
var reason string
|
|
||||||
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
|
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
|
||||||
reason = SchedulerError
|
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
||||||
|
prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
||||||
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
|
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
|
||||||
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
|
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
|
||||||
}
|
}
|
||||||
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")
|
||||||
prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
|
||||||
sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), reason, "")
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -971,57 +971,6 @@ func TestPrebindPlugin(t *testing.T) {
|
|||||||
// tests that the order of invocation of Unreserve operation is executed in the
|
// tests that the order of invocation of Unreserve operation is executed in the
|
||||||
// reverse order of invocation of the Reserve operation.
|
// reverse order of invocation of the Reserve operation.
|
||||||
func TestReservePluginUnreserve(t *testing.T) {
|
func TestReservePluginUnreserve(t *testing.T) {
|
||||||
numReservePlugins := 3
|
|
||||||
pluginInvokeEventChan := make(chan pluginInvokeEvent, numReservePlugins)
|
|
||||||
|
|
||||||
preBindPlugin := &PreBindPlugin{
|
|
||||||
failPreBind: true,
|
|
||||||
}
|
|
||||||
var reservePlugins []*ReservePlugin
|
|
||||||
for i := 0; i < numReservePlugins; i++ {
|
|
||||||
reservePlugins = append(reservePlugins, &ReservePlugin{
|
|
||||||
name: fmt.Sprintf("%s-%d", reservePluginName, i),
|
|
||||||
pluginInvokeEventChan: pluginInvokeEventChan,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
registry := frameworkruntime.Registry{
|
|
||||||
// TODO(#92229): test more failure points that would trigger Unreserve in
|
|
||||||
// reserve plugins than just one pre-bind plugin.
|
|
||||||
preBindPluginName: newPlugin(preBindPlugin),
|
|
||||||
}
|
|
||||||
for _, pl := range reservePlugins {
|
|
||||||
registry[pl.Name()] = newPlugin(pl)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Setup initial reserve and prebind plugin for testing.
|
|
||||||
prof := schedulerconfig.KubeSchedulerProfile{
|
|
||||||
SchedulerName: v1.DefaultSchedulerName,
|
|
||||||
Plugins: &schedulerconfig.Plugins{
|
|
||||||
Reserve: &schedulerconfig.PluginSet{
|
|
||||||
// filled by looping over reservePlugins
|
|
||||||
},
|
|
||||||
PreBind: &schedulerconfig.PluginSet{
|
|
||||||
Enabled: []schedulerconfig.Plugin{
|
|
||||||
{
|
|
||||||
Name: preBindPluginName,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
for _, pl := range reservePlugins {
|
|
||||||
prof.Plugins.Reserve.Enabled = append(prof.Plugins.Reserve.Enabled, schedulerconfig.Plugin{
|
|
||||||
Name: pl.Name(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create the master and the scheduler with the test plugin set.
|
|
||||||
testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestMaster(t, "reserve-plugin-unreserve", nil), 2,
|
|
||||||
scheduler.WithProfiles(prof),
|
|
||||||
scheduler.WithFrameworkOutOfTreeRegistry(registry))
|
|
||||||
defer testutils.CleanupTest(t, testCtx)
|
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
failReserve bool
|
failReserve bool
|
||||||
@ -1044,6 +993,57 @@ func TestReservePluginUnreserve(t *testing.T) {
|
|||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
numReservePlugins := 3
|
||||||
|
pluginInvokeEventChan := make(chan pluginInvokeEvent, numReservePlugins)
|
||||||
|
|
||||||
|
preBindPlugin := &PreBindPlugin{
|
||||||
|
failPreBind: true,
|
||||||
|
}
|
||||||
|
var reservePlugins []*ReservePlugin
|
||||||
|
for i := 0; i < numReservePlugins; i++ {
|
||||||
|
reservePlugins = append(reservePlugins, &ReservePlugin{
|
||||||
|
name: fmt.Sprintf("%s-%d", reservePluginName, i),
|
||||||
|
pluginInvokeEventChan: pluginInvokeEventChan,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
registry := frameworkruntime.Registry{
|
||||||
|
// TODO(#92229): test more failure points that would trigger Unreserve in
|
||||||
|
// reserve plugins than just one pre-bind plugin.
|
||||||
|
preBindPluginName: newPlugin(preBindPlugin),
|
||||||
|
}
|
||||||
|
for _, pl := range reservePlugins {
|
||||||
|
registry[pl.Name()] = newPlugin(pl)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup initial reserve and prebind plugin for testing.
|
||||||
|
prof := schedulerconfig.KubeSchedulerProfile{
|
||||||
|
SchedulerName: v1.DefaultSchedulerName,
|
||||||
|
Plugins: &schedulerconfig.Plugins{
|
||||||
|
Reserve: &schedulerconfig.PluginSet{
|
||||||
|
// filled by looping over reservePlugins
|
||||||
|
},
|
||||||
|
PreBind: &schedulerconfig.PluginSet{
|
||||||
|
Enabled: []schedulerconfig.Plugin{
|
||||||
|
{
|
||||||
|
Name: preBindPluginName,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, pl := range reservePlugins {
|
||||||
|
prof.Plugins.Reserve.Enabled = append(prof.Plugins.Reserve.Enabled, schedulerconfig.Plugin{
|
||||||
|
Name: pl.Name(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the master and the scheduler with the test plugin set.
|
||||||
|
testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestMaster(t, "reserve-plugin-unreserve", nil), 2,
|
||||||
|
scheduler.WithProfiles(prof),
|
||||||
|
scheduler.WithFrameworkOutOfTreeRegistry(registry))
|
||||||
|
defer testutils.CleanupTest(t, testCtx)
|
||||||
|
|
||||||
preBindPlugin.failPreBind = test.failPreBind
|
preBindPlugin.failPreBind = test.failPreBind
|
||||||
if test.failReserve {
|
if test.failReserve {
|
||||||
reservePlugins[test.failReserveIndex].failReserve = true
|
reservePlugins[test.failReserveIndex].failReserve = true
|
||||||
@ -1080,11 +1080,6 @@ func TestReservePluginUnreserve(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
preBindPlugin.reset()
|
|
||||||
for _, pl := range reservePlugins {
|
|
||||||
pl.reset()
|
|
||||||
}
|
|
||||||
testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
|
testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user