Merge pull request #119785 from sanposhiho/waitonpermit-fiterror

fix: register the plugin rejects Pods in WaitOnPermit to UnschedulablePlugins
This commit is contained in:
Kubernetes Prow Robot 2023-08-15 23:13:04 -07:00 committed by GitHub
commit 130a5a423f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 143 additions and 1 deletions

View File

@ -177,6 +177,7 @@ type QueuedPodInfo struct {
// latency for a pod.
InitialAttemptTimestamp *time.Time
// If a Pod failed in a scheduling cycle, record the plugin names it failed by.
// It's registered only when the Pod is rejected in PreFilter, Filter, Reserve, or Permit (WaitOnPermit).
UnschedulablePlugins sets.Set[string]
// Whether the Pod is scheduling gated (by PreEnqueuePlugins) or not.
Gated bool

View File

@ -731,7 +731,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *
// In this case, we try to requeue this Pod to activeQ/backoffQ.
queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, ScheduleAttemptFailure)
logger.V(6).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", queue, "schedulingCycle", podSchedulingCycle)
logger.V(3).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", queue, "schedulingCycle", podSchedulingCycle, "hint", schedulingHint)
if queue == activeQ {
// When the Pod is moved to activeQ, need to let p.cond know so that the Pod will be pop()ed out.
p.cond.Broadcast()

View File

@ -269,6 +269,17 @@ func (sched *Scheduler) bindingCycle(
// Run "permit" plugins.
if status := fwk.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
if status.IsUnschedulable() {
fitErr := &framework.FitError{
NumAllNodes: 1,
Pod: assumedPodInfo.Pod,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{scheduleResult.SuggestedHost: status},
UnschedulablePlugins: sets.New(status.FailedPlugin()),
},
}
return framework.NewStatus(status.Code()).WithError(fitErr)
}
return status
}

View File

@ -532,3 +532,133 @@ func (p *firstFailBindPlugin) Bind(ctx context.Context, state *framework.CycleSt
return p.defaultBinderPlugin.Bind(ctx, state, pod, nodename)
}
// TestRequeueByPermitRejection verify Pods failed by permit plugins in the binding cycle are
// put back to the queue, according to the correct scheduling cycle number.
func TestRequeueByPermitRejection(t *testing.T) {
queueingHintCalledCounter := 0
fakePermit := &fakePermitPlugin{}
registry := frameworkruntime.Registry{
fakePermitPluginName: func(o runtime.Object, fh framework.Handle) (framework.Plugin, error) {
fakePermit = &fakePermitPlugin{
frameworkHandler: fh,
schedulingHint: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
queueingHintCalledCounter++
return framework.QueueImmediately
},
}
return fakePermit, nil
},
}
cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
Profiles: []configv1.KubeSchedulerProfile{{
SchedulerName: pointer.String(v1.DefaultSchedulerName),
Plugins: &configv1.Plugins{
MultiPoint: configv1.PluginSet{
Enabled: []configv1.Plugin{
{Name: fakePermitPluginName},
},
},
},
}}})
// Use zero backoff seconds to bypass backoffQ.
testCtx := testutils.InitTestSchedulerWithOptions(
t,
testutils.InitTestAPIServer(t, "core-res-enqueue", nil),
0,
scheduler.WithPodInitialBackoffSeconds(0),
scheduler.WithPodMaxBackoffSeconds(0),
scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithFrameworkOutOfTreeRegistry(registry),
)
testutils.SyncSchedulerInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.Ctx)
cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
node := st.MakeNode().Name("fake-node").Obj()
if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Node %q: %v", node.Name, err)
}
// create a pod.
pod := st.MakePod().Namespace(ns).Name("pod-1").Container(imageutils.GetPauseImageName()).Obj()
if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
}
// update node label. (causes the NodeUpdate event)
node.Labels = map[string]string{"updated": ""}
if _, err := cs.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{}); err != nil {
t.Fatalf("Failed to add labels to the node: %v", err)
}
// create a pod to increment the scheduling cycle number in the scheduling queue.
// We can make sure NodeUpdate event, that has happened in the previous scheduling cycle, makes Pod to be enqueued to activeQ via the scheduling queue.
pod = st.MakePod().Namespace(ns).Name("pod-2").Container(imageutils.GetPauseImageName()).Obj()
if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
}
// reject pod-1 to simulate the failure in Permit plugins.
// This pod-1 should be enqueued to activeQ because the NodeUpdate event has happened.
fakePermit.frameworkHandler.IterateOverWaitingPods(func(wp framework.WaitingPod) {
if wp.GetPod().Name == "pod-1" {
wp.Reject(fakePermitPluginName, "fakePermitPlugin rejects the Pod")
return
}
})
// Wait for pod-2 to be scheduled.
err := wait.Poll(200*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
fakePermit.frameworkHandler.IterateOverWaitingPods(func(wp framework.WaitingPod) {
if wp.GetPod().Name == "pod-2" {
wp.Allow(fakePermitPluginName)
}
})
return testutils.PodScheduled(cs, ns, "pod-2")()
})
if err != nil {
t.Fatalf("Expect pod-2 to be scheduled")
}
err = wait.Poll(200*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
pod1Found := false
fakePermit.frameworkHandler.IterateOverWaitingPods(func(wp framework.WaitingPod) {
if wp.GetPod().Name == "pod-1" {
pod1Found = true
wp.Allow(fakePermitPluginName)
}
})
return pod1Found, nil
})
if err != nil {
t.Fatal("Expect pod-1 to be scheduled again")
}
if queueingHintCalledCounter != 1 {
t.Fatalf("Expected the scheduling hint to be called 1 time, but %v", queueingHintCalledCounter)
}
}
type fakePermitPlugin struct {
frameworkHandler framework.Handle
schedulingHint framework.QueueingHintFn
}
const fakePermitPluginName = "fakePermitPlugin"
func (p *fakePermitPlugin) Name() string {
return fakePermitPluginName
}
func (p *fakePermitPlugin) Permit(ctx context.Context, state *framework.CycleState, _ *v1.Pod, _ string) (*framework.Status, time.Duration) {
return framework.NewStatus(framework.Wait), wait.ForeverTestTimeout
}
func (p *fakePermitPlugin) EventsToRegister() []framework.ClusterEventWithHint {
return []framework.ClusterEventWithHint{
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeLabel}, QueueingHintFn: p.schedulingHint},
}
}