diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index a701e4633a1..7181e67ee3c 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -422,9 +422,6 @@ func (sched *Scheduler) finishBinding(prof *profile.Profile, assumed *v1.Pod, ta } if err != nil { klog.V(1).Infof("Failed to bind pod: %v/%v", assumed.Namespace, assumed.Name) - if err := sched.SchedulerCache.ForgetPod(assumed); err != nil { - klog.Errorf("scheduler cache ForgetPod failed: %v", err) - } return } @@ -504,22 +501,25 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost err = sched.assume(assumedPod, scheduleResult.SuggestedHost) if err != nil { + metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start)) // This is most probably result of a BUG in retrying logic. // We report an error here so that pod scheduling can be retried. // This relies on the fact that Error will check if the pod has been bound // to a node and if so will not add it back to the unscheduled pods queue // (otherwise this would cause an infinite loop). sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, "") - metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start)) return } // Run the Reserve method of reserve plugins. if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { - sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "") metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start)) // trigger un-reserve to clean up state associated with the reserved Pod prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) + if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil { + klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) + } + sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "") return } @@ -587,6 +587,9 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start)) // trigger un-reserve plugins to clean up state associated with the reserved Pod prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) + if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil { + klog.Errorf("scheduler cache ForgetPod failed: %v", err) + } sched.recordSchedulingFailure(prof, assumedPodInfo, fmt.Errorf("Binding rejected: %v", err), SchedulerError, "") } else { // Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2. diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 914185bd879..f13fde0ef8c 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -214,17 +214,57 @@ func TestSchedulerScheduleOne(t *testing.T) { errB := errors.New("binder") table := []struct { - name string - injectBindError error - sendPod *v1.Pod - algo core.ScheduleAlgorithm - expectErrorPod *v1.Pod - expectForgetPod *v1.Pod - expectAssumedPod *v1.Pod - expectError error - expectBind *v1.Binding - eventReason string + name string + injectBindError error + sendPod *v1.Pod + algo core.ScheduleAlgorithm + registerPluginFuncs []st.RegisterPluginFunc + expectErrorPod *v1.Pod + expectForgetPod *v1.Pod + expectAssumedPod *v1.Pod + expectError error + expectBind *v1.Binding + eventReason string }{ + { + name: "error reserve pod", + sendPod: podWithID("foo", ""), + algo: mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil}, + registerPluginFuncs: []st.RegisterPluginFunc{ + st.RegisterReservePlugin("FakeReserve", st.NewFakeReservePlugin(framework.NewStatus(framework.Error, "reserve error"))), + }, + expectErrorPod: podWithID("foo", testNode.Name), + expectForgetPod: podWithID("foo", testNode.Name), + expectAssumedPod: podWithID("foo", testNode.Name), + expectError: errors.New(`error while running Reserve in "FakeReserve" reserve plugin for pod "foo": reserve error`), + eventReason: "FailedScheduling", + }, + { + name: "error permit pod", + sendPod: podWithID("foo", ""), + algo: mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil}, + registerPluginFuncs: []st.RegisterPluginFunc{ + st.RegisterPermitPlugin("FakePermit", st.NewFakePermitPlugin(framework.NewStatus(framework.Error, "permit error"), time.Minute)), + }, + expectErrorPod: podWithID("foo", testNode.Name), + expectForgetPod: podWithID("foo", testNode.Name), + expectAssumedPod: podWithID("foo", testNode.Name), + expectError: errors.New(`error while running "FakePermit" permit plugin for pod "foo": permit error`), + eventReason: "FailedScheduling", + }, + { + name: "error prebind pod", + sendPod: podWithID("foo", ""), + algo: mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil}, + registerPluginFuncs: []st.RegisterPluginFunc{ + st.RegisterPreBindPlugin("FakePreBind", st.NewFakePreBindPlugin(framework.NewStatus(framework.Error, "prebind error"))), + }, + expectErrorPod: podWithID("foo", testNode.Name), + expectForgetPod: podWithID("foo", testNode.Name), + expectAssumedPod: podWithID("foo", testNode.Name), + expectError: errors.New(`error while running "FakePreBind" prebind plugin for pod "foo": prebind error`), + eventReason: "FailedScheduling", + }, { name: "bind assumed pod scheduled", sendPod: podWithID("foo", ""), @@ -252,7 +292,8 @@ func TestSchedulerScheduleOne(t *testing.T) { expectErrorPod: podWithID("foo", testNode.Name), expectForgetPod: podWithID("foo", testNode.Name), eventReason: "FailedScheduling", - }, { + }, + { name: "deleting pod", sendPod: deletingPod("foo"), algo: mockScheduler{core.ScheduleResult{}, nil}, @@ -296,10 +337,11 @@ func TestSchedulerScheduleOne(t *testing.T) { gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding) return true, gotBinding, item.injectBindError }) - fwk, err := st.NewFramework([]st.RegisterPluginFunc{ + registerPluginFuncs := append(item.registerPluginFuncs, st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), - }, frameworkruntime.WithClientSet(client)) + ) + fwk, err := st.NewFramework(registerPluginFuncs, frameworkruntime.WithClientSet(client)) if err != nil { t.Fatal(err) } diff --git a/pkg/scheduler/testing/fake_plugins.go b/pkg/scheduler/testing/fake_plugins.go index 45be1500e4d..41427db7078 100644 --- a/pkg/scheduler/testing/fake_plugins.go +++ b/pkg/scheduler/testing/fake_plugins.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "sync/atomic" + "time" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" @@ -152,3 +153,81 @@ func NewFakePreFilterPlugin(status *framework.Status) frameworkruntime.PluginFac }, nil } } + +// FakeReservePlugin is a test reserve plugin. +type FakeReservePlugin struct { + Status *framework.Status +} + +// Name returns name of the plugin. +func (pl *FakeReservePlugin) Name() string { + return "FakeReserve" +} + +// Reserve invoked at the Reserve extension point. +func (pl *FakeReservePlugin) Reserve(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) *framework.Status { + return pl.Status +} + +// Unreserve invoked at the Unreserve extension point. +func (pl *FakeReservePlugin) Unreserve(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) { +} + +// NewFakeReservePlugin initializes a fakeReservePlugin and returns it. +func NewFakeReservePlugin(status *framework.Status) frameworkruntime.PluginFactory { + return func(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) { + return &FakeReservePlugin{ + Status: status, + }, nil + } +} + +// FakePreBindPlugin is a test prebind plugin. +type FakePreBindPlugin struct { + Status *framework.Status +} + +// Name returns name of the plugin. +func (pl *FakePreBindPlugin) Name() string { + return "FakePreBind" +} + +// PreBind invoked at the PreBind extension point. +func (pl *FakePreBindPlugin) PreBind(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) *framework.Status { + return pl.Status +} + +// NewFakePreBindPlugin initializes a fakePreBindPlugin and returns it. +func NewFakePreBindPlugin(status *framework.Status) frameworkruntime.PluginFactory { + return func(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) { + return &FakePreBindPlugin{ + Status: status, + }, nil + } +} + +// FakePermitPlugin is a test permit plugin. +type FakePermitPlugin struct { + Status *framework.Status + Timeout time.Duration +} + +// Name returns name of the plugin. +func (pl *FakePermitPlugin) Name() string { + return "FakePermit" +} + +// Permit invoked at the Permit extension point. +func (pl *FakePermitPlugin) Permit(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) (*framework.Status, time.Duration) { + return pl.Status, pl.Timeout +} + +// NewFakePermitPlugin initializes a fakePermitPlugin and returns it. +func NewFakePermitPlugin(status *framework.Status, timeout time.Duration) frameworkruntime.PluginFactory { + return func(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) { + return &FakePermitPlugin{ + Status: status, + Timeout: timeout, + }, nil + } +} diff --git a/pkg/scheduler/testing/framework_helpers.go b/pkg/scheduler/testing/framework_helpers.go index b7e8c4dd933..14c3cd9deee 100644 --- a/pkg/scheduler/testing/framework_helpers.go +++ b/pkg/scheduler/testing/framework_helpers.go @@ -52,6 +52,21 @@ func RegisterFilterPlugin(pluginName string, pluginNewFunc runtime.PluginFactory return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "Filter") } +// RegisterReservePlugin returns a function to register a Reserve Plugin to a given registry. +func RegisterReservePlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc { + return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "Reserve") +} + +// RegisterPermitPlugin returns a function to register a Permit Plugin to a given registry. +func RegisterPermitPlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc { + return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "Permit") +} + +// RegisterPreBindPlugin returns a function to register a PreBind Plugin to a given registry. +func RegisterPreBindPlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc { + return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "PreBind") +} + // RegisterScorePlugin returns a function to register a Score Plugin to a given registry. func RegisterScorePlugin(pluginName string, pluginNewFunc runtime.PluginFactory, weight int32) RegisterPluginFunc { return RegisterPluginAsExtensionsWithWeight(pluginName, weight, pluginNewFunc, "Score")