scheduler: run Unreserve if Reserve fails

If a reserve plugin's Reserve method returns an error, there could be
previously allocated resources from successfully completed reserve
plugins that must be unallocated by the corresponding Unreserve
operation. Since Unreserve operations are idempotent, this patch runs
the Unreserve operation of ALL reserve plugins when a Reserve operation
fails.
This commit is contained in:
Adhityaa Chandrasekar 2020-06-22 18:38:29 +00:00
parent 66ff1ae536
commit 1b223b861a
4 changed files with 27 additions and 11 deletions

View File

@ -788,7 +788,8 @@ func (f *frameworkImpl) runPostBindPlugin(ctx context.Context, pl framework.Post
// RunReservePluginsReserve runs the Reserve method in the set of configured // RunReservePluginsReserve runs the Reserve method in the set of configured
// reserve plugins. If any of these plugins returns an error, it does not // reserve plugins. If any of these plugins returns an error, it does not
// continue running the remaining ones and returns the error. In such a case, // continue running the remaining ones and returns the error. In such a case,
// the pod will not be scheduled. // the pod will not be scheduled and the caller will be expected to call
// RunReservePluginsUnreserve.
func (f *frameworkImpl) RunReservePluginsReserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) { func (f *frameworkImpl) RunReservePluginsReserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
startTime := time.Now() startTime := time.Now()
defer func() { defer func() {

View File

@ -333,11 +333,14 @@ type ScorePlugin interface {
type ReservePlugin interface { type ReservePlugin interface {
Plugin Plugin
// Reserve is called by the scheduling framework when the scheduler cache is // Reserve is called by the scheduling framework when the scheduler cache is
// updated. // updated. If this method returns a failed Status, the scheduler will call
// the Unreserve method for all enabled ReservePlugins.
Reserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status Reserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
// Unreserve is called by the scheduling framework when a reserved pod was // Unreserve is called by the scheduling framework when a reserved pod was
// rejected, an error occurred during reservation of subsequent plugins, or // rejected, an error occurred during reservation of subsequent plugins, or
// in a later phase. The Unreserve method implementation must be idempotent. // in a later phase. The Unreserve method implementation must be idempotent
// and may be called by the scheduler even if the corresponding Reserve
// method for the same plugin was not called.
Unreserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) Unreserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string)
} }

View File

@ -520,6 +520,8 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "") sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "")
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start)) metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
// trigger un-reserve to clean up state associated with the reserved Pod
prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
return return
} }

View File

@ -269,6 +269,7 @@ func (rp *ReservePlugin) Unreserve(ctx context.Context, state *framework.CycleSt
func (rp *ReservePlugin) reset() { func (rp *ReservePlugin) reset() {
rp.numReserveCalled = 0 rp.numReserveCalled = 0
rp.numUnreserveCalled = 0 rp.numUnreserveCalled = 0
rp.failReserve = false
} }
// Name returns name of the plugin. // Name returns name of the plugin.
@ -1006,22 +1007,31 @@ func TestReservePluginUnreserve(t *testing.T) {
defer testutils.CleanupTest(t, testCtx) defer testutils.CleanupTest(t, testCtx)
tests := []struct { tests := []struct {
name string name string
preBindFail bool failReserve bool
failReserveIndex int
failPreBind bool
}{ }{
{ {
name: "fail preBind", name: "fail reserve",
preBindFail: true, failReserve: true,
failReserveIndex: 1,
}, },
{ {
name: "pass preBind", name: "fail preBind",
preBindFail: false, failPreBind: true,
},
{
name: "pass everything",
}, },
} }
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
preBindPlugin.failPreBind = test.preBindFail preBindPlugin.failPreBind = test.failPreBind
if test.failReserve {
reservePlugins[test.failReserveIndex].failReserve = true
}
// Create a best effort pod. // Create a best effort pod.
pod, err := createPausePod(testCtx.ClientSet, pod, err := createPausePod(testCtx.ClientSet,
initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
@ -1029,7 +1039,7 @@ func TestReservePluginUnreserve(t *testing.T) {
t.Errorf("Error while creating a test pod: %v", err) t.Errorf("Error while creating a test pod: %v", err)
} }
if test.preBindFail { if test.failPreBind || test.failReserve {
if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
t.Errorf("Expected a scheduling error, but didn't get it: %v", err) t.Errorf("Expected a scheduling error, but didn't get it: %v", err)
} }