Forget the pod when the reserve plugins fail

Also move the metrics function calls before all other function calls.
This commit is contained in:
Yecheng Fu 2020-08-09 20:28:23 +08:00
parent 9b78bd5979
commit 1176ef9c7d
4 changed files with 157 additions and 18 deletions

View File

@ -422,9 +422,6 @@ func (sched *Scheduler) finishBinding(prof *profile.Profile, assumed *v1.Pod, ta
}
if err != nil {
klog.V(1).Infof("Failed to bind pod: %v/%v", assumed.Namespace, assumed.Name)
if err := sched.SchedulerCache.ForgetPod(assumed); err != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", err)
}
return
}
@ -504,22 +501,25 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
if err != nil {
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
// This is most probably result of a BUG in retrying logic.
// We report an error here so that pod scheduling can be retried.
// This relies on the fact that Error will check if the pod has been bound
// to a node and if so will not add it back to the unscheduled pods queue
// (otherwise this would cause an infinite loop).
sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, "")
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
return
}
// Run the Reserve method of reserve plugins.
if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "")
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
// trigger un-reserve to clean up state associated with the reserved Pod
prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "")
return
}
@ -587,6 +587,9 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
// trigger un-reserve plugins to clean up state associated with the reserved Pod
prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", err)
}
sched.recordSchedulingFailure(prof, assumedPodInfo, fmt.Errorf("Binding rejected: %v", err), SchedulerError, "")
} else {
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.

View File

@ -214,17 +214,57 @@ func TestSchedulerScheduleOne(t *testing.T) {
errB := errors.New("binder")
table := []struct {
name string
injectBindError error
sendPod *v1.Pod
algo core.ScheduleAlgorithm
expectErrorPod *v1.Pod
expectForgetPod *v1.Pod
expectAssumedPod *v1.Pod
expectError error
expectBind *v1.Binding
eventReason string
name string
injectBindError error
sendPod *v1.Pod
algo core.ScheduleAlgorithm
registerPluginFuncs []st.RegisterPluginFunc
expectErrorPod *v1.Pod
expectForgetPod *v1.Pod
expectAssumedPod *v1.Pod
expectError error
expectBind *v1.Binding
eventReason string
}{
{
name: "error reserve pod",
sendPod: podWithID("foo", ""),
algo: mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
registerPluginFuncs: []st.RegisterPluginFunc{
st.RegisterReservePlugin("FakeReserve", st.NewFakeReservePlugin(framework.NewStatus(framework.Error, "reserve error"))),
},
expectErrorPod: podWithID("foo", testNode.Name),
expectForgetPod: podWithID("foo", testNode.Name),
expectAssumedPod: podWithID("foo", testNode.Name),
expectError: errors.New(`error while running Reserve in "FakeReserve" reserve plugin for pod "foo": reserve error`),
eventReason: "FailedScheduling",
},
{
name: "error permit pod",
sendPod: podWithID("foo", ""),
algo: mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
registerPluginFuncs: []st.RegisterPluginFunc{
st.RegisterPermitPlugin("FakePermit", st.NewFakePermitPlugin(framework.NewStatus(framework.Error, "permit error"), time.Minute)),
},
expectErrorPod: podWithID("foo", testNode.Name),
expectForgetPod: podWithID("foo", testNode.Name),
expectAssumedPod: podWithID("foo", testNode.Name),
expectError: errors.New(`error while running "FakePermit" permit plugin for pod "foo": permit error`),
eventReason: "FailedScheduling",
},
{
name: "error prebind pod",
sendPod: podWithID("foo", ""),
algo: mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
registerPluginFuncs: []st.RegisterPluginFunc{
st.RegisterPreBindPlugin("FakePreBind", st.NewFakePreBindPlugin(framework.NewStatus(framework.Error, "prebind error"))),
},
expectErrorPod: podWithID("foo", testNode.Name),
expectForgetPod: podWithID("foo", testNode.Name),
expectAssumedPod: podWithID("foo", testNode.Name),
expectError: errors.New(`error while running "FakePreBind" prebind plugin for pod "foo": prebind error`),
eventReason: "FailedScheduling",
},
{
name: "bind assumed pod scheduled",
sendPod: podWithID("foo", ""),
@ -252,7 +292,8 @@ func TestSchedulerScheduleOne(t *testing.T) {
expectErrorPod: podWithID("foo", testNode.Name),
expectForgetPod: podWithID("foo", testNode.Name),
eventReason: "FailedScheduling",
}, {
},
{
name: "deleting pod",
sendPod: deletingPod("foo"),
algo: mockScheduler{core.ScheduleResult{}, nil},
@ -296,10 +337,11 @@ func TestSchedulerScheduleOne(t *testing.T) {
gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
return true, gotBinding, item.injectBindError
})
fwk, err := st.NewFramework([]st.RegisterPluginFunc{
registerPluginFuncs := append(item.registerPluginFuncs,
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}, frameworkruntime.WithClientSet(client))
)
fwk, err := st.NewFramework(registerPluginFuncs, frameworkruntime.WithClientSet(client))
if err != nil {
t.Fatal(err)
}

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"sync/atomic"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -152,3 +153,81 @@ func NewFakePreFilterPlugin(status *framework.Status) frameworkruntime.PluginFac
}, nil
}
}
// FakeReservePlugin is a test reserve plugin.
// Its Reserve method returns the configured Status as-is, so a test can
// choose the outcome of the Reserve extension point up front.
type FakeReservePlugin struct {
// Status is the value returned by Reserve.
Status *framework.Status
}
// Name reports the fixed identifier of this fake reserve plugin.
func (pl *FakeReservePlugin) Name() string {
	const pluginName = "FakeReserve"
	return pluginName
}
// Reserve is called at the Reserve extension point. All arguments are
// ignored; the status stored on the plugin is handed back unchanged.
func (pl *FakeReservePlugin) Reserve(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) *framework.Status {
	status := pl.Status
	return status
}
// Unreserve invoked at the Unreserve extension point.
// All arguments are ignored and the method does nothing.
func (pl *FakeReservePlugin) Unreserve(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) {
// No-op.
}
// NewFakeReservePlugin returns a plugin factory that builds a
// FakeReservePlugin carrying the given status. The factory ignores both of
// its arguments and never returns an error.
func NewFakeReservePlugin(status *framework.Status) frameworkruntime.PluginFactory {
	factory := func(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) {
		plugin := &FakeReservePlugin{Status: status}
		return plugin, nil
	}
	return factory
}
// FakePreBindPlugin is a test prebind plugin.
// Its PreBind method returns the configured Status as-is, so a test can
// choose the outcome of the PreBind extension point up front.
type FakePreBindPlugin struct {
// Status is the value returned by PreBind.
Status *framework.Status
}
// Name reports the fixed identifier of this fake prebind plugin.
func (pl *FakePreBindPlugin) Name() string {
	const pluginName = "FakePreBind"
	return pluginName
}
// PreBind is called at the PreBind extension point. All arguments are
// ignored; the status stored on the plugin is handed back unchanged.
func (pl *FakePreBindPlugin) PreBind(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) *framework.Status {
	status := pl.Status
	return status
}
// NewFakePreBindPlugin returns a plugin factory that builds a
// FakePreBindPlugin carrying the given status. The factory ignores both of
// its arguments and never returns an error.
func NewFakePreBindPlugin(status *framework.Status) frameworkruntime.PluginFactory {
	factory := func(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) {
		plugin := &FakePreBindPlugin{Status: status}
		return plugin, nil
	}
	return factory
}
// FakePermitPlugin is a test permit plugin.
// Its Permit method returns the configured Status and Timeout as-is, so a
// test can choose the outcome of the Permit extension point up front.
type FakePermitPlugin struct {
// Status is the first value returned by Permit.
Status *framework.Status
// Timeout is the duration returned by Permit alongside Status.
Timeout time.Duration
}
// Name reports the fixed identifier of this fake permit plugin.
func (pl *FakePermitPlugin) Name() string {
	const pluginName = "FakePermit"
	return pluginName
}
// Permit is called at the Permit extension point. All arguments are
// ignored; the status and timeout stored on the plugin are handed back
// unchanged.
func (pl *FakePermitPlugin) Permit(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) (*framework.Status, time.Duration) {
	status, wait := pl.Status, pl.Timeout
	return status, wait
}
// NewFakePermitPlugin returns a plugin factory that builds a
// FakePermitPlugin carrying the given status and timeout. The factory
// ignores both of its arguments and never returns an error.
func NewFakePermitPlugin(status *framework.Status, timeout time.Duration) frameworkruntime.PluginFactory {
	factory := func(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) {
		plugin := &FakePermitPlugin{
			Timeout: timeout,
			Status:  status,
		}
		return plugin, nil
	}
	return factory
}

View File

@ -52,6 +52,21 @@ func RegisterFilterPlugin(pluginName string, pluginNewFunc runtime.PluginFactory
return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "Filter")
}
// RegisterReservePlugin builds a RegisterPluginFunc that registers the given
// plugin factory under pluginName at the "Reserve" extension point.
func RegisterReservePlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc {
	const extensionPoint = "Reserve"
	register := RegisterPluginAsExtensions(pluginName, pluginNewFunc, extensionPoint)
	return register
}
// RegisterPermitPlugin builds a RegisterPluginFunc that registers the given
// plugin factory under pluginName at the "Permit" extension point.
func RegisterPermitPlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc {
	const extensionPoint = "Permit"
	register := RegisterPluginAsExtensions(pluginName, pluginNewFunc, extensionPoint)
	return register
}
// RegisterPreBindPlugin builds a RegisterPluginFunc that registers the given
// plugin factory under pluginName at the "PreBind" extension point.
func RegisterPreBindPlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc {
	const extensionPoint = "PreBind"
	register := RegisterPluginAsExtensions(pluginName, pluginNewFunc, extensionPoint)
	return register
}
// RegisterScorePlugin returns a function to register a Score Plugin to a given registry.
func RegisterScorePlugin(pluginName string, pluginNewFunc runtime.PluginFactory, weight int32) RegisterPluginFunc {
return RegisterPluginAsExtensionsWithWeight(pluginName, weight, pluginNewFunc, "Score")