Merge pull request #119784 from sanposhiho/bug-fix

fix: broadcast when pod is pushed back to activeQ directly in AddUnschedulableIfNotPresent
This commit is contained in:
Kubernetes Prow Robot 2023-08-09 13:05:31 -07:00 committed by GitHub
commit 201f807036
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 97 additions and 0 deletions

View File

@ -732,6 +732,10 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *
// In this case, we try to requeue this Pod to activeQ/backoffQ. // In this case, we try to requeue this Pod to activeQ/backoffQ.
queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, ScheduleAttemptFailure) queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, ScheduleAttemptFailure)
logger.V(6).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", queue, "schedulingCycle", podSchedulingCycle) logger.V(6).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", queue, "schedulingCycle", podSchedulingCycle)
if queue == activeQ {
// When the Pod is moved to activeQ, need to let p.cond know so that the Pod will be pop()ed out.
p.cond.Broadcast()
}
p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil) p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil)
return nil return nil

View File

@ -44,6 +44,8 @@ import (
"k8s.io/kubernetes/pkg/scheduler" "k8s.io/kubernetes/pkg/scheduler"
configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
st "k8s.io/kubernetes/pkg/scheduler/testing" st "k8s.io/kubernetes/pkg/scheduler/testing"
testfwk "k8s.io/kubernetes/test/integration/framework" testfwk "k8s.io/kubernetes/test/integration/framework"
@ -439,3 +441,94 @@ func TestCustomResourceEnqueue(t *testing.T) {
t.Errorf("Expected the Pod to be attempted 2 times, but got %v", podInfo.Attempts) t.Errorf("Expected the Pod to be attempted 2 times, but got %v", podInfo.Attempts)
} }
} }
// TestRequeueByBindFailure verify Pods failed by bind plugin are
// put back to the queue regardless of whether event happens or not.
func TestRequeueByBindFailure(t *testing.T) {
registry := frameworkruntime.Registry{
"firstFailBindPlugin": newFirstFailBindPlugin,
}
cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
Profiles: []configv1.KubeSchedulerProfile{{
SchedulerName: pointer.String(v1.DefaultSchedulerName),
Plugins: &configv1.Plugins{
MultiPoint: configv1.PluginSet{
Enabled: []configv1.Plugin{
{Name: "firstFailBindPlugin"},
},
Disabled: []configv1.Plugin{
{Name: names.DefaultBinder},
},
},
},
}}})
// Use zero backoff seconds to bypass backoffQ.
testCtx := testutils.InitTestSchedulerWithOptions(
t,
testutils.InitTestAPIServer(t, "core-res-enqueue", nil),
0,
scheduler.WithPodInitialBackoffSeconds(0),
scheduler.WithPodMaxBackoffSeconds(0),
scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithFrameworkOutOfTreeRegistry(registry),
)
testutils.SyncSchedulerInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.Ctx)
cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
node := st.MakeNode().Name("fake-node").Obj()
if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Node %q: %v", node.Name, err)
}
// create a pod.
pod := st.MakePod().Namespace(ns).Name("pod-1").Container(imageutils.GetPauseImageName()).Obj()
if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
}
// first binding try should fail.
err := wait.Poll(200*time.Millisecond, wait.ForeverTestTimeout, testutils.PodSchedulingError(cs, ns, "pod-1"))
if err != nil {
t.Fatalf("Expect pod-1 to be rejected by the bind plugin")
}
// The pod should be enqueued to activeQ/backoffQ without any event.
// The pod should be scheduled in the second binding try.
err = wait.Poll(200*time.Millisecond, wait.ForeverTestTimeout, testutils.PodScheduled(cs, ns, "pod-1"))
if err != nil {
t.Fatalf("Expect pod-1 to be scheduled by the bind plugin in the second binding try")
}
}
// firstFailBindPlugin rejects the Pod in the first Bind call.
type firstFailBindPlugin struct {
counter int
defaultBinderPlugin framework.BindPlugin
}
func newFirstFailBindPlugin(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
binder, err := defaultbinder.New(nil, handle)
if err != nil {
return nil, err
}
return &firstFailBindPlugin{
defaultBinderPlugin: binder.(framework.BindPlugin),
}, nil
}
func (*firstFailBindPlugin) Name() string {
return "firstFailBindPlugin"
}
func (p *firstFailBindPlugin) Bind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodename string) *framework.Status {
if p.counter == 0 {
// fail in the first Bind call.
p.counter++
return framework.NewStatus(framework.Error, "firstFailBindPlugin rejects the Pod")
}
return p.defaultBinderPlugin.Bind(ctx, state, pod, nodename)
}