mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #93831 from cofyc/fix93830
scheduler: forget the pod when the reserve plugins fail
This commit is contained in:
commit
343817ef93
@ -422,9 +422,6 @@ func (sched *Scheduler) finishBinding(prof *profile.Profile, assumed *v1.Pod, ta
|
||||
}
|
||||
if err != nil {
|
||||
klog.V(1).Infof("Failed to bind pod: %v/%v", assumed.Namespace, assumed.Name)
|
||||
if err := sched.SchedulerCache.ForgetPod(assumed); err != nil {
|
||||
klog.Errorf("scheduler cache ForgetPod failed: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@ -504,22 +501,25 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
|
||||
// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
|
||||
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
|
||||
if err != nil {
|
||||
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
|
||||
// This is most probably result of a BUG in retrying logic.
|
||||
// We report an error here so that pod scheduling can be retried.
|
||||
// This relies on the fact that Error will check if the pod has been bound
|
||||
// to a node and if so will not add it back to the unscheduled pods queue
|
||||
// (otherwise this would cause an infinite loop).
|
||||
sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, "")
|
||||
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
|
||||
return
|
||||
}
|
||||
|
||||
// Run the Reserve method of reserve plugins.
|
||||
if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
|
||||
sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "")
|
||||
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
|
||||
// trigger un-reserve to clean up state associated with the reserved Pod
|
||||
prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
||||
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
|
||||
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
|
||||
}
|
||||
sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "")
|
||||
return
|
||||
}
|
||||
|
||||
@ -587,6 +587,9 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
|
||||
metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
|
||||
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
||||
prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
||||
if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil {
|
||||
klog.Errorf("scheduler cache ForgetPod failed: %v", err)
|
||||
}
|
||||
sched.recordSchedulingFailure(prof, assumedPodInfo, fmt.Errorf("Binding rejected: %v", err), SchedulerError, "")
|
||||
} else {
|
||||
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
|
||||
|
@ -214,17 +214,57 @@ func TestSchedulerScheduleOne(t *testing.T) {
|
||||
errB := errors.New("binder")
|
||||
|
||||
table := []struct {
|
||||
name string
|
||||
injectBindError error
|
||||
sendPod *v1.Pod
|
||||
algo core.ScheduleAlgorithm
|
||||
expectErrorPod *v1.Pod
|
||||
expectForgetPod *v1.Pod
|
||||
expectAssumedPod *v1.Pod
|
||||
expectError error
|
||||
expectBind *v1.Binding
|
||||
eventReason string
|
||||
name string
|
||||
injectBindError error
|
||||
sendPod *v1.Pod
|
||||
algo core.ScheduleAlgorithm
|
||||
registerPluginFuncs []st.RegisterPluginFunc
|
||||
expectErrorPod *v1.Pod
|
||||
expectForgetPod *v1.Pod
|
||||
expectAssumedPod *v1.Pod
|
||||
expectError error
|
||||
expectBind *v1.Binding
|
||||
eventReason string
|
||||
}{
|
||||
{
|
||||
name: "error reserve pod",
|
||||
sendPod: podWithID("foo", ""),
|
||||
algo: mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
|
||||
registerPluginFuncs: []st.RegisterPluginFunc{
|
||||
st.RegisterReservePlugin("FakeReserve", st.NewFakeReservePlugin(framework.NewStatus(framework.Error, "reserve error"))),
|
||||
},
|
||||
expectErrorPod: podWithID("foo", testNode.Name),
|
||||
expectForgetPod: podWithID("foo", testNode.Name),
|
||||
expectAssumedPod: podWithID("foo", testNode.Name),
|
||||
expectError: errors.New(`error while running Reserve in "FakeReserve" reserve plugin for pod "foo": reserve error`),
|
||||
eventReason: "FailedScheduling",
|
||||
},
|
||||
{
|
||||
name: "error permit pod",
|
||||
sendPod: podWithID("foo", ""),
|
||||
algo: mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
|
||||
registerPluginFuncs: []st.RegisterPluginFunc{
|
||||
st.RegisterPermitPlugin("FakePermit", st.NewFakePermitPlugin(framework.NewStatus(framework.Error, "permit error"), time.Minute)),
|
||||
},
|
||||
expectErrorPod: podWithID("foo", testNode.Name),
|
||||
expectForgetPod: podWithID("foo", testNode.Name),
|
||||
expectAssumedPod: podWithID("foo", testNode.Name),
|
||||
expectError: errors.New(`error while running "FakePermit" permit plugin for pod "foo": permit error`),
|
||||
eventReason: "FailedScheduling",
|
||||
},
|
||||
{
|
||||
name: "error prebind pod",
|
||||
sendPod: podWithID("foo", ""),
|
||||
algo: mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
|
||||
registerPluginFuncs: []st.RegisterPluginFunc{
|
||||
st.RegisterPreBindPlugin("FakePreBind", st.NewFakePreBindPlugin(framework.NewStatus(framework.Error, "prebind error"))),
|
||||
},
|
||||
expectErrorPod: podWithID("foo", testNode.Name),
|
||||
expectForgetPod: podWithID("foo", testNode.Name),
|
||||
expectAssumedPod: podWithID("foo", testNode.Name),
|
||||
expectError: errors.New(`error while running "FakePreBind" prebind plugin for pod "foo": prebind error`),
|
||||
eventReason: "FailedScheduling",
|
||||
},
|
||||
{
|
||||
name: "bind assumed pod scheduled",
|
||||
sendPod: podWithID("foo", ""),
|
||||
@ -252,7 +292,8 @@ func TestSchedulerScheduleOne(t *testing.T) {
|
||||
expectErrorPod: podWithID("foo", testNode.Name),
|
||||
expectForgetPod: podWithID("foo", testNode.Name),
|
||||
eventReason: "FailedScheduling",
|
||||
}, {
|
||||
},
|
||||
{
|
||||
name: "deleting pod",
|
||||
sendPod: deletingPod("foo"),
|
||||
algo: mockScheduler{core.ScheduleResult{}, nil},
|
||||
@ -296,10 +337,11 @@ func TestSchedulerScheduleOne(t *testing.T) {
|
||||
gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
|
||||
return true, gotBinding, item.injectBindError
|
||||
})
|
||||
fwk, err := st.NewFramework([]st.RegisterPluginFunc{
|
||||
registerPluginFuncs := append(item.registerPluginFuncs,
|
||||
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
||||
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
||||
}, frameworkruntime.WithClientSet(client))
|
||||
)
|
||||
fwk, err := st.NewFramework(registerPluginFuncs, frameworkruntime.WithClientSet(client))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
@ -152,3 +153,81 @@ func NewFakePreFilterPlugin(status *framework.Status) frameworkruntime.PluginFac
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
// FakeReservePlugin is a test reserve plugin whose Reserve method returns the configured Status.
|
||||
type FakeReservePlugin struct {
|
||||
Status *framework.Status
|
||||
}
|
||||
|
||||
// Name returns the fixed plugin name "FakeReserve".
|
||||
func (pl *FakeReservePlugin) Name() string {
|
||||
return "FakeReserve"
|
||||
}
|
||||
|
||||
// Reserve is invoked at the Reserve extension point; it ignores all of its arguments and returns pl.Status.
|
||||
func (pl *FakeReservePlugin) Reserve(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) *framework.Status {
|
||||
return pl.Status
|
||||
}
|
||||
|
||||
// Unreserve is invoked at the Unreserve extension point; this fake ignores its arguments and does nothing.
|
||||
func (pl *FakeReservePlugin) Unreserve(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) {
|
||||
}
|
||||
|
||||
// NewFakeReservePlugin returns a plugin factory that builds a FakeReservePlugin carrying the given status; the factory ignores its arguments and always returns a nil error.
|
||||
func NewFakeReservePlugin(status *framework.Status) frameworkruntime.PluginFactory {
|
||||
return func(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) {
|
||||
return &FakeReservePlugin{
|
||||
Status: status,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
// FakePreBindPlugin is a test prebind plugin whose PreBind method returns the configured Status.
|
||||
type FakePreBindPlugin struct {
|
||||
Status *framework.Status
|
||||
}
|
||||
|
||||
// Name returns the fixed plugin name "FakePreBind".
|
||||
func (pl *FakePreBindPlugin) Name() string {
|
||||
return "FakePreBind"
|
||||
}
|
||||
|
||||
// PreBind is invoked at the PreBind extension point; it ignores all of its arguments and returns pl.Status.
|
||||
func (pl *FakePreBindPlugin) PreBind(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) *framework.Status {
|
||||
return pl.Status
|
||||
}
|
||||
|
||||
// NewFakePreBindPlugin returns a plugin factory that builds a FakePreBindPlugin carrying the given status; the factory ignores its arguments and always returns a nil error.
|
||||
func NewFakePreBindPlugin(status *framework.Status) frameworkruntime.PluginFactory {
|
||||
return func(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) {
|
||||
return &FakePreBindPlugin{
|
||||
Status: status,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
// FakePermitPlugin is a test permit plugin whose Permit method returns the configured Status and Timeout.
|
||||
type FakePermitPlugin struct {
|
||||
Status *framework.Status
|
||||
Timeout time.Duration
|
||||
}
|
||||
|
||||
// Name returns the fixed plugin name "FakePermit".
|
||||
func (pl *FakePermitPlugin) Name() string {
|
||||
return "FakePermit"
|
||||
}
|
||||
|
||||
// Permit is invoked at the Permit extension point; it ignores all of its arguments and returns pl.Status together with pl.Timeout.
|
||||
func (pl *FakePermitPlugin) Permit(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) (*framework.Status, time.Duration) {
|
||||
return pl.Status, pl.Timeout
|
||||
}
|
||||
|
||||
// NewFakePermitPlugin returns a plugin factory that builds a FakePermitPlugin carrying the given status and timeout; the factory ignores its arguments and always returns a nil error.
|
||||
func NewFakePermitPlugin(status *framework.Status, timeout time.Duration) frameworkruntime.PluginFactory {
|
||||
return func(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) {
|
||||
return &FakePermitPlugin{
|
||||
Status: status,
|
||||
Timeout: timeout,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
@ -52,6 +52,21 @@ func RegisterFilterPlugin(pluginName string, pluginNewFunc runtime.PluginFactory
|
||||
return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "Filter")
|
||||
}
|
||||
|
||||
// RegisterReservePlugin returns a function to register a Reserve Plugin to a given registry; it delegates to RegisterPluginAsExtensions, passing "Reserve" as the extension.
|
||||
func RegisterReservePlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc {
|
||||
return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "Reserve")
|
||||
}
|
||||
|
||||
// RegisterPermitPlugin returns a function to register a Permit Plugin to a given registry; it delegates to RegisterPluginAsExtensions, passing "Permit" as the extension.
|
||||
func RegisterPermitPlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc {
|
||||
return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "Permit")
|
||||
}
|
||||
|
||||
// RegisterPreBindPlugin returns a function to register a PreBind Plugin to a given registry; it delegates to RegisterPluginAsExtensions, passing "PreBind" as the extension.
|
||||
func RegisterPreBindPlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc {
|
||||
return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "PreBind")
|
||||
}
|
||||
|
||||
// RegisterScorePlugin returns a function to register a Score Plugin to a given registry.
|
||||
func RegisterScorePlugin(pluginName string, pluginNewFunc runtime.PluginFactory, weight int32) RegisterPluginFunc {
|
||||
return RegisterPluginAsExtensionsWithWeight(pluginName, weight, pluginNewFunc, "Score")
|
||||
|
Loading…
Reference in New Issue
Block a user