Filter gated pods before calling isPodWorthRequeuing

This commit is contained in:
Gabe 2024-04-22 17:00:58 +00:00
parent 72524fa574
commit 4e99ada05f
4 changed files with 101 additions and 2 deletions

View File

@ -1175,6 +1175,11 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn
activated := false
for _, pInfo := range podInfoList {
// Since there may be many gated pods and they will not move from the
// unschedulable pool, we skip calling the expensive isPodWorthRequeuing.
if pInfo.Gated {
continue
}
schedulingHint := p.isPodWorthRequeuing(logger, pInfo, event, oldObj, newObj)
if schedulingHint == queueSkip {
// QueueingHintFn determined that this Pod isn't worth putting to activeQ or backoffQ by this event.

View File

@ -44,6 +44,7 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/schedulinggates"
"k8s.io/kubernetes/pkg/scheduler/metrics"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/pkg/scheduler/util"
@ -93,6 +94,11 @@ var (
}
)
// setQueuedPodInfoGated flips the Gated flag on the given QueuedPodInfo to
// true, mutating it in place, and hands back the very same pointer so the call
// can be embedded directly in a test-case literal.
func setQueuedPodInfoGated(queuedPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo {
	gatedInfo := queuedPodInfo
	gatedInfo.Gated = true
	return gatedInfo
}
func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod {
pInfo := p.unschedulablePods.get(pod)
if pInfo != nil {
@ -1451,6 +1457,14 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing.
hint: queueHintReturnSkip,
expectedQ: unschedulablePods,
},
{
name: "QueueHintFunction is not called when Pod is gated",
podInfo: setQueuedPodInfoGated(&framework.QueuedPodInfo{PodInfo: mustNewPodInfo(p), UnschedulablePlugins: sets.New("foo")}),
hint: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
return framework.Queue, fmt.Errorf("QueueingHintFn should not be called as pod is gated")
},
expectedQ: unschedulablePods,
},
}
for _, test := range tests {
@ -2732,7 +2746,7 @@ func TestPendingPodsMetric(t *testing.T) {
gated := makeQueuedPodInfos(total-queueableNum, "y", failme, timestamp)
// Manually mark them as gated=true.
for _, pInfo := range gated {
pInfo.Gated = true
setQueuedPodInfoGated(pInfo)
}
pInfos = append(pInfos, gated...)
totalWithDelay := 20
@ -3729,3 +3743,26 @@ func Test_isPodWorthRequeuing(t *testing.T) {
})
}
}
// Test_queuedPodInfo_gatedSetUponCreationAndUnsetUponUpdate checks the Gated
// flag on a QueuedPodInfo across its lifecycle in the queue:
//
//  1. A pod carrying a scheduling gate is added; its entry in the
//     unschedulable pool must report Gated == true.
//  2. The pod is updated to a copy with no scheduling gates; the pod info
//     subsequently popped from the queue must report Gated == false.
//
// Setup failures abort the test with t.Fatal rather than panicking on a nil
// dereference, so a regression is reported as a readable test failure.
func Test_queuedPodInfo_gatedSetUponCreationAndUnsetUponUpdate(t *testing.T) {
	logger, ctx := ktesting.NewTestContext(t)

	plugin, err := schedulinggates.New(ctx, nil, nil)
	if err != nil {
		t.Fatalf("failed to create the SchedulingGates plugin: %v", err)
	}
	preEnqueuePlugin, ok := plugin.(framework.PreEnqueuePlugin)
	if !ok {
		t.Fatalf("plugin of type %T does not implement framework.PreEnqueuePlugin", plugin)
	}
	// The plugin is registered under the empty map key.
	// NOTE(review): presumably this is the profile name used for pods built by
	// st.MakePod() without a scheduler name — confirm against NewTestQueue.
	m := map[string][]framework.PreEnqueuePlugin{"": {preEnqueuePlugin}}
	q := NewTestQueue(ctx, newDefaultQueueSort(), WithPreEnqueuePluginMap(m))

	gatedPod := st.MakePod().SchedulingGates([]string{"hello world"}).Obj()
	q.Add(logger, gatedPod)

	// get returns nil when the pod is absent from the unschedulable pool, so
	// guard before reading the flag.
	gatedPodInfo := q.unschedulablePods.get(gatedPod)
	if gatedPodInfo == nil {
		t.Fatal("expected gated pod to be in the unschedulable pool")
	}
	if !gatedPodInfo.Gated {
		t.Error("expected pod to be gated")
	}

	ungatedPod := gatedPod.DeepCopy()
	ungatedPod.Spec.SchedulingGates = nil
	q.Update(logger, gatedPod, ungatedPod)

	ungatedPodInfo, err := q.Pop(logger)
	if err != nil {
		t.Fatalf("unexpected error popping the ungated pod: %v", err)
	}
	if ungatedPodInfo.Gated {
		t.Error("expected pod to be ungated")
	}
}

View File

@ -2677,7 +2677,7 @@ func TestSchedulingGatesPluginEventsToRegister(t *testing.T) {
{
name: "preEnqueue plugin with event registered",
enqueuePlugin: &SchedulingGatesPluginWithEvents{SchedulingGates: schedulinggates.SchedulingGates{}},
count: 3,
count: 2,
},
}

View File

@ -152,6 +152,63 @@ func TestSchedulingGates(t *testing.T) {
}
}
// TestGatedPodSchedulableAfterEvent creates a gated pod (p1) and a non-gated
// pod (p2). After p2 has been scheduled and then deleted, the test removes the
// gate from p1 and ensures that p1 becomes schedulable.
//
// The scheduler is driven manually through ScheduleOne, and both pod backoff
// options are set to 0.
func TestGatedPodSchedulableAfterEvent(t *testing.T) {
	testCtx := testutils.InitTestSchedulerWithOptions(
		t,
		testutils.InitTestAPIServer(t, "pod-scheduling-gates", nil),
		0,
		scheduler.WithPodInitialBackoffSeconds(0),
		scheduler.WithPodMaxBackoffSeconds(0),
	)
	testutils.SyncSchedulerInformerFactory(testCtx)

	cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx

	// create initially gated pod
	pod1 := st.MakePod().Namespace(ns).Name("p1").Container("pause").SchedulingGates([]string{"foo"}).Obj()
	if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod1, metav1.CreateOptions{}); err != nil {
		t.Fatalf("Failed to create p1: %v", err)
	}

	// create immediately schedulable pod
	pod2 := st.MakePod().Namespace(ns).Name("p2").Container("pause").Obj()
	if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod2, metav1.CreateOptions{}); err != nil {
		t.Fatalf("Failed to create p2: %v", err)
	}

	// create node on which to schedule pods
	node := st.MakeNode().Name("node1").Obj()
	if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
		t.Fatalf("Failed to create node1: %v", err)
	}

	// schedule p2
	testCtx.Scheduler.ScheduleOne(ctx)
	if err := wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, wait.ForeverTestTimeout, false, testutils.PodScheduled(cs, ns, "p2")); err != nil {
		t.Fatalf("Failed to schedule p2: %v", err)
	}

	// delete p2, which triggers DeletePodFromCache event
	if err := cs.CoreV1().Pods(ns).Delete(ctx, "p2", metav1.DeleteOptions{}); err != nil {
		t.Fatalf("Failed to issue delete for p2: %v", err)
	}
	if err := wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, wait.ForeverTestTimeout, false, testutils.PodDeleted(ctx, cs, ns, "p2")); err != nil {
		t.Fatalf("Failed to delete p2: %v", err)
	}

	// remove gate from p1
	patch := `{"spec": {"schedulingGates": null}}`
	if _, err := cs.CoreV1().Pods(ns).Patch(ctx, "p1", types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil {
		t.Fatalf("Failed to remove schedulingGates from p1: %v", err)
	}

	// schedule p1
	testCtx.Scheduler.ScheduleOne(ctx)
	if err := wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, wait.ForeverTestTimeout, false, testutils.PodScheduled(cs, ns, "p1")); err != nil {
		t.Fatalf("Failed to schedule p1: %v", err)
	}
}
// TestCoreResourceEnqueue verifies that Pods failed by in-tree default plugins
// can be moved properly upon their registered events.
func TestCoreResourceEnqueue(t *testing.T) {