Support fine-gained rescheduling in ReservePlugin

Signed-off-by: kerthcet <kerthcet@gmail.com>
This commit is contained in:
kerthcet 2023-07-07 10:51:30 +08:00
parent b07a843cb5
commit c0eb0caf4a
5 changed files with 109 additions and 3 deletions

View File

@ -1265,6 +1265,11 @@ func (f *frameworkImpl) RunReservePluginsReserve(ctx context.Context, state *fra
ctx := klog.NewContext(ctx, logger)
status = f.runReservePluginReserve(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() {
if status.IsUnschedulable() {
logger.V(4).Info("Pod rejected by plugin", "pod", klog.KObj(pod), "plugin", pl.Name(), "status", status.Message())
status.SetFailedPlugin(pl.Name())
return status
}
err := status.AsError()
logger.Error(err, "Plugin failed", "plugin", pl.Name(), "pod", klog.KObj(pod))
return framework.AsStatus(fmt.Errorf("running Reserve plugin %q: %w", pl.Name(), err))

View File

@ -2422,7 +2422,7 @@ func TestReservePlugins(t *testing.T) {
inj: injectedResult{ReserveStatus: int(framework.Unschedulable)},
},
},
wantStatus: framework.AsStatus(fmt.Errorf(`running Reserve plugin "TestPlugin": %w`, errInjectedStatus)),
wantStatus: framework.NewStatus(framework.Unschedulable, injectReason).WithFailedPlugin("TestPlugin"),
},
{
name: "ErrorReservePlugin",
@ -2442,7 +2442,7 @@ func TestReservePlugins(t *testing.T) {
inj: injectedResult{ReserveStatus: int(framework.UnschedulableAndUnresolvable)},
},
},
wantStatus: framework.AsStatus(fmt.Errorf(`running Reserve plugin "TestPlugin": %w`, errInjectedStatus)),
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, injectReason).WithFailedPlugin("TestPlugin"),
},
{
name: "SuccessSuccessReservePlugins",
@ -2512,7 +2512,7 @@ func TestReservePlugins(t *testing.T) {
inj: injectedResult{ReserveStatus: int(framework.Success)},
},
},
wantStatus: framework.AsStatus(fmt.Errorf(`running Reserve plugin "TestPlugin": %w`, errInjectedStatus)),
wantStatus: framework.NewStatus(framework.Unschedulable, injectReason).WithFailedPlugin("TestPlugin"),
},
}

View File

@ -1458,6 +1458,17 @@ func TestFitError_Error(t *testing.T) {
},
wantReasonMsg: "0/1 nodes are available: 1 Node failed Permit plugin Permit-1.",
},
{
name: "failed to Reserve on node",
numAllNodes: 1,
diagnosis: Diagnosis{
NodeToStatusMap: NodeToStatusMap{
// There should be only one node here.
"node1": NewStatus(Unschedulable, "Node failed Reserve plugin Reserve-1"),
},
},
wantReasonMsg: "0/1 nodes are available: 1 Node failed Reserve plugin Reserve-1.",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

View File

@ -195,6 +195,17 @@ func (sched *Scheduler) schedulingCycle(
logger.Error(forgetErr, "Scheduler cache ForgetPod failed")
}
if sts.IsUnschedulable() {
fitErr := &framework.FitError{
NumAllNodes: 1,
Pod: pod,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{scheduleResult.SuggestedHost: sts},
UnschedulablePlugins: sets.New(sts.FailedPlugin()),
},
}
return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.NewStatus(sts.Code()).WithError(fitErr)
}
return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, sts
}

View File

@ -31,6 +31,50 @@ import (
var _ framework.PermitPlugin = &PermitPlugin{}
var _ framework.EnqueueExtensions = &PermitPlugin{}
var _ framework.ReservePlugin = &ReservePlugin{}
var _ framework.EnqueueExtensions = &ReservePlugin{}
type ReservePlugin struct {
name string
statusCode framework.Code
numReserveCalled int
numUnreserveCalled int
}
func (rp *ReservePlugin) Name() string {
return rp.name
}
func (rp *ReservePlugin) Reserve(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
rp.numReserveCalled += 1
if rp.statusCode == framework.Error {
return framework.NewStatus(framework.Error, "failed to reserve")
}
if rp.statusCode == framework.Unschedulable {
if rp.numReserveCalled <= 1 {
return framework.NewStatus(framework.Unschedulable, "reject to reserve")
}
}
return nil
}
func (rp *ReservePlugin) Unreserve(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) {
rp.numUnreserveCalled += 1
}
func (rp *ReservePlugin) EventsToRegister() []framework.ClusterEventWithHint {
return []framework.ClusterEventWithHint{
{
Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add},
QueueingHintFn: func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
return framework.QueueImmediately
},
},
}
}
type PermitPlugin struct {
name string
@ -118,6 +162,41 @@ func TestReScheduling(t *testing.T) {
wantFirstSchedulingError: true,
wantError: true,
},
{
name: "Rescheduling pod rejected by Reserve Plugin",
plugins: []framework.Plugin{
&ReservePlugin{name: "reserve", statusCode: framework.Unschedulable},
},
action: func() error {
_, err := createNode(testContext.ClientSet, st.MakeNode().Name("fake-node").Obj())
return err
},
wantScheduled: true,
},
{
name: "Rescheduling pod rejected by Reserve Plugin with unrelated event",
plugins: []framework.Plugin{
&ReservePlugin{name: "reserve", statusCode: framework.Unschedulable},
},
action: func() error {
_, err := createPausePod(testContext.ClientSet,
initPausePod(&testutils.PausePodConfig{Name: "test-pod-2", Namespace: testContext.NS.Name}))
return err
},
wantScheduled: false,
},
{
name: "Rescheduling pod failed by Reserve Plugin",
plugins: []framework.Plugin{
&ReservePlugin{name: "reserve", statusCode: framework.Error},
},
action: func() error {
_, err := createNode(testContext.ClientSet, st.MakeNode().Name("fake-node").Obj())
return err
},
wantFirstSchedulingError: true,
wantError: true,
},
}
for _, test := range tests {