diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index 2b6e2417135..b50cc9422e7 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -31,6 +31,7 @@ import ( coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/scheduler/internal/queue" ) func (sched *Scheduler) onPvAdd(obj interface{}) { @@ -40,7 +41,7 @@ func (sched *Scheduler) onPvAdd(obj interface{}) { // provisioning and binding process, will not trigger events to schedule pod // again. So we need to move pods to active queue on PV add for this // scenario. - sched.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvAdd) } func (sched *Scheduler) onPvUpdate(old, new interface{}) { @@ -48,15 +49,15 @@ func (sched *Scheduler) onPvUpdate(old, new interface{}) { // bindings due to conflicts if PVs are updated by PV controller or other // parties, then scheduler will add pod back to unschedulable queue. We // need to move pods to active queue on PV update for this scenario. - sched.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvUpdate) } func (sched *Scheduler) onPvcAdd(obj interface{}) { - sched.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcAdd) } func (sched *Scheduler) onPvcUpdate(old, new interface{}) { - sched.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcUpdate) } func (sched *Scheduler) onStorageClassAdd(obj interface{}) { @@ -73,20 +74,20 @@ func (sched *Scheduler) onStorageClassAdd(obj interface{}) { // We don't need to invalidate cached results because results will not be // cached for pod that has unbound immediate PVCs. 
if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer { - sched.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.StorageClassAdd) } } func (sched *Scheduler) onServiceAdd(obj interface{}) { - sched.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceAdd) } func (sched *Scheduler) onServiceUpdate(oldObj interface{}, newObj interface{}) { - sched.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceUpdate) } func (sched *Scheduler) onServiceDelete(obj interface{}) { - sched.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceDelete) } func (sched *Scheduler) addNodeToCache(obj interface{}) { @@ -100,7 +101,7 @@ func (sched *Scheduler) addNodeToCache(obj interface{}) { klog.Errorf("scheduler cache AddNode failed: %v", err) } - sched.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.NodeAdd) } func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) { @@ -124,8 +125,10 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) { // to save processing cycles. We still trigger a move to active queue to cover the case // that a pod being processed by the scheduler is determined unschedulable. We want this // pod to be reevaluated when a change in the cluster happens. 
- if sched.SchedulingQueue.NumUnschedulablePods() == 0 || nodeSchedulingPropertiesChanged(newNode, oldNode) { - sched.SchedulingQueue.MoveAllToActiveQueue() + if sched.SchedulingQueue.NumUnschedulablePods() == 0 { + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.Unknown) + } else if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != "" { + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(event) } } @@ -166,7 +169,7 @@ func (sched *Scheduler) onCSINodeAdd(obj interface{}) { klog.Errorf("scheduler cache AddCSINode failed: %v", err) } - sched.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeAdd) } func (sched *Scheduler) onCSINodeUpdate(oldObj, newObj interface{}) { @@ -186,7 +189,7 @@ func (sched *Scheduler) onCSINodeUpdate(oldObj, newObj interface{}) { klog.Errorf("scheduler cache UpdateCSINode failed: %v", err) } - sched.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeUpdate) } func (sched *Scheduler) onCSINodeDelete(obj interface{}) { @@ -315,7 +318,7 @@ func (sched *Scheduler) deletePodFromCache(obj interface{}) { klog.Errorf("scheduler cache RemovePod failed: %v", err) } - sched.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.AssignedPodDelete) } // assignedPod selects pods that are assigned (scheduled and running). 
@@ -488,24 +491,24 @@ func AddAllEventHandlers( ) } -func nodeSchedulingPropertiesChanged(newNode *v1.Node, oldNode *v1.Node) bool { +func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) string { if nodeSpecUnschedulableChanged(newNode, oldNode) { - return true + return queue.NodeSpecUnschedulableChange } if nodeAllocatableChanged(newNode, oldNode) { - return true + return queue.NodeAllocatableChange } if nodeLabelsChanged(newNode, oldNode) { - return true + return queue.NodeLabelChange } if nodeTaintsChanged(newNode, oldNode) { - return true + return queue.NodeTaintChange } if nodeConditionsChanged(newNode, oldNode) { - return true + return queue.NodeConditionChange } - return false + return "" } func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool { diff --git a/pkg/scheduler/factory_test.go b/pkg/scheduler/factory_test.go index 1330926b2c2..f82eb454f12 100644 --- a/pkg/scheduler/factory_test.go +++ b/pkg/scheduler/factory_test.go @@ -293,7 +293,7 @@ func TestDefaultErrorFunc(t *testing.T) { queue.Delete(testPod) // Trigger a move request - queue.MoveAllToActiveQueue() + queue.MoveAllToActiveOrBackoffQueue("test") // Trigger error handling again to put the pod in backoff queue errFunc(testPodInfo, nil) diff --git a/pkg/scheduler/internal/queue/BUILD b/pkg/scheduler/internal/queue/BUILD index 282eee4d7c3..f1f016efaa6 100644 --- a/pkg/scheduler/internal/queue/BUILD +++ b/pkg/scheduler/internal/queue/BUILD @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ + "events.go", "pod_backoff.go", "scheduling_queue.go", ], diff --git a/pkg/scheduler/internal/queue/events.go b/pkg/scheduler/internal/queue/events.go new file mode 100644 index 00000000000..44440f7fb3a --- /dev/null +++ b/pkg/scheduler/internal/queue/events.go @@ -0,0 +1,72 @@ +/* +Copyright 2019 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +// Events that trigger scheduler queue to change. +const ( + // Unknown event + Unknown = "Unknown" + // PodAdd is the event when a new pod is added to API server. + PodAdd = "PodAdd" + // NodeAdd is the event when a new node is added to the cluster. + NodeAdd = "NodeAdd" + // ScheduleAttemptFailure is the event when a schedule attempt fails. + ScheduleAttemptFailure = "ScheduleAttemptFailure" + // BackoffComplete is the event when a pod finishes backoff. + BackoffComplete = "BackoffComplete" + // UnschedulableTimeout is the event when a pod stays in unschedulable for longer than timeout. + UnschedulableTimeout = "UnschedulableTimeout" + // AssignedPodAdd is the event when a pod is added that causes pods with matching affinity terms + // to be more schedulable. + AssignedPodAdd = "AssignedPodAdd" + // AssignedPodUpdate is the event when a pod is updated that causes pods with matching affinity + // terms to be more schedulable. + AssignedPodUpdate = "AssignedPodUpdate" + // AssignedPodDelete is the event when a pod is deleted that causes pods with matching affinity + // terms to be more schedulable. + AssignedPodDelete = "AssignedPodDelete" + // PvAdd is the event when a persistent volume is added in the cluster. + PvAdd = "PvAdd" + // PvUpdate is the event when a persistent volume is updated in the cluster. + PvUpdate = "PvUpdate" + // PvcAdd is the event when a persistent volume claim is added in the cluster. 
+ PvcAdd = "PvcAdd" + // PvcUpdate is the event when a persistent volume claim is updated in the cluster. + PvcUpdate = "PvcUpdate" + // StorageClassAdd is the event when a StorageClass is added in the cluster. + StorageClassAdd = "StorageClassAdd" + // ServiceAdd is the event when a service is added in the cluster. + ServiceAdd = "ServiceAdd" + // ServiceUpdate is the event when a service is updated in the cluster. + ServiceUpdate = "ServiceUpdate" + // ServiceDelete is the event when a service is deleted in the cluster. + ServiceDelete = "ServiceDelete" + // CSINodeAdd is the event when a CSI node is added in the cluster. + CSINodeAdd = "CSINodeAdd" + // CSINodeUpdate is the event when a CSI node is updated in the cluster. + CSINodeUpdate = "CSINodeUpdate" + // NodeSpecUnschedulableChange is the event when unschedulable node spec is changed. + NodeSpecUnschedulableChange = "NodeSpecUnschedulableChange" + // NodeAllocatableChange is the event when node allocatable is changed. + NodeAllocatableChange = "NodeAllocatableChange" + // NodeLabelChange is the event when node label is changed. + NodeLabelChange = "NodeLabelChange" + // NodeTaintChange is the event when node taint is changed. + NodeTaintChange = "NodeTaintChange" + // NodeConditionChange is the event when node condition is changed. + NodeConditionChange = "NodeConditionChange" +) diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index f8168b1dfba..6b3146c11ba 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -69,7 +69,6 @@ const ( // makes it easy to use those data structures as a SchedulingQueue. type SchedulingQueue interface { Add(pod *v1.Pod) error - AddIfNotPresent(pod *v1.Pod) error // AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue. 
// The podSchedulingCycle represents the current scheduling cycle number which can be // returned by calling SchedulingCycle(). @@ -83,7 +82,7 @@ type SchedulingQueue interface { Pop() (*framework.PodInfo, error) Update(oldPod, newPod *v1.Pod) error Delete(pod *v1.Pod) error - MoveAllToActiveQueue() + MoveAllToActiveOrBackoffQueue(event string) AssignedPodAdded(pod *v1.Pod) AssignedPodUpdated(pod *v1.Pod) NominatedPodsForNode(nodeName string) []*v1.Pod @@ -272,38 +271,13 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error { if err := p.podBackoffQ.Delete(pInfo); err == nil { klog.Errorf("Error: pod %v/%v is already in the podBackoff queue.", pod.Namespace, pod.Name) } + metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc() p.nominatedPods.add(pod, "") p.cond.Broadcast() return nil } -// AddIfNotPresent adds a pod to the active queue if it is not present in any of -// the queues. If it is present in any, it doesn't do any thing. -func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error { - p.lock.Lock() - defer p.lock.Unlock() - if p.unschedulableQ.get(pod) != nil { - return nil - } - - pInfo := p.newPodInfo(pod) - if _, exists, _ := p.activeQ.Get(pInfo); exists { - return nil - } - if _, exists, _ := p.podBackoffQ.Get(pInfo); exists { - return nil - } - err := p.activeQ.Add(pInfo) - if err != nil { - klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err) - } else { - p.nominatedPods.add(pod, "") - p.cond.Broadcast() - } - return err -} - // nsNameForPod returns a namespacedname for a pod func nsNameForPod(pod *v1.Pod) ktypes.NamespacedName { return ktypes.NamespacedName{ @@ -376,8 +350,10 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.PodInfo, p if err := p.podBackoffQ.Add(pInfo); err != nil { return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err) } + metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", ScheduleAttemptFailure).Inc() } else { 
p.unschedulableQ.addOrUpdate(pInfo) + metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc() } p.nominatedPods.add(pod, "") @@ -389,7 +365,6 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.PodInfo, p func (p *PriorityQueue) flushBackoffQCompleted() { p.lock.Lock() defer p.lock.Unlock() - for { rawPodInfo := p.podBackoffQ.Peek() if rawPodInfo == nil { @@ -401,6 +376,7 @@ func (p *PriorityQueue) flushBackoffQCompleted() { klog.Errorf("Unable to find backoff value for pod %v in backoffQ", nsNameForPod(pod)) p.podBackoffQ.Pop() p.activeQ.Add(rawPodInfo) + metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc() defer p.cond.Broadcast() continue } @@ -414,6 +390,7 @@ func (p *PriorityQueue) flushBackoffQCompleted() { return } p.activeQ.Add(rawPodInfo) + metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc() defer p.cond.Broadcast() } } @@ -434,7 +411,7 @@ func (p *PriorityQueue) flushUnschedulableQLeftover() { } if len(podsToMove) > 0 { - p.movePodsToActiveQueue(podsToMove) + p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout) } } @@ -550,7 +527,7 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error { // may make pending pods with matching affinity terms schedulable. func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) { p.lock.Lock() - p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod)) + p.movePodsToActiveOrBackoffQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod), AssignedPodAdd) p.lock.Unlock() } @@ -558,60 +535,42 @@ func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) { // may make pending pods with matching affinity terms schedulable. 
func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) { p.lock.Lock() - p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod)) + p.movePodsToActiveOrBackoffQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod), AssignedPodUpdate) p.lock.Unlock() } -// MoveAllToActiveQueue moves all pods from unschedulableQ to activeQ. This +// MoveAllToActiveOrBackoffQueue moves all pods from unschedulableQ to activeQ or backoffQ. This // function adds all pods and then signals the condition variable to ensure that // if Pop() is waiting for an item, it receives it after all the pods are in the // queue and the head is the highest priority pod. -func (p *PriorityQueue) MoveAllToActiveQueue() { +func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event string) { p.lock.Lock() defer p.lock.Unlock() - - // There is a chance of errors when adding pods to other queues, - // we make a temporary slice to store the pods, - // since the probability is low, we set its len to 0 - addErrorPods := make([]*framework.PodInfo, 0) - + unschedulablePods := make([]*framework.PodInfo, 0, len(p.unschedulableQ.podInfoMap)) for _, pInfo := range p.unschedulableQ.podInfoMap { - pod := pInfo.Pod - if p.isPodBackingOff(pod) { - if err := p.podBackoffQ.Add(pInfo); err != nil { - klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err) - addErrorPods = append(addErrorPods, pInfo) - } - } else { - if err := p.activeQ.Add(pInfo); err != nil { - klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err) - addErrorPods = append(addErrorPods, pInfo) - } - } - } - p.unschedulableQ.clear() - // Adding pods that we could not move to Active queue or Backoff queue back to the Unschedulable queue - for _, podInfo := range addErrorPods { - p.unschedulableQ.addOrUpdate(podInfo) + unschedulablePods = append(unschedulablePods, pInfo) } + p.movePodsToActiveOrBackoffQueue(unschedulablePods, event) p.moveRequestCycle = p.schedulingCycle p.cond.Broadcast() } // NOTE: this 
function assumes lock has been acquired in caller -func (p *PriorityQueue) movePodsToActiveQueue(podInfoList []*framework.PodInfo) { +func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.PodInfo, event string) { for _, pInfo := range podInfoList { pod := pInfo.Pod if p.isPodBackingOff(pod) { if err := p.podBackoffQ.Add(pInfo); err != nil { klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err) } else { + metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc() p.unschedulableQ.delete(pod) } } else { if err := p.activeQ.Add(pInfo); err != nil { klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err) } else { + metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc() p.unschedulableQ.delete(pod) } } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index bcbc5686109..0d63c09e050 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -39,6 +39,11 @@ import ( "k8s.io/kubernetes/pkg/scheduler/util" ) +const queueMetricMetadata = ` + # HELP scheduler_queue_incoming_pods_total [ALPHA] Number of pods added to scheduling queues by event and queue type. + # TYPE scheduler_queue_incoming_pods_total counter + ` + var lowPriority, midPriority, highPriority = int32(0), int32(100), int32(1000) var mediumPriority = (lowPriority + highPriority) / 2 var highPriorityPod, highPriNominatedPod, medPriorityPod, unschedulablePod = v1.Pod{ @@ -249,38 +254,6 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { } } -func TestPriorityQueue_AddIfNotPresent(t *testing.T) { - q := NewPriorityQueue(nil, nil) - addOrUpdateUnschedulablePod(q, q.newPodInfo(&highPriNominatedPod)) - q.AddIfNotPresent(&highPriNominatedPod) // Must not add anything. 
- q.AddIfNotPresent(&medPriorityPod) - q.AddIfNotPresent(&unschedulablePod) - expectedNominatedPods := &nominatedPodMap{ - nominatedPodToNode: map[types.UID]string{ - medPriorityPod.UID: "node1", - unschedulablePod.UID: "node1", - }, - nominatedPods: map[string][]*v1.Pod{ - "node1": {&medPriorityPod, &unschedulablePod}, - }, - } - if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) { - t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods) - } - if p, err := q.Pop(); err != nil || p.Pod != &medPriorityPod { - t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Pod.Name) - } - if p, err := q.Pop(); err != nil || p.Pod != &unschedulablePod { - t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Pod.Name) - } - if len(q.nominatedPods.nominatedPods["node1"]) != 2 { - t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods.nominatedPods["node1"]) - } - if getUnschedulablePod(q, &highPriNominatedPod) != &highPriNominatedPod { - t.Errorf("Pod %v was not found in the unschedulableQ.", highPriNominatedPod.Name) - } -} - func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { q := NewPriorityQueue(nil, nil) q.Add(&highPriNominatedPod) @@ -343,7 +316,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { } // move all pods to active queue when we were trying to schedule them - q.MoveAllToActiveQueue() + q.MoveAllToActiveOrBackoffQueue("test") oldCycle := q.SchedulingCycle() firstPod, _ := q.Pop() @@ -462,7 +435,7 @@ func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) { q.Add(&medPriorityPod) addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod)) addOrUpdateUnschedulablePod(q, q.newPodInfo(&highPriorityPod)) - q.MoveAllToActiveQueue() + q.MoveAllToActiveOrBackoffQueue("test") if q.activeQ.Len() != 3 { t.Error("Expected all items to be in activeQ.") } @@ 
-559,7 +532,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) { t.Error("Unexpected list of pending Pods.") } // Move all to active queue. We should still see the same set of pods. - q.MoveAllToActiveQueue() + q.MoveAllToActiveOrBackoffQueue("test") if !reflect.DeepEqual(expectedSet, makeSet(q.PendingPods())) { t.Error("Unexpected list of pending Pods...") } @@ -873,7 +846,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) { // Put in the unschedulable queue. q.AddUnschedulableIfNotPresent(p1, q.SchedulingCycle()) // Move all unschedulable pods to the active queue. - q.MoveAllToActiveQueue() + q.MoveAllToActiveOrBackoffQueue("test") // Simulation is over. Now let's pop all pods. The pod popped first should be // the last one we pop here. for i := 0; i < 5; i++ { @@ -924,7 +897,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { // Clear its backoff to simulate backoff its expiration q.clearPodBackoff(&unschedulablePod) // Move all unschedulable pods to the active queue. - q.MoveAllToActiveQueue() + q.MoveAllToActiveOrBackoffQueue("test") // Simulate a pod being popped by the scheduler, // At this time, unschedulable pod should be popped. @@ -967,7 +940,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { // Clear its backoff to simulate its backoff expiration q.clearPodBackoff(&unschedulablePod) // Move all unschedulable pods to the active queue. - q.MoveAllToActiveQueue() + q.MoveAllToActiveOrBackoffQueue("test") // At this time, newerPod should be popped // because it is the oldest tried pod. @@ -1032,7 +1005,7 @@ func TestHighPriorityBackoff(t *testing.T) { // Put in the unschedulable queue. q.AddUnschedulableIfNotPresent(p, q.SchedulingCycle()) // Move all unschedulable pods to the active queue. 
- q.MoveAllToActiveQueue() + q.MoveAllToActiveOrBackoffQueue("test") p, err = q.Pop() if err != nil { @@ -1108,6 +1081,15 @@ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) { type operation func(queue *PriorityQueue, pInfo *framework.PodInfo) var ( + add = func(queue *PriorityQueue, pInfo *framework.PodInfo) { + queue.Add(pInfo.Pod) + } + addUnschedulablePodBackToUnschedulableQ = func(queue *PriorityQueue, pInfo *framework.PodInfo) { + queue.AddUnschedulableIfNotPresent(pInfo, 0) + } + addUnschedulablePodBackToBackoffQ = func(queue *PriorityQueue, pInfo *framework.PodInfo) { + queue.AddUnschedulableIfNotPresent(pInfo, -1) + } addPodActiveQ = func(queue *PriorityQueue, pInfo *framework.PodInfo) { queue.lock.Lock() queue.activeQ.Add(pInfo) @@ -1135,8 +1117,8 @@ var ( queue.podBackoffQ.Add(pInfo) queue.lock.Unlock() } - moveAllToActiveQ = func(queue *PriorityQueue, _ *framework.PodInfo) { - queue.MoveAllToActiveQueue() + moveAllToActiveOrBackoffQ = func(queue *PriorityQueue, _ *framework.PodInfo) { + queue.MoveAllToActiveOrBackoffQueue("test") } backoffPod = func(queue *PriorityQueue, pInfo *framework.PodInfo) { queue.backoffPod(pInfo.Pod) @@ -1145,6 +1127,9 @@ var ( queue.clock.(*clock.FakeClock).Step(2 * time.Second) queue.flushBackoffQCompleted() } + moveClockForward = func(queue *PriorityQueue, _ *framework.PodInfo) { + queue.clock.(*clock.FakeClock).Step(2 * time.Second) + } ) // TestPodTimestamp tests the operations related to PodInfo. 
@@ -1210,7 +1195,7 @@ func TestPodTimestamp(t *testing.T) { operations: []operation{ addPodUnschedulableQ, addPodUnschedulableQ, - moveAllToActiveQ, + moveAllToActiveOrBackoffQ, }, operands: []*framework.PodInfo{pInfo2, pInfo1, nil}, expected: []*framework.PodInfo{pInfo1, pInfo2}, @@ -1222,7 +1207,7 @@ func TestPodTimestamp(t *testing.T) { addPodBackoffQ, backoffPod, flushBackoffQ, - moveAllToActiveQ, + moveAllToActiveOrBackoffQ, }, operands: []*framework.PodInfo{pInfo2, pInfo1, pInfo1, nil, nil}, expected: []*framework.PodInfo{pInfo1, pInfo2}, @@ -1326,7 +1311,7 @@ scheduler_pending_pods{queue="unschedulable"} 10 name: "add pods to unschedulableQ and then move all to activeQ", operations: []operation{ addPodUnschedulableQ, - moveAllToActiveQ, + moveAllToActiveOrBackoffQ, }, operands: [][]*framework.PodInfo{ pInfos[:total], @@ -1346,7 +1331,7 @@ scheduler_pending_pods{queue="unschedulable"} 0 operations: []operation{ backoffPod, addPodUnschedulableQ, - moveAllToActiveQ, + moveAllToActiveOrBackoffQ, }, operands: [][]*framework.PodInfo{ pInfos[:20], @@ -1368,7 +1353,7 @@ scheduler_pending_pods{queue="unschedulable"} 0 backoffPod, addPodUnschedulableQ, addPodActiveQ, - moveAllToActiveQ, + moveAllToActiveOrBackoffQ, flushBackoffQ, }, operands: [][]*framework.PodInfo{ @@ -1479,6 +1464,100 @@ func TestPerPodSchedulingMetrics(t *testing.T) { checkPerPodSchedulingMetrics("Attempt twice with update", t, pInfo, 2, timestamp) } +func TestIncomingPodsMetrics(t *testing.T) { + timestamp := time.Now() + metrics.Register() + var pInfos = make([]*framework.PodInfo, 0, 3) + for i := 1; i <= 3; i++ { + p := &framework.PodInfo{ + Pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-pod-%d", i), + Namespace: fmt.Sprintf("ns%d", i), + UID: types.UID(fmt.Sprintf("tp-%d", i)), + }, + }, + Timestamp: timestamp, + } + pInfos = append(pInfos, p) + } + tests := []struct { + name string + operations []operation + want string + }{ + { + name: "add pods to activeQ", + 
operations: []operation{ + add, + }, + want: ` + scheduler_queue_incoming_pods_total{event="PodAdd",queue="active"} 3 +`, + }, + { + name: "add pods to unschedulableQ", + operations: []operation{ + addUnschedulablePodBackToUnschedulableQ, + }, + want: ` + scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3 +`, + }, + { + name: "add pods to unschedulableQ and then move all to backoffQ", + operations: []operation{ + addUnschedulablePodBackToUnschedulableQ, + moveAllToActiveOrBackoffQ, + }, + want: ` scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3 + scheduler_queue_incoming_pods_total{event="test",queue="backoff"} 3 +`, + }, + { + name: "add pods to unschedulableQ and then move all to activeQ", + operations: []operation{ + addUnschedulablePodBackToUnschedulableQ, + moveClockForward, + moveAllToActiveOrBackoffQ, + }, + want: ` scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3 + scheduler_queue_incoming_pods_total{event="test",queue="active"} 3 +`, + }, + { + name: "make some pods subject to backoff and add them to backoffQ, then flush backoffQ", + operations: []operation{ + addUnschedulablePodBackToBackoffQ, + moveClockForward, + flushBackoffQ, + }, + want: ` scheduler_queue_incoming_pods_total{event="BackoffComplete",queue="active"} 3 + scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="backoff"} 3 +`, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + metrics.SchedulerQueueIncomingPods.Reset() + stop := make(chan struct{}) + close(stop) // Stop the periodic flush + queue := NewPriorityQueue(stop, nil, WithClock(clock.NewFakeClock(timestamp))) + for _, op := range test.operations { + for _, pInfo := range pInfos { + op(queue, pInfo) + } + } + metricName := metrics.SchedulerSubsystem + "_" + metrics.SchedulerQueueIncomingPods.Name + if err := 
testutil.CollectAndCompare(metrics.SchedulerQueueIncomingPods, strings.NewReader(queueMetricMetadata+test.want), metricName); err != nil { + t.Errorf("unexpected collecting result:\n%s", err) + } + + }) + } +} + func checkPerPodSchedulingMetrics(name string, t *testing.T, pInfo *framework.PodInfo, wantAttemtps int, wantInitialAttemptTs time.Time) { if pInfo.Attempts != wantAttemtps { t.Errorf("[%s] Pod schedule attempt unexpected, got %v, want %v", name, pInfo.Attempts, wantAttemtps) diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go index f3f13c9f9bf..43c2738ee69 100644 --- a/pkg/scheduler/metrics/metrics.go +++ b/pkg/scheduler/metrics/metrics.go @@ -248,6 +248,14 @@ var ( }, []string{"extension_point", "status"}) + SchedulerQueueIncomingPods = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: SchedulerSubsystem, + Name: "queue_incoming_pods_total", + Help: "Number of pods added to scheduling queues by event and queue type.", + StabilityLevel: metrics.ALPHA, + }, []string{"queue", "event"}) + metricsList = []metrics.Registerable{ scheduleAttempts, SchedulingLatency, @@ -270,6 +278,7 @@ var ( PodSchedulingDuration, PodSchedulingAttempts, FrameworkExtensionPointDuration, + SchedulerQueueIncomingPods, } )