Merge pull request #124618 from gabesaba/gated_performance

Filter gated pods before calling isPodWorthRequeueing
This commit is contained in:
Kubernetes Prow Robot 2024-05-09 11:33:23 -07:00 committed by GitHub
commit db82fd1604
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 74 additions and 26 deletions

View File

@ -1175,6 +1175,11 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn
activated := false activated := false
for _, pInfo := range podInfoList { for _, pInfo := range podInfoList {
// Since there may be many gated pods and they will not move from the
// unschedulable pool, we skip calling the expensive isPodWorthRequeueing.
if pInfo.Gated {
continue
}
schedulingHint := p.isPodWorthRequeuing(logger, pInfo, event, oldObj, newObj) schedulingHint := p.isPodWorthRequeuing(logger, pInfo, event, oldObj, newObj)
if schedulingHint == queueSkip { if schedulingHint == queueSkip {
// QueueingHintFn determined that this Pod isn't worth putting to activeQ or backoffQ by this event. // QueueingHintFn determined that this Pod isn't worth putting to activeQ or backoffQ by this event.

View File

@ -94,6 +94,11 @@ var (
} }
) )
func setQueuedPodInfoGated(queuedPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo {
queuedPodInfo.Gated = true
return queuedPodInfo
}
func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod { func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod {
pInfo := p.unschedulablePods.get(pod) pInfo := p.unschedulablePods.get(pod)
if pInfo != nil { if pInfo != nil {
@ -1452,6 +1457,14 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing.
hint: queueHintReturnSkip, hint: queueHintReturnSkip,
expectedQ: unschedulablePods, expectedQ: unschedulablePods,
}, },
{
name: "QueueHintFunction is not called when Pod is gated",
podInfo: setQueuedPodInfoGated(&framework.QueuedPodInfo{PodInfo: mustNewPodInfo(p), UnschedulablePlugins: sets.New("foo")}),
hint: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
return framework.Queue, fmt.Errorf("QueueingHintFn should not be called as pod is gated")
},
expectedQ: unschedulablePods,
},
} }
for _, test := range tests { for _, test := range tests {
@ -2733,7 +2746,7 @@ func TestPendingPodsMetric(t *testing.T) {
gated := makeQueuedPodInfos(total-queueableNum, "y", failme, timestamp) gated := makeQueuedPodInfos(total-queueableNum, "y", failme, timestamp)
// Manually mark them as gated=true. // Manually mark them as gated=true.
for _, pInfo := range gated { for _, pInfo := range gated {
pInfo.Gated = true setQueuedPodInfoGated(pInfo)
} }
pInfos = append(pInfos, gated...) pInfos = append(pInfos, gated...)
totalWithDelay := 20 totalWithDelay := 20

View File

@ -2677,7 +2677,7 @@ func TestSchedulingGatesPluginEventsToRegister(t *testing.T) {
{ {
name: "preEnqueue plugin with event registered", name: "preEnqueue plugin with event registered",
enqueuePlugin: &SchedulingGatesPluginWithEvents{SchedulingGates: schedulinggates.SchedulingGates{}}, enqueuePlugin: &SchedulingGatesPluginWithEvents{SchedulingGates: schedulinggates.SchedulingGates{}},
count: 3, count: 2,
}, },
} }

View File

@ -59,9 +59,9 @@ func TestSchedulingGates(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
pods []*v1.Pod pods []*v1.Pod
want []string schedule []string
rmPodsSchedulingGates []int delete []string
wantPostGatesRemoval []string rmGates []string
}{ }{
{ {
name: "regular pods", name: "regular pods",
@ -69,7 +69,7 @@ func TestSchedulingGates(t *testing.T) {
st.MakePod().Name("p1").Container("pause").Obj(), st.MakePod().Name("p1").Container("pause").Obj(),
st.MakePod().Name("p2").Container("pause").Obj(), st.MakePod().Name("p2").Container("pause").Obj(),
}, },
want: []string{"p1", "p2"}, schedule: []string{"p1", "p2"},
}, },
{ {
name: "one pod carrying scheduling gates", name: "one pod carrying scheduling gates",
@ -77,7 +77,7 @@ func TestSchedulingGates(t *testing.T) {
st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).Container("pause").Obj(), st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).Container("pause").Obj(),
st.MakePod().Name("p2").Container("pause").Obj(), st.MakePod().Name("p2").Container("pause").Obj(),
}, },
want: []string{"p2"}, schedule: []string{"p2"},
}, },
{ {
name: "two pod carrying scheduling gates, and remove gates of one pod", name: "two pod carrying scheduling gates, and remove gates of one pod",
@ -86,9 +86,18 @@ func TestSchedulingGates(t *testing.T) {
st.MakePod().Name("p2").SchedulingGates([]string{"bar"}).Container("pause").Obj(), st.MakePod().Name("p2").SchedulingGates([]string{"bar"}).Container("pause").Obj(),
st.MakePod().Name("p3").Container("pause").Obj(), st.MakePod().Name("p3").Container("pause").Obj(),
}, },
want: []string{"p3"}, schedule: []string{"p3"},
rmPodsSchedulingGates: []int{1}, // remove gates of 'p2' rmGates: []string{"p2"},
wantPostGatesRemoval: []string{"p2"}, },
{
name: "gated pod schedulable after deleting the scheduled pod and removing gate",
pods: []*v1.Pod{
st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).Container("pause").Obj(),
st.MakePod().Name("p2").Container("pause").Obj(),
},
schedule: []string{"p2"},
delete: []string{"p2"},
rmGates: []string{"p1"},
}, },
} }
@ -107,6 +116,15 @@ func TestSchedulingGates(t *testing.T) {
testutils.SyncSchedulerInformerFactory(testCtx) testutils.SyncSchedulerInformerFactory(testCtx)
cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
// Create node, so we can schedule pods.
node := st.MakeNode().Name("node").Obj()
if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
t.Fatal("Failed to create node")
}
// Create pods.
for _, p := range tt.pods { for _, p := range tt.pods {
p.Namespace = ns p.Namespace = ns
if _, err := cs.CoreV1().Pods(ns).Create(ctx, p, metav1.CreateOptions{}); err != nil { if _, err := cs.CoreV1().Pods(ns).Create(ctx, p, metav1.CreateOptions{}); err != nil {
@ -122,30 +140,42 @@ func TestSchedulingGates(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
// Pop the expected pods out. They should be de-queueable. // Schedule pods.
for _, wantPod := range tt.want { for _, podName := range tt.schedule {
podInfo := testutils.NextPodOrDie(t, testCtx) testCtx.Scheduler.ScheduleOne(testCtx.Ctx)
if got := podInfo.Pod.Name; got != wantPod { if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, testutils.PodScheduled(cs, ns, podName)); err != nil {
t.Errorf("Want %v to be popped out, but got %v", wantPod, got) t.Fatalf("Failed to schedule %s", podName)
} }
} }
if len(tt.rmPodsSchedulingGates) == 0 { // Delete pods, which triggers AssignedPodDelete event in the scheduling queue.
return for _, podName := range tt.delete {
if err := cs.CoreV1().Pods(ns).Delete(ctx, podName, metav1.DeleteOptions{}); err != nil {
t.Fatalf("Error calling Delete on %s", podName)
} }
if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, testutils.PodDeleted(ctx, cs, ns, podName)); err != nil {
t.Fatalf("Failed to delete %s", podName)
}
}
// Ensure gated pods are not in ActiveQ
if len(testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()) > 0 {
t.Fatal("Expected no schedulable pods")
}
// Remove scheduling gates from the pod spec. // Remove scheduling gates from the pod spec.
for _, idx := range tt.rmPodsSchedulingGates { for _, podName := range tt.rmGates {
patch := `{"spec": {"schedulingGates": null}}` patch := `{"spec": {"schedulingGates": null}}`
podName := tt.pods[idx].Name
if _, err := cs.CoreV1().Pods(ns).Patch(ctx, podName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil { if _, err := cs.CoreV1().Pods(ns).Patch(ctx, podName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil {
t.Fatalf("Failed to patch pod %v: %v", podName, err) t.Fatalf("Failed to patch pod %v: %v", podName, err)
} }
} }
// Pop the expected pods out. They should be de-queueable.
for _, wantPod := range tt.wantPostGatesRemoval { // Schedule pods which no longer have gates.
podInfo := testutils.NextPodOrDie(t, testCtx) for _, podName := range tt.rmGates {
if got := podInfo.Pod.Name; got != wantPod { testCtx.Scheduler.ScheduleOne(testCtx.Ctx)
t.Errorf("Want %v to be popped out, but got %v", wantPod, got) if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, testutils.PodScheduled(cs, ns, podName)); err != nil {
t.Fatalf("Failed to schedule %s", podName)
} }
} }
}) })