diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index 8e647fea466..b15a6064f69 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -115,6 +115,8 @@ func (pqi *QueuedPodInfo) DeepCopy() *QueuedPodInfo { Timestamp: pqi.Timestamp, Attempts: pqi.Attempts, InitialAttemptTimestamp: pqi.InitialAttemptTimestamp, + UnschedulablePlugins: pqi.UnschedulablePlugins.Clone(), + Gated: pqi.Gated, } } diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 4e3cf633597..2486792078e 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -368,12 +368,13 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error { defer p.lock.Unlock() pInfo := p.newQueuedPodInfo(pod) + gated := pInfo.Gated if added, err := p.addToActiveQ(pInfo); !added { return err } if p.unschedulablePods.get(pod) != nil { klog.ErrorS(nil, "Error: pod is already in the unschedulable queue", "pod", klog.KObj(pod)) - p.unschedulablePods.delete(pInfo) + p.unschedulablePods.delete(pod, gated) } // Delete pod from backoffQ if it is backing off if err := p.podBackoffQ.Delete(pInfo); err == nil { @@ -428,10 +429,11 @@ func (p *PriorityQueue) activate(pod *v1.Pod) bool { return false } + gated := pInfo.Gated if added, _ := p.addToActiveQ(pInfo); !added { return false } - p.unschedulablePods.delete(pInfo) + p.unschedulablePods.delete(pInfo.Pod, gated) p.podBackoffQ.Delete(pInfo) metrics.SchedulerQueueIncomingPods.WithLabelValues("active", ForceActivate).Inc() p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil) @@ -621,17 +623,18 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { pInfo := updatePod(usPodInfo, newPod) p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo) if isPodUpdated(oldPod, newPod) { + gated := usPodInfo.Gated if p.isPodBackingoff(usPodInfo) { if err := p.podBackoffQ.Add(pInfo); err != nil { return err } - p.unschedulablePods.delete(usPodInfo) + p.unschedulablePods.delete(usPodInfo.Pod, gated) klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", PodUpdate, "queue", backoffQName) } else { if added, err := p.addToActiveQ(pInfo); !added { return err } - p.unschedulablePods.delete(usPodInfo) + p.unschedulablePods.delete(usPodInfo.Pod, gated) klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", BackoffComplete, "queue", activeQName) p.cond.Broadcast() } @@ -663,7 +666,9 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error { if err := p.activeQ.Delete(pInfo); err != nil { // The item was probably not found in the activeQ. p.podBackoffQ.Delete(pInfo) - p.unschedulablePods.delete(pInfo) + if pInfo = p.unschedulablePods.get(pod); pInfo != nil { + p.unschedulablePods.delete(pod, pInfo.Gated) + } } return nil } @@ -718,14 +723,15 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework. } else { klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQName) metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event.Label).Inc() - p.unschedulablePods.delete(pInfo) + p.unschedulablePods.delete(pod, pInfo.Gated) } } else { + gated := pInfo.Gated if added, _ := p.addToActiveQ(pInfo); added { klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQName) activated = true metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event.Label).Inc() - p.unschedulablePods.delete(pInfo) + p.unschedulablePods.delete(pod, gated) } } } @@ -875,7 +881,7 @@ type UnschedulablePods struct { unschedulableRecorder, gatedRecorder metrics.MetricRecorder } -// Add adds a pod to the unschedulable podInfoMap. +// addOrUpdate adds a pod to the unschedulable podInfoMap. func (u *UnschedulablePods) addOrUpdate(pInfo *framework.QueuedPodInfo) { podID := u.keyFunc(pInfo.Pod) if _, exists := u.podInfoMap[podID]; !exists { @@ -888,20 +894,21 @@ func (u *UnschedulablePods) addOrUpdate(pInfo *framework.QueuedPodInfo) { u.podInfoMap[podID] = pInfo } -// Delete deletes a pod from the unschedulable podInfoMap. -func (u *UnschedulablePods) delete(pInfo *framework.QueuedPodInfo) { - podID := u.keyFunc(pInfo.Pod) +// delete deletes a pod from the unschedulable podInfoMap. +// The `gated` parameter is used to figure out which metric should be decreased. +func (u *UnschedulablePods) delete(pod *v1.Pod, gated bool) { + podID := u.keyFunc(pod) if _, exists := u.podInfoMap[podID]; exists { - if pInfo.Gated && u.gatedRecorder != nil { + if gated && u.gatedRecorder != nil { u.gatedRecorder.Dec() - } else if !pInfo.Gated && u.unschedulableRecorder != nil { + } else if !gated && u.unschedulableRecorder != nil { u.unschedulableRecorder.Dec() } } delete(u.podInfoMap, podID) } -// Get returns the QueuedPodInfo if a pod with the same key as the key of the given "pod" +// get returns the QueuedPodInfo if a pod with the same key as the key of the given "pod" // is found in the map. It returns nil otherwise. func (u *UnschedulablePods) get(pod *v1.Pod) *framework.QueuedPodInfo { podKey := u.keyFunc(pod) @@ -911,7 +918,7 @@ func (u *UnschedulablePods) get(pod *v1.Pod) *framework.QueuedPodInfo { return nil } -// Clear removes all the entries from the unschedulable podInfoMap. +// clear removes all the entries from the unschedulable podInfoMap. func (u *UnschedulablePods) clear() { u.podInfoMap = make(map[string]*framework.QueuedPodInfo) if u.unschedulableRecorder != nil { diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index e90ef891773..cd819118ac2 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -456,8 +456,10 @@ func (pl *preEnqueuePlugin) Name() string { func (pl *preEnqueuePlugin) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status { for _, allowed := range pl.allowlists { - if strings.Contains(p.Name, allowed) { - return nil + for label := range p.Labels { + if label == allowed { + return nil + } } } return framework.NewStatus(framework.UnschedulableAndUnresolvable, "pod name not in allowlists") @@ -473,14 +475,14 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) { }{ { name: "no plugins registered", - pod: st.MakePod().Name("p").Obj(), + pod: st.MakePod().Name("p").Label("p", "").Obj(), wantUnschedulablePods: 0, wantSuccess: true, }, { name: "preEnqueue plugin registered, pod name not in allowlists", plugins: []framework.PreEnqueuePlugin{&preEnqueuePlugin{}, &preEnqueuePlugin{}}, - pod: st.MakePod().Name("p").Obj(), + pod: st.MakePod().Name("p").Label("p", "").Obj(), wantUnschedulablePods: 1, wantSuccess: false, }, @@ -490,7 +492,7 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) { &preEnqueuePlugin{allowlists: []string{"foo", "bar"}}, &preEnqueuePlugin{allowlists: []string{"foo"}}, }, - pod: st.MakePod().Name("bar").Obj(), + pod: st.MakePod().Name("bar").Label("bar", "").Obj(), wantUnschedulablePods: 1, wantSuccess: false, }, @@ -500,7 +502,7 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) { &preEnqueuePlugin{allowlists: []string{"foo", "bar"}}, &preEnqueuePlugin{allowlists: []string{"bar"}}, }, - pod: st.MakePod().Name("bar").Obj(), + pod: st.MakePod().Name("bar").Label("bar", "").Obj(), wantUnschedulablePods: 0, wantSuccess: true, }, @@ -1056,7 +1058,7 @@ func TestUnschedulablePodsMap(t *testing.T) { } } for _, p := range test.podsToDelete { - upm.delete(newQueuedPodInfoForLookup(p)) + upm.delete(p, false) } if !reflect.DeepEqual(upm.podInfoMap, test.expectedMapAfterDelete) { t.Errorf("Unexpected map after deleting pods. Expected: %v, got: %v", @@ -1406,6 +1408,14 @@ var ( } queue.unschedulablePods.addOrUpdate(pInfo) } + deletePod = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { + queue.Delete(pInfo.Pod) + } + updatePodQueueable = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { + newPod := pInfo.Pod.DeepCopy() + newPod.Labels = map[string]string{"queueable": ""} + queue.Update(pInfo.Pod, newPod) + } addPodBackoffQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { queue.podBackoffQ.Add(pInfo) } @@ -1526,18 +1536,18 @@ func TestPendingPodsMetric(t *testing.T) { metrics.Register() total := 60 queueableNum := 50 - queueable := "queueable" + queueable, failme := "queueable", "failme" // First 50 Pods are queueable. - pInfos := makeQueuedPodInfos(queueableNum, queueable, timestamp) + pInfos := makeQueuedPodInfos(queueableNum, "x", queueable, timestamp) // The last 10 Pods are not queueable. - gated := makeQueuedPodInfos(total-queueableNum, "fail-me", timestamp) + gated := makeQueuedPodInfos(total-queueableNum, "y", failme, timestamp) // Manually mark them as gated=true. for _, pInfo := range gated { pInfo.Gated = true } pInfos = append(pInfos, gated...) totalWithDelay := 20 - pInfosWithDelay := makeQueuedPodInfos(totalWithDelay, queueable, timestamp.Add(2*time.Second)) + pInfosWithDelay := makeQueuedPodInfos(totalWithDelay, "z", queueable, timestamp.Add(2*time.Second)) tests := []struct { name string @@ -1656,6 +1666,54 @@ scheduler_pending_pods{queue="active"} 50 scheduler_pending_pods{queue="backoff"} 0 scheduler_pending_pods{queue="gated"} 0 scheduler_pending_pods{queue="unschedulable"} 0 +`, + }, + { + name: "add pods to activeQ/unschedulablePods and then delete some Pods", + operations: []operation{ + addPodActiveQ, + addPodUnschedulablePods, + deletePod, + deletePod, + deletePod, + }, + operands: [][]*framework.QueuedPodInfo{ + pInfos[:30], + pInfos[30:], + pInfos[:2], + pInfos[30:33], + pInfos[50:54], + }, + metricsName: "scheduler_pending_pods", + wants: ` +# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated. +# TYPE scheduler_pending_pods gauge +scheduler_pending_pods{queue="active"} 28 +scheduler_pending_pods{queue="backoff"} 0 +scheduler_pending_pods{queue="gated"} 6 +scheduler_pending_pods{queue="unschedulable"} 17 +`, + }, + { + name: "add pods to activeQ/unschedulablePods and then update some Pods as queueable", + operations: []operation{ + addPodActiveQ, + addPodUnschedulablePods, + updatePodQueueable, + }, + operands: [][]*framework.QueuedPodInfo{ + pInfos[:30], + pInfos[30:], + pInfos[50:55], + }, + metricsName: "scheduler_pending_pods", + wants: ` +# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated. +# TYPE scheduler_pending_pods gauge +scheduler_pending_pods{queue="active"} 35 +scheduler_pending_pods{queue="backoff"} 0 +scheduler_pending_pods{queue="gated"} 5 +scheduler_pending_pods{queue="unschedulable"} 20 `, }, } @@ -2094,11 +2152,12 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { } } -func makeQueuedPodInfos(num int, namePrefix string, timestamp time.Time) []*framework.QueuedPodInfo { +func makeQueuedPodInfos(num int, namePrefix, label string, timestamp time.Time) []*framework.QueuedPodInfo { var pInfos = make([]*framework.QueuedPodInfo, 0, num) for i := 1; i <= num; i++ { p := &framework.QueuedPodInfo{ - PodInfo: mustNewPodInfo(st.MakePod().Name(fmt.Sprintf("%v-%d", namePrefix, i)).Namespace(fmt.Sprintf("ns%d", i)).UID(fmt.Sprintf("tp-%d", i)).Obj()), + PodInfo: mustNewPodInfo( + st.MakePod().Name(fmt.Sprintf("%v-%d", namePrefix, i)).Namespace(fmt.Sprintf("ns%d", i)).Label(label, "").UID(fmt.Sprintf("tp-%d", i)).Obj()), Timestamp: timestamp, UnschedulablePlugins: sets.NewString(), }