fix: register the plugin that rejects Pods in WaitOnPermit to UnschedulablePlugins

This commit is contained in:
Kensei Nakada 2023-08-06 09:06:20 +00:00
parent 6e0cb243d5
commit cf3f0bd778
4 changed files with 143 additions and 1 deletions

View File

@ -177,6 +177,7 @@ type QueuedPodInfo struct {
// latency for a pod. // latency for a pod.
InitialAttemptTimestamp *time.Time InitialAttemptTimestamp *time.Time
// If a Pod failed in a scheduling cycle, record the plugin names it failed by. // If a Pod failed in a scheduling cycle, record the plugin names it failed by.
// It's registered only when the Pod is rejected in PreFilter, Filter, Reserve, or Permit (WaitOnPermit).
UnschedulablePlugins sets.Set[string] UnschedulablePlugins sets.Set[string]
// Whether the Pod is scheduling gated (by PreEnqueuePlugins) or not. // Whether the Pod is scheduling gated (by PreEnqueuePlugins) or not.
Gated bool Gated bool

View File

@ -731,7 +731,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *
// In this case, we try to requeue this Pod to activeQ/backoffQ. // In this case, we try to requeue this Pod to activeQ/backoffQ.
queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, ScheduleAttemptFailure) queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, ScheduleAttemptFailure)
logger.V(6).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", queue, "schedulingCycle", podSchedulingCycle) logger.V(3).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", queue, "schedulingCycle", podSchedulingCycle, "hint", schedulingHint)
if queue == activeQ { if queue == activeQ {
// When the Pod is moved to activeQ, need to let p.cond know so that the Pod will be pop()ed out. // When the Pod is moved to activeQ, need to let p.cond know so that the Pod will be pop()ed out.
p.cond.Broadcast() p.cond.Broadcast()

View File

@ -269,6 +269,17 @@ func (sched *Scheduler) bindingCycle(
// Run "permit" plugins. // Run "permit" plugins.
if status := fwk.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() { if status := fwk.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
if status.IsUnschedulable() {
fitErr := &framework.FitError{
NumAllNodes: 1,
Pod: assumedPodInfo.Pod,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{scheduleResult.SuggestedHost: status},
UnschedulablePlugins: sets.New(status.FailedPlugin()),
},
}
return framework.NewStatus(status.Code()).WithError(fitErr)
}
return status return status
} }

View File

@ -532,3 +532,133 @@ func (p *firstFailBindPlugin) Bind(ctx context.Context, state *framework.CycleSt
return p.defaultBinderPlugin.Bind(ctx, state, pod, nodename) return p.defaultBinderPlugin.Bind(ctx, state, pod, nodename)
} }
// TestRequeueByPermitRejection verifies that Pods rejected by permit plugins in the binding cycle are
// put back to the queue, according to the correct scheduling cycle number.
//
// Flow: pod-1 is held in Permit, a Node label update happens, pod-2 is created, and then pod-1 is
// rejected. The test expects pod-1 to show up as a waiting Pod again and the fake plugin's queueing
// hint to have been called exactly once.
func TestRequeueByPermitRejection(t *testing.T) {
	// NOTE(review): this counter is written by the queueing hint callback and read by the test
	// goroutine; presumably the callback runs on a scheduler goroutine, in which case this is a
	// data race under -race — confirm and switch to an atomic counter if so.
	queueingHintCalledCounter := 0
	fakePermit := &fakePermitPlugin{}
	registry := frameworkruntime.Registry{
		fakePermitPluginName: func(o runtime.Object, fh framework.Handle) (framework.Plugin, error) {
			// Replace the placeholder so that the test can reach the framework handle afterwards.
			fakePermit = &fakePermitPlugin{
				frameworkHandler: fh,
				schedulingHint: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
					queueingHintCalledCounter++
					return framework.QueueImmediately
				},
			}
			return fakePermit, nil
		},
	}
	cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
		Profiles: []configv1.KubeSchedulerProfile{{
			SchedulerName: pointer.String(v1.DefaultSchedulerName),
			Plugins: &configv1.Plugins{
				MultiPoint: configv1.PluginSet{
					Enabled: []configv1.Plugin{
						{Name: fakePermitPluginName},
					},
				},
			},
		}}})

	// Use zero backoff seconds to bypass backoffQ.
	testCtx := testutils.InitTestSchedulerWithOptions(
		t,
		testutils.InitTestAPIServer(t, "core-res-enqueue", nil),
		0,
		scheduler.WithPodInitialBackoffSeconds(0),
		scheduler.WithPodMaxBackoffSeconds(0),
		scheduler.WithProfiles(cfg.Profiles...),
		scheduler.WithFrameworkOutOfTreeRegistry(registry),
	)
	testutils.SyncSchedulerInformerFactory(testCtx)
	go testCtx.Scheduler.Run(testCtx.Ctx)

	cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx

	// isWaitingOnPermit reports whether a Pod with the given name is currently among the
	// framework's waiting Pods.
	isWaitingOnPermit := func(name string) bool {
		found := false
		fakePermit.frameworkHandler.IterateOverWaitingPods(func(wp framework.WaitingPod) {
			if wp.GetPod().Name == name {
				found = true
			}
		})
		return found
	}

	node := st.MakeNode().Name("fake-node").Obj()
	if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
		t.Fatalf("Failed to create Node %q: %v", node.Name, err)
	}

	// create a pod.
	pod := st.MakePod().Namespace(ns).Name("pod-1").Container(imageutils.GetPauseImageName()).Obj()
	if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
		t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
	}

	// Make sure pod-1 is actually parked in Permit before going on. Otherwise the node update
	// below may happen before pod-1 is being scheduled, and the rejection further down would
	// silently find no waiting Pod, making the final assertions fail spuriously.
	err := wait.Poll(200*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
		return isWaitingOnPermit("pod-1"), nil
	})
	if err != nil {
		t.Fatalf("Expect pod-1 to be waiting on permit: %v", err)
	}

	// update node label. (causes the NodeUpdate event)
	node.Labels = map[string]string{"updated": ""}
	if _, err := cs.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{}); err != nil {
		t.Fatalf("Failed to add labels to the node: %v", err)
	}

	// create a pod to increment the scheduling cycle number in the scheduling queue.
	// We can make sure NodeUpdate event, that has happened in the previous scheduling cycle, makes Pod to be enqueued to activeQ via the scheduling queue.
	pod = st.MakePod().Namespace(ns).Name("pod-2").Container(imageutils.GetPauseImageName()).Obj()
	if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
		t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
	}

	// reject pod-1 to simulate the failure in Permit plugins.
	// This pod-1 should be enqueued to activeQ because the NodeUpdate event has happened.
	fakePermit.frameworkHandler.IterateOverWaitingPods(func(wp framework.WaitingPod) {
		if wp.GetPod().Name == "pod-1" {
			wp.Reject(fakePermitPluginName, "fakePermitPlugin rejects the Pod")
		}
	})

	// Wait for pod-2 to be scheduled.
	err = wait.Poll(200*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
		// pod-2 is held in Permit as well; keep allowing it until it is reported as scheduled.
		fakePermit.frameworkHandler.IterateOverWaitingPods(func(wp framework.WaitingPod) {
			if wp.GetPod().Name == "pod-2" {
				wp.Allow(fakePermitPluginName)
			}
		})
		return testutils.PodScheduled(cs, ns, "pod-2")()
	})
	if err != nil {
		t.Fatalf("Expect pod-2 to be scheduled: %v", err)
	}

	// pod-1 showing up as a waiting Pod again means it was requeued and went through another
	// scheduling cycle after the rejection.
	err = wait.Poll(200*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
		pod1Found := false
		fakePermit.frameworkHandler.IterateOverWaitingPods(func(wp framework.WaitingPod) {
			if wp.GetPod().Name == "pod-1" {
				pod1Found = true
				wp.Allow(fakePermitPluginName)
			}
		})
		return pod1Found, nil
	})
	if err != nil {
		t.Fatalf("Expect pod-1 to be scheduled again: %v", err)
	}

	if queueingHintCalledCounter != 1 {
		t.Fatalf("Expected the scheduling hint to be called 1 time, but %v", queueingHintCalledCounter)
	}
}
// fakePermitPlugin is a test-only plugin that holds every Pod in Permit and exposes its
// framework handle so that a test can allow or reject the waiting Pods itself.
type fakePermitPlugin struct {
	// frameworkHandler is the handle given to the plugin factory; tests use it to iterate
	// over the waiting Pods.
	frameworkHandler framework.Handle
	// schedulingHint is the queueing hint function attached to the event this plugin registers.
	schedulingHint framework.QueueingHintFn
}
// fakePermitPluginName is the name reported by fakePermitPlugin; the test also uses it as the
// registry key, in the profile's enabled plugin list, and when allowing or rejecting waiting Pods.
const fakePermitPluginName = "fakePermitPlugin"
// Name returns fakePermitPluginName.
func (p *fakePermitPlugin) Name() string {
	return fakePermitPluginName
}
// Permit never decides on its own: for every Pod it returns a Wait status together with
// wait.ForeverTestTimeout as the timeout, leaving it to the test to allow or reject the Pod.
func (p *fakePermitPlugin) Permit(ctx context.Context, state *framework.CycleState, _ *v1.Pod, _ string) (*framework.Status, time.Duration) {
	var (
		waitStatus = framework.NewStatus(framework.Wait)
		timeout    = wait.ForeverTestTimeout
	)
	return waitStatus, timeout
}
// EventsToRegister registers a single event, a Node label update, and attaches the plugin's
// schedulingHint to it as the queueing hint function.
func (p *fakePermitPlugin) EventsToRegister() []framework.ClusterEventWithHint {
	nodeLabelUpdate := framework.ClusterEvent{
		Resource:   framework.Node,
		ActionType: framework.UpdateNodeLabel,
	}
	withHint := framework.ClusterEventWithHint{
		Event:          nodeLabelUpdate,
		QueueingHintFn: p.schedulingHint,
	}
	return []framework.ClusterEventWithHint{withHint}
}