Merge pull request #83577 from liu-cong/queue-metrics

Add incoming pod metrics to scheduler queue.
Commit f7091992c0 by Kubernetes Prow Robot, 2019-10-16 17:34:39 -07:00, committed via GitHub.
7 changed files with 249 additions and 126 deletions
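In short, the change replaces the queue's single MoveAllToActiveQueue entry point with MoveAllToActiveOrBackoffQueue(event string), threads a named event through every call site, and counts each pod entering a scheduling queue in a new scheduler_queue_incoming_pods_total counter labeled by destination queue and triggering event. A minimal self-contained sketch of that label model, using the upstream Prometheus client as a stand-in for the k8s.io/component-base/metrics wrapper the scheduler actually uses (the wiring here is illustrative, not the PR's code):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Stand-in for SchedulerQueueIncomingPods: one series per (queue, event) pair.
var queueIncomingPods = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Subsystem: "scheduler",
		Name:      "queue_incoming_pods_total",
		Help:      "Number of pods added to scheduling queues by event and queue type.",
	},
	[]string{"queue", "event"},
)

func main() {
	prometheus.MustRegister(queueIncomingPods)
	// The three destinations a pod can land in, per the diff below:
	queueIncomingPods.WithLabelValues("active", "PodAdd").Inc()                        // admitted on creation
	queueIncomingPods.WithLabelValues("backoff", "ScheduleAttemptFailure").Inc()       // failed attempt, still backing off
	queueIncomingPods.WithLabelValues("unschedulable", "ScheduleAttemptFailure").Inc() // parked until a cluster event
	fmt.Println("recorded three incoming pods")
}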

File: scheduler event handlers

@@ -31,6 +31,7 @@ import (
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/internal/queue"
)
func (sched *Scheduler) onPvAdd(obj interface{}) {
@@ -40,7 +41,7 @@ func (sched *Scheduler) onPvAdd(obj interface{}) {
// provisioning and binding process, will not trigger events to schedule pod
// again. So we need to move pods to active queue on PV add for this
// scenario.
- sched.SchedulingQueue.MoveAllToActiveQueue()
+ sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvAdd)
}
func (sched *Scheduler) onPvUpdate(old, new interface{}) {
@@ -48,15 +49,15 @@ func (sched *Scheduler) onPvUpdate(old, new interface{}) {
// bindings due to conflicts if PVs are updated by PV controller or other
// parties, then scheduler will add pod back to unschedulable queue. We
// need to move pods to active queue on PV update for this scenario.
- sched.SchedulingQueue.MoveAllToActiveQueue()
+ sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvUpdate)
}
func (sched *Scheduler) onPvcAdd(obj interface{}) {
- sched.SchedulingQueue.MoveAllToActiveQueue()
+ sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcAdd)
}
func (sched *Scheduler) onPvcUpdate(old, new interface{}) {
- sched.SchedulingQueue.MoveAllToActiveQueue()
+ sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcUpdate)
}
func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
@@ -73,20 +74,20 @@ func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
// We don't need to invalidate cached results because results will not be
// cached for pod that has unbound immediate PVCs.
if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
- sched.SchedulingQueue.MoveAllToActiveQueue()
+ sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.StorageClassAdd)
}
}
func (sched *Scheduler) onServiceAdd(obj interface{}) {
- sched.SchedulingQueue.MoveAllToActiveQueue()
+ sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceAdd)
}
func (sched *Scheduler) onServiceUpdate(oldObj interface{}, newObj interface{}) {
- sched.SchedulingQueue.MoveAllToActiveQueue()
+ sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceUpdate)
}
func (sched *Scheduler) onServiceDelete(obj interface{}) {
- sched.SchedulingQueue.MoveAllToActiveQueue()
+ sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceDelete)
}
func (sched *Scheduler) addNodeToCache(obj interface{}) {
@@ -100,7 +101,7 @@ func (sched *Scheduler) addNodeToCache(obj interface{}) {
klog.Errorf("scheduler cache AddNode failed: %v", err)
}
- sched.SchedulingQueue.MoveAllToActiveQueue()
+ sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.NodeAdd)
}
func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
@@ -124,8 +125,10 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
// to save processing cycles. We still trigger a move to active queue to cover the case
// that a pod being processed by the scheduler is determined unschedulable. We want this
// pod to be reevaluated when a change in the cluster happens.
- if sched.SchedulingQueue.NumUnschedulablePods() == 0 || nodeSchedulingPropertiesChanged(newNode, oldNode) {
- sched.SchedulingQueue.MoveAllToActiveQueue()
+ if sched.SchedulingQueue.NumUnschedulablePods() == 0 {
+ sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.Unknown)
+ } else if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != "" {
+ sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(event)
}
}
@@ -166,7 +169,7 @@ func (sched *Scheduler) onCSINodeAdd(obj interface{}) {
klog.Errorf("scheduler cache AddCSINode failed: %v", err)
}
- sched.SchedulingQueue.MoveAllToActiveQueue()
+ sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeAdd)
}
func (sched *Scheduler) onCSINodeUpdate(oldObj, newObj interface{}) {
@@ -186,7 +189,7 @@ func (sched *Scheduler) onCSINodeUpdate(oldObj, newObj interface{}) {
klog.Errorf("scheduler cache UpdateCSINode failed: %v", err)
}
- sched.SchedulingQueue.MoveAllToActiveQueue()
+ sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeUpdate)
}
func (sched *Scheduler) onCSINodeDelete(obj interface{}) {
@@ -315,7 +318,7 @@ func (sched *Scheduler) deletePodFromCache(obj interface{}) {
klog.Errorf("scheduler cache RemovePod failed: %v", err)
}
- sched.SchedulingQueue.MoveAllToActiveQueue()
+ sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.AssignedPodDelete)
}
// assignedPod selects pods that are assigned (scheduled and running).
@@ -488,24 +491,24 @@ func AddAllEventHandlers(
)
}
- func nodeSchedulingPropertiesChanged(newNode *v1.Node, oldNode *v1.Node) bool {
+ func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) string {
if nodeSpecUnschedulableChanged(newNode, oldNode) {
- return true
+ return queue.NodeSpecUnschedulableChange
}
if nodeAllocatableChanged(newNode, oldNode) {
- return true
+ return queue.NodeAllocatableChange
}
if nodeLabelsChanged(newNode, oldNode) {
- return true
+ return queue.NodeLabelChange
}
if nodeTaintsChanged(newNode, oldNode) {
- return true
+ return queue.NodeTaintChange
}
if nodeConditionsChanged(newNode, oldNode) {
- return true
+ return queue.NodeConditionChange
}
- return false
+ return ""
}
func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
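The diff truncates here; the per-property helpers that nodeSchedulingPropertiesChange consults are plain field comparisons. As a sketch of the shape (an assumption about the body, not the verbatim upstream code), the allocatable check plausibly reduces to a deep-equality test:

// nodeAllocatableChanged reports whether allocatable resources differ between
// the old and new node objects. Sketch only; assumes "reflect" and the v1
// ("k8s.io/api/core/v1") import already used in this file.
func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
	return !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable)
}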

File: scheduler tests (TestDefaultErrorFunc)

@@ -293,7 +293,7 @@ func TestDefaultErrorFunc(t *testing.T) {
queue.Delete(testPod)
// Trigger a move request
- queue.MoveAllToActiveQueue()
+ queue.MoveAllToActiveOrBackoffQueue("test")
// Trigger error handling again to put the pod in backoff queue
errFunc(testPodInfo, nil)

File: internal queue package BUILD

@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"events.go",
"pod_backoff.go",
"scheduling_queue.go",
],

File: internal queue package, events.go (new file)

@@ -0,0 +1,72 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
// Events that trigger scheduler queue to change.
const (
// Unknown is the event used when the cause of a queue move cannot be classified.
Unknown = "Unknown"
// PodAdd is the event when a new pod is added to API server.
PodAdd = "PodAdd"
// NodeAdd is the event when a new node is added to the cluster.
NodeAdd = "NodeAdd"
// ScheduleAttemptFailure is the event when a schedule attempt fails.
ScheduleAttemptFailure = "ScheduleAttemptFailure"
// BackoffComplete is the event when a pod finishes backoff.
BackoffComplete = "BackoffComplete"
// UnschedulableTimeout is the event when a pod stays in the unschedulable queue longer than the timeout.
UnschedulableTimeout = "UnschedulableTimeout"
// AssignedPodAdd is the event when a pod is added that causes pods with matching affinity terms
// to be more schedulable.
AssignedPodAdd = "AssignedPodAdd"
// AssignedPodUpdate is the event when a pod is updated that causes pods with matching affinity
// terms to be more schedulable.
AssignedPodUpdate = "AssignedPodUpdate"
// AssignedPodDelete is the event when a pod is deleted that causes pods with matching affinity
// terms to be more schedulable.
AssignedPodDelete = "AssignedPodDelete"
// PvAdd is the event when a persistent volume is added in the cluster.
PvAdd = "PvAdd"
// PvUpdate is the event when a persistent volume is updated in the cluster.
PvUpdate = "PvUpdate"
// PvcAdd is the event when a persistent volume claim is added in the cluster.
PvcAdd = "PvcAdd"
// PvcUpdate is the event when a persistent volume claim is updated in the cluster.
PvcUpdate = "PvcUpdate"
// StorageClassAdd is the event when a StorageClass is added in the cluster.
StorageClassAdd = "StorageClassAdd"
// ServiceAdd is the event when a service is added in the cluster.
ServiceAdd = "ServiceAdd"
// ServiceUpdate is the event when a service is updated in the cluster.
ServiceUpdate = "ServiceUpdate"
// ServiceDelete is the event when a service is deleted in the cluster.
ServiceDelete = "ServiceDelete"
// CSINodeAdd is the event when a CSI node is added in the cluster.
CSINodeAdd = "CSINodeAdd"
// CSINodeUpdate is the event when a CSI node is updated in the cluster.
CSINodeUpdate = "CSINodeUpdate"
// NodeSpecUnschedulableChange is the event when the node's spec.Unschedulable field is changed.
NodeSpecUnschedulableChange = "NodeSpecUnschedulableChange"
// NodeAllocatableChange is the event when node allocatable is changed.
NodeAllocatableChange = "NodeAllocatableChange"
// NodeLabelChange is the event when node labels are changed.
NodeLabelChange = "NodeLabelChange"
// NodeTaintChange is the event when node taints are changed.
NodeTaintChange = "NodeTaintChange"
// NodeConditionChange is the event when node condition is changed.
NodeConditionChange = "NodeConditionChange"
)
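Each constant above exists to become a metric label: whenever pods move between queues, the queue increments the new counter with the destination queue name and the event that caused the move. A hedged sketch of that hand-off (recordIncoming is a hypothetical helper for illustration; the diff below inlines the equivalent calls):

// recordIncoming shows how an event constant reaches the "event" label.
// Hypothetical helper; not part of the PR.
func recordIncoming(queueName, event string) {
	metrics.SchedulerQueueIncomingPods.WithLabelValues(queueName, event).Inc()
}

For example, recordIncoming("active", NodeAdd) would fire after a pod is reactivated because a node joined, and recordIncoming("backoff", PvAdd) when a PV event moves a pod that is still inside its backoff window.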

File: internal queue package, scheduling_queue.go

@@ -69,7 +69,6 @@ const (
// makes it easy to use those data structures as a SchedulingQueue.
type SchedulingQueue interface {
Add(pod *v1.Pod) error
- AddIfNotPresent(pod *v1.Pod) error
// AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
// The podSchedulingCycle represents the current scheduling cycle number which can be
// returned by calling SchedulingCycle().
@@ -83,7 +82,7 @@
Pop() (*framework.PodInfo, error)
Update(oldPod, newPod *v1.Pod) error
Delete(pod *v1.Pod) error
- MoveAllToActiveQueue()
+ MoveAllToActiveOrBackoffQueue(event string)
AssignedPodAdded(pod *v1.Pod)
AssignedPodUpdated(pod *v1.Pod)
NominatedPodsForNode(nodeName string) []*v1.Pod
@@ -272,38 +271,13 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
if err := p.podBackoffQ.Delete(pInfo); err == nil {
klog.Errorf("Error: pod %v/%v is already in the podBackoff queue.", pod.Namespace, pod.Name)
}
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
p.nominatedPods.add(pod, "")
p.cond.Broadcast()
return nil
}
- // AddIfNotPresent adds a pod to the active queue if it is not present in any of
- // the queues. If it is present in any, it doesn't do any thing.
- func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error {
- p.lock.Lock()
- defer p.lock.Unlock()
- if p.unschedulableQ.get(pod) != nil {
- return nil
- }
- pInfo := p.newPodInfo(pod)
- if _, exists, _ := p.activeQ.Get(pInfo); exists {
- return nil
- }
- if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
- return nil
- }
- err := p.activeQ.Add(pInfo)
- if err != nil {
- klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
- } else {
- p.nominatedPods.add(pod, "")
- p.cond.Broadcast()
- }
- return err
- }
// nsNameForPod returns a NamespacedName for a pod
func nsNameForPod(pod *v1.Pod) ktypes.NamespacedName {
return ktypes.NamespacedName{
@@ -376,8 +350,10 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.PodInfo, p
if err := p.podBackoffQ.Add(pInfo); err != nil {
return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
}
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", ScheduleAttemptFailure).Inc()
} else {
p.unschedulableQ.addOrUpdate(pInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
}
p.nominatedPods.add(pod, "")
@@ -389,7 +365,6 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.PodInfo, p
func (p *PriorityQueue) flushBackoffQCompleted() {
p.lock.Lock()
defer p.lock.Unlock()
for {
rawPodInfo := p.podBackoffQ.Peek()
if rawPodInfo == nil {
@@ -401,6 +376,7 @@ func (p *PriorityQueue) flushBackoffQCompleted() {
klog.Errorf("Unable to find backoff value for pod %v in backoffQ", nsNameForPod(pod))
p.podBackoffQ.Pop()
p.activeQ.Add(rawPodInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
defer p.cond.Broadcast()
continue
}
@@ -414,6 +390,7 @@ func (p *PriorityQueue) flushBackoffQCompleted() {
return
}
p.activeQ.Add(rawPodInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
defer p.cond.Broadcast()
}
}
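Pieced together from the visible context, flushBackoffQCompleted repeatedly pops expired pods off the head of the backoff heap and activates them, counting each under the BackoffComplete event. A simplified reconstruction (backoffExpired is a hypothetical stand-in for the backoff-time lookup the diff shows only in part):

func (p *PriorityQueue) flushBackoffQCompleted() {
	p.lock.Lock()
	defer p.lock.Unlock()
	for {
		rawPodInfo := p.podBackoffQ.Peek()
		if rawPodInfo == nil {
			return // backoff heap is empty
		}
		pod := rawPodInfo.(*framework.PodInfo).Pod
		if !p.backoffExpired(pod) {
			return // heap is ordered by backoff expiry, so later entries can wait too
		}
		p.podBackoffQ.Pop()
		p.activeQ.Add(rawPodInfo)
		metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
		defer p.cond.Broadcast()
	}
}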
@@ -434,7 +411,7 @@ func (p *PriorityQueue) flushUnschedulableQLeftover() {
}
if len(podsToMove) > 0 {
- p.movePodsToActiveQueue(podsToMove)
+ p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
}
}
@@ -550,7 +527,7 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
// may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) {
p.lock.Lock()
- p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
+ p.movePodsToActiveOrBackoffQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod), AssignedPodAdd)
p.lock.Unlock()
}
@@ -558,60 +535,42 @@ func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) {
// may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
p.lock.Lock()
- p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
+ p.movePodsToActiveOrBackoffQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod), AssignedPodUpdate)
p.lock.Unlock()
}
- // MoveAllToActiveQueue moves all pods from unschedulableQ to activeQ. This
+ // MoveAllToActiveOrBackoffQueue moves all pods from unschedulableQ to activeQ or backoffQ. This
// function adds all pods and then signals the condition variable to ensure that
// if Pop() is waiting for an item, it receives it after all the pods are in the
// queue and the head is the highest priority pod.
- func (p *PriorityQueue) MoveAllToActiveQueue() {
+ func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event string) {
p.lock.Lock()
defer p.lock.Unlock()
- // There is a chance of errors when adding pods to other queues,
- // we make a temporary slice to store the pods,
- // since the probability is low, we set its len to 0
- addErrorPods := make([]*framework.PodInfo, 0)
+ unschedulablePods := make([]*framework.PodInfo, 0, len(p.unschedulableQ.podInfoMap))
for _, pInfo := range p.unschedulableQ.podInfoMap {
- pod := pInfo.Pod
- if p.isPodBackingOff(pod) {
- if err := p.podBackoffQ.Add(pInfo); err != nil {
- klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
- addErrorPods = append(addErrorPods, pInfo)
- }
- } else {
- if err := p.activeQ.Add(pInfo); err != nil {
- klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
- addErrorPods = append(addErrorPods, pInfo)
- }
- }
- }
- p.unschedulableQ.clear()
- // Adding pods that we could not move to Active queue or Backoff queue back to the Unschedulable queue
- for _, podInfo := range addErrorPods {
- p.unschedulableQ.addOrUpdate(podInfo)
+ unschedulablePods = append(unschedulablePods, pInfo)
}
p.movePodsToActiveOrBackoffQueue(unschedulablePods, event)
p.moveRequestCycle = p.schedulingCycle
p.cond.Broadcast()
}
// NOTE: this function assumes lock has been acquired in caller
- func (p *PriorityQueue) movePodsToActiveQueue(podInfoList []*framework.PodInfo) {
+ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.PodInfo, event string) {
for _, pInfo := range podInfoList {
pod := pInfo.Pod
if p.isPodBackingOff(pod) {
if err := p.podBackoffQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
} else {
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
p.unschedulableQ.delete(pod)
}
} else {
if err := p.activeQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
} else {
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
p.unschedulableQ.delete(pod)
}
}
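The routing above hinges on isPodBackingOff, which the diff never shows. Its likely shape is a clock comparison against the pod's recorded backoff expiry; in this sketch, podBackoff.GetBackoffTime is an assumed name based on the surrounding code:

// isPodBackingOff reports whether the pod's backoff window is still open;
// such pods are parked in podBackoffQ rather than activeQ.
func (p *PriorityQueue) isPodBackingOff(pod *v1.Pod) bool {
	boTime, exists := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
	if !exists {
		return false // never backed off; safe to activate immediately
	}
	return boTime.After(p.clock.Now())
}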

File: internal queue package, scheduling queue tests

@@ -39,6 +39,11 @@ import (
"k8s.io/kubernetes/pkg/scheduler/util"
)
const queueMetricMetadata = `
# HELP scheduler_queue_incoming_pods_total [ALPHA] Number of pods added to scheduling queues by event and queue type.
# TYPE scheduler_queue_incoming_pods_total counter
`
var lowPriority, midPriority, highPriority = int32(0), int32(100), int32(1000)
var mediumPriority = (lowPriority + highPriority) / 2
var highPriorityPod, highPriNominatedPod, medPriorityPod, unschedulablePod = v1.Pod{
@@ -249,38 +254,6 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
}
}
- func TestPriorityQueue_AddIfNotPresent(t *testing.T) {
- q := NewPriorityQueue(nil, nil)
- addOrUpdateUnschedulablePod(q, q.newPodInfo(&highPriNominatedPod))
- q.AddIfNotPresent(&highPriNominatedPod) // Must not add anything.
- q.AddIfNotPresent(&medPriorityPod)
- q.AddIfNotPresent(&unschedulablePod)
- expectedNominatedPods := &nominatedPodMap{
- nominatedPodToNode: map[types.UID]string{
- medPriorityPod.UID: "node1",
- unschedulablePod.UID: "node1",
- },
- nominatedPods: map[string][]*v1.Pod{
- "node1": {&medPriorityPod, &unschedulablePod},
- },
- }
- if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) {
- t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods)
- }
- if p, err := q.Pop(); err != nil || p.Pod != &medPriorityPod {
- t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Pod.Name)
- }
- if p, err := q.Pop(); err != nil || p.Pod != &unschedulablePod {
- t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Pod.Name)
- }
- if len(q.nominatedPods.nominatedPods["node1"]) != 2 {
- t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods.nominatedPods["node1"])
- }
- if getUnschedulablePod(q, &highPriNominatedPod) != &highPriNominatedPod {
- t.Errorf("Pod %v was not found in the unschedulableQ.", highPriNominatedPod.Name)
- }
- }
func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
q := NewPriorityQueue(nil, nil)
q.Add(&highPriNominatedPod)
@@ -343,7 +316,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
}
// move all pods to active queue when we were trying to schedule them
- q.MoveAllToActiveQueue()
+ q.MoveAllToActiveOrBackoffQueue("test")
oldCycle := q.SchedulingCycle()
firstPod, _ := q.Pop()
@@ -462,7 +435,7 @@ func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) {
q.Add(&medPriorityPod)
addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod))
addOrUpdateUnschedulablePod(q, q.newPodInfo(&highPriorityPod))
- q.MoveAllToActiveQueue()
+ q.MoveAllToActiveOrBackoffQueue("test")
if q.activeQ.Len() != 3 {
t.Error("Expected all items to be in activeQ.")
}
@@ -559,7 +532,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
t.Error("Unexpected list of pending Pods.")
}
// Move all to active queue. We should still see the same set of pods.
- q.MoveAllToActiveQueue()
+ q.MoveAllToActiveOrBackoffQueue("test")
if !reflect.DeepEqual(expectedSet, makeSet(q.PendingPods())) {
t.Error("Unexpected list of pending Pods...")
}
@@ -873,7 +846,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
// Put in the unschedulable queue.
q.AddUnschedulableIfNotPresent(p1, q.SchedulingCycle())
// Move all unschedulable pods to the active queue.
- q.MoveAllToActiveQueue()
+ q.MoveAllToActiveOrBackoffQueue("test")
// Simulation is over. Now let's pop all pods. The pod popped first should be
// the last one we pop here.
for i := 0; i < 5; i++ {
@@ -924,7 +897,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
// Clear its backoff to simulate backoff its expiration
q.clearPodBackoff(&unschedulablePod)
// Move all unschedulable pods to the active queue.
- q.MoveAllToActiveQueue()
+ q.MoveAllToActiveOrBackoffQueue("test")
// Simulate a pod being popped by the scheduler,
// At this time, unschedulable pod should be popped.
@@ -967,7 +940,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
// Clear its backoff to simulate its backoff expiration
q.clearPodBackoff(&unschedulablePod)
// Move all unschedulable pods to the active queue.
- q.MoveAllToActiveQueue()
+ q.MoveAllToActiveOrBackoffQueue("test")
// At this time, newerPod should be popped
// because it is the oldest tried pod.
@@ -1032,7 +1005,7 @@ func TestHighPriorityBackoff(t *testing.T) {
// Put in the unschedulable queue.
q.AddUnschedulableIfNotPresent(p, q.SchedulingCycle())
// Move all unschedulable pods to the active queue.
- q.MoveAllToActiveQueue()
+ q.MoveAllToActiveOrBackoffQueue("test")
p, err = q.Pop()
if err != nil {
@@ -1108,6 +1081,15 @@ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) {
type operation func(queue *PriorityQueue, pInfo *framework.PodInfo)
var (
add = func(queue *PriorityQueue, pInfo *framework.PodInfo) {
queue.Add(pInfo.Pod)
}
addUnschedulablePodBackToUnschedulableQ = func(queue *PriorityQueue, pInfo *framework.PodInfo) {
queue.AddUnschedulableIfNotPresent(pInfo, 0)
}
addUnschedulablePodBackToBackoffQ = func(queue *PriorityQueue, pInfo *framework.PodInfo) {
queue.AddUnschedulableIfNotPresent(pInfo, -1)
}
addPodActiveQ = func(queue *PriorityQueue, pInfo *framework.PodInfo) {
queue.lock.Lock()
queue.activeQ.Add(pInfo)
@@ -1135,8 +1117,8 @@
queue.podBackoffQ.Add(pInfo)
queue.lock.Unlock()
}
- moveAllToActiveQ = func(queue *PriorityQueue, _ *framework.PodInfo) {
- queue.MoveAllToActiveQueue()
+ moveAllToActiveOrBackoffQ = func(queue *PriorityQueue, _ *framework.PodInfo) {
+ queue.MoveAllToActiveOrBackoffQueue("test")
}
backoffPod = func(queue *PriorityQueue, pInfo *framework.PodInfo) {
queue.backoffPod(pInfo.Pod)
@@ -1145,6 +1127,9 @@ var (
queue.clock.(*clock.FakeClock).Step(2 * time.Second)
queue.flushBackoffQCompleted()
}
moveClockForward = func(queue *PriorityQueue, _ *framework.PodInfo) {
queue.clock.(*clock.FakeClock).Step(2 * time.Second)
}
)
// TestPodTimestamp tests the operations related to PodInfo.
@@ -1210,7 +1195,7 @@ func TestPodTimestamp(t *testing.T) {
operations: []operation{
addPodUnschedulableQ,
addPodUnschedulableQ,
- moveAllToActiveQ,
+ moveAllToActiveOrBackoffQ,
},
operands: []*framework.PodInfo{pInfo2, pInfo1, nil},
expected: []*framework.PodInfo{pInfo1, pInfo2},
@@ -1222,7 +1207,7 @@
addPodBackoffQ,
backoffPod,
flushBackoffQ,
- moveAllToActiveQ,
+ moveAllToActiveOrBackoffQ,
},
operands: []*framework.PodInfo{pInfo2, pInfo1, pInfo1, nil, nil},
expected: []*framework.PodInfo{pInfo1, pInfo2},
@@ -1326,7 +1311,7 @@ scheduler_pending_pods{queue="unschedulable"} 10
name: "add pods to unschedulableQ and then move all to activeQ",
operations: []operation{
addPodUnschedulableQ,
- moveAllToActiveQ,
+ moveAllToActiveOrBackoffQ,
},
operands: [][]*framework.PodInfo{
pInfos[:total],
@@ -1346,7 +1331,7 @@ scheduler_pending_pods{queue="unschedulable"} 0
operations: []operation{
backoffPod,
addPodUnschedulableQ,
- moveAllToActiveQ,
+ moveAllToActiveOrBackoffQ,
},
operands: [][]*framework.PodInfo{
pInfos[:20],
@@ -1368,7 +1353,7 @@ scheduler_pending_pods{queue="unschedulable"} 0
backoffPod,
addPodUnschedulableQ,
addPodActiveQ,
- moveAllToActiveQ,
+ moveAllToActiveOrBackoffQ,
flushBackoffQ,
},
operands: [][]*framework.PodInfo{
@@ -1479,6 +1464,100 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
checkPerPodSchedulingMetrics("Attempt twice with update", t, pInfo, 2, timestamp)
}
func TestIncomingPodsMetrics(t *testing.T) {
timestamp := time.Now()
metrics.Register()
var pInfos = make([]*framework.PodInfo, 0, 3)
for i := 1; i <= 3; i++ {
p := &framework.PodInfo{
Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("test-pod-%d", i),
Namespace: fmt.Sprintf("ns%d", i),
UID: types.UID(fmt.Sprintf("tp-%d", i)),
},
},
Timestamp: timestamp,
}
pInfos = append(pInfos, p)
}
tests := []struct {
name string
operations []operation
want string
}{
{
name: "add pods to activeQ",
operations: []operation{
add,
},
want: `
scheduler_queue_incoming_pods_total{event="PodAdd",queue="active"} 3
`,
},
{
name: "add pods to unschedulableQ",
operations: []operation{
addUnschedulablePodBackToUnschedulableQ,
},
want: `
scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3
`,
},
{
name: "add pods to unschedulableQ and then move all to backoffQ",
operations: []operation{
addUnschedulablePodBackToUnschedulableQ,
moveAllToActiveOrBackoffQ,
},
want: `
scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3
scheduler_queue_incoming_pods_total{event="test",queue="backoff"} 3
`,
},
{
name: "add pods to unschedulableQ and then move all to activeQ",
operations: []operation{
addUnschedulablePodBackToUnschedulableQ,
moveClockForward,
moveAllToActiveOrBackoffQ,
},
want: `
scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3
scheduler_queue_incoming_pods_total{event="test",queue="active"} 3
`,
},
{
name: "make some pods subject to backoff and add them to backoffQ, then flush backoffQ",
operations: []operation{
addUnschedulablePodBackToBackoffQ,
moveClockForward,
flushBackoffQ,
},
want: `
scheduler_queue_incoming_pods_total{event="BackoffComplete",queue="active"} 3
scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="backoff"} 3
`,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
metrics.SchedulerQueueIncomingPods.Reset()
stop := make(chan struct{})
close(stop) // Stop the periodic flush
queue := NewPriorityQueue(stop, nil, WithClock(clock.NewFakeClock(timestamp)))
for _, op := range test.operations {
for _, pInfo := range pInfos {
op(queue, pInfo)
}
}
metricName := metrics.SchedulerSubsystem + "_" + metrics.SchedulerQueueIncomingPods.Name
if err := testutil.CollectAndCompare(metrics.SchedulerQueueIncomingPods, strings.NewReader(queueMetricMetadata+test.want), metricName); err != nil {
t.Errorf("unexpected collecting result:\n%s", err)
}
})
}
}
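The assertion pattern above (render the expected exposition-format text, then diff it against what the collector actually reports) comes from the Prometheus test utilities, which the k8s.io/component-base/metrics/testutil package mirrors. A self-contained sketch of the same pattern outside the scheduler, using the upstream client_golang testutil:

package example

import (
	"strings"
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestCounterVecExposition(t *testing.T) {
	c := prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "demo_total", Help: "Demo counter."},
		[]string{"queue", "event"},
	)
	c.WithLabelValues("active", "PodAdd").Add(3)

	// Expected text in Prometheus exposition format; label names are
	// emitted in alphabetical order, as in the want strings above.
	want := `
# HELP demo_total Demo counter.
# TYPE demo_total counter
demo_total{event="PodAdd",queue="active"} 3
`
	if err := testutil.CollectAndCompare(c, strings.NewReader(want), "demo_total"); err != nil {
		t.Errorf("unexpected collecting result:\n%s", err)
	}
}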
func checkPerPodSchedulingMetrics(name string, t *testing.T, pInfo *framework.PodInfo, wantAttempts int, wantInitialAttemptTs time.Time) {
if pInfo.Attempts != wantAttempts {
t.Errorf("[%s] Pod schedule attempt unexpected, got %v, want %v", name, pInfo.Attempts, wantAttempts)

File: scheduler metrics definitions

@@ -248,6 +248,14 @@ var (
},
[]string{"extension_point", "status"})
SchedulerQueueIncomingPods = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: SchedulerSubsystem,
Name: "queue_incoming_pods_total",
Help: "Number of pods added to scheduling queues by event and queue type.",
StabilityLevel: metrics.ALPHA,
}, []string{"queue", "event"})
metricsList = []metrics.Registerable{
scheduleAttempts,
SchedulingLatency,
@@ -270,6 +278,7 @@
PodSchedulingDuration,
PodSchedulingAttempts,
FrameworkExtensionPointDuration,
SchedulerQueueIncomingPods,
}
)
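Once registered via metricsList, the counter is served from the scheduler's /metrics endpoint. A hedged sketch of standalone registration with the same component-base wrapper (legacyregistry is the usual registry for in-tree components; the PromQL in the comments is illustrative, not from the PR):

package main

import (
	"k8s.io/component-base/metrics"
	"k8s.io/component-base/metrics/legacyregistry"
)

// A replica of the definition above; in-tree it is registered through
// the scheduler's metricsList rather than directly.
var incoming = metrics.NewCounterVec(
	&metrics.CounterOpts{
		Subsystem:      "scheduler",
		Name:           "queue_incoming_pods_total",
		Help:           "Number of pods added to scheduling queues by event and queue type.",
		StabilityLevel: metrics.ALPHA,
	},
	[]string{"queue", "event"},
)

func main() {
	legacyregistry.MustRegister(incoming)
	incoming.WithLabelValues("unschedulable", "ScheduleAttemptFailure").Inc()

	// Example queries over the resulting series (illustrative PromQL):
	//   sum by (event) (rate(scheduler_queue_incoming_pods_total{queue="backoff"}[5m]))
	//   topk(5, sum by (event) (rate(scheduler_queue_incoming_pods_total{queue="active"}[5m])))
}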