diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 7efb313e548..748b5d5057f 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -88,7 +88,7 @@ type Config struct { // is available. We don't use a channel for this, because scheduling // a pod may take some amount of time and we don't want pods to get // stale while they sit in a channel. - NextPod func() *v1.Pod + NextPod func() *framework.PodInfo // WaitForCacheSync waits for scheduler cache to populate. // It returns true if it was successful, false if the controller should shutdown. @@ -96,7 +96,7 @@ type Config struct { // Error is called if there is an error. It is passed the pod in // question, and the error - Error func(*v1.Pod, error) + Error func(*framework.PodInfo, error) // Recorder is the EventRecorder to use Recorder events.EventRecorder @@ -470,7 +470,7 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced) }, NextPod: internalqueue.MakeNextPodFunc(podQueue), - Error: MakeDefaultErrorFunc(c.client, podQueue, c.schedulerCache, c.StopEverything), + Error: MakeDefaultErrorFunc(c.client, podQueue, c.schedulerCache), StopEverything: c.StopEverything, VolumeBinder: c.volumeBinder, SchedulingQueue: podQueue, @@ -636,8 +636,9 @@ func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) core } // MakeDefaultErrorFunc construct a function to handle pod scheduler error -func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache, stopEverything <-chan struct{}) func(pod *v1.Pod, err error) { - return func(pod *v1.Pod, err error) { +func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.PodInfo, error) { + return func(podInfo *framework.PodInfo, err error) { + pod := podInfo.Pod if err == core.ErrNoNodesAvailable { klog.V(2).Infof("Unable to schedule %v/%v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name) } else { @@ -681,7 +682,8 @@ func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.Sch pod, err := client.CoreV1().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{}) if err == nil { if len(pod.Spec.NodeName) == 0 { - if err := podQueue.AddUnschedulableIfNotPresent(pod, podSchedulingCycle); err != nil { + podInfo.Pod = pod + if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podSchedulingCycle); err != nil { klog.Error(err) } } diff --git a/pkg/scheduler/factory/factory_test.go b/pkg/scheduler/factory/factory_test.go index 5433a493111..a99a407683d 100644 --- a/pkg/scheduler/factory/factory_test.go +++ b/pkg/scheduler/factory/factory_test.go @@ -251,6 +251,7 @@ func TestDefaultErrorFunc(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"}, Spec: apitesting.V1DeepEqualSafePodSpec(), } + testPodInfo := &framework.PodInfo{Pod: testPod} client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}}) stopCh := make(chan struct{}) defer close(stopCh) @@ -258,10 +259,10 @@ func TestDefaultErrorFunc(t *testing.T) { timestamp := time.Now() queue := internalqueue.NewPriorityQueue(nil, nil, internalqueue.WithClock(clock.NewFakeClock(timestamp))) schedulerCache := internalcache.New(30*time.Second, stopCh) - errFunc := MakeDefaultErrorFunc(client, queue, schedulerCache, stopCh) + errFunc := 
MakeDefaultErrorFunc(client, queue, schedulerCache)
 // Trigger error handling again to put the pod in unschedulable queue
- errFunc(testPod, nil)
+ errFunc(testPodInfo, nil)
 // Try up to a minute to retrieve the error pod from priority queue
 foundPodFlag := false
@@ -295,7 +296,7 @@ func TestDefaultErrorFunc(t *testing.T) {
 queue.MoveAllToActiveQueue()
 // Trigger error handling again to put the pod in backoff queue
- errFunc(testPod, nil)
+ errFunc(testPodInfo, nil)
 foundPodFlag = false
 for i := 0; i < maxIterations; i++ {
diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go
index e4e97310bfe..6e47ecc0c3a 100644
--- a/pkg/scheduler/framework/v1alpha1/interface.go
+++ b/pkg/scheduler/framework/v1alpha1/interface.go
@@ -153,11 +153,30 @@ type Plugin interface {
 Name() string
 }
-// PodInfo is minimum cell in the scheduling queue.
+// PodInfo is a wrapper to a Pod with additional information for purposes such as tracking
+// the timestamp when it's added to the queue or recording per-pod metrics.
 type PodInfo struct {
 Pod *v1.Pod
 // The time pod added to the scheduling queue.
 Timestamp time.Time
+ // Number of schedule attempts before successfully scheduled.
+ // It's used to record the # attempts metric.
+ Attempts int
+ // The time when the pod is added to the queue for the first time. The pod may be added
+ // back to the queue multiple times before it's successfully scheduled.
+ // It shouldn't be updated once initialized. It's used to record the e2e scheduling
+ // latency for a pod.
+ InitialAttemptTimestamp time.Time
+}
+
+// DeepCopy returns a deep copy of the PodInfo object.
+func (podInfo *PodInfo) DeepCopy() *PodInfo {
+ return &PodInfo{
+ Pod: podInfo.Pod.DeepCopy(),
+ Timestamp: podInfo.Timestamp,
+ Attempts: podInfo.Attempts,
+ InitialAttemptTimestamp: podInfo.InitialAttemptTimestamp,
+ }
 }
 // LessFunc is the function to sort pod info
diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go
index 79effb613a7..f8168b1dfba 100644
--- a/pkg/scheduler/internal/queue/scheduling_queue.go
+++ b/pkg/scheduler/internal/queue/scheduling_queue.go
@@ -73,14 +73,14 @@ type SchedulingQueue interface {
 // AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
 // The podSchedulingCycle represents the current scheduling cycle number which can be
 // returned by calling SchedulingCycle().
- AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error
+ AddUnschedulableIfNotPresent(pod *framework.PodInfo, podSchedulingCycle int64) error
 // SchedulingCycle returns the current number of scheduling cycle which is
 // cached by scheduling queue. Normally, incrementing this number whenever
 // a pod is popped (e.g. called Pop()) is enough.
 SchedulingCycle() int64
 // Pop removes the head of the queue and returns it. It blocks if the
 // queue is empty and waits until a new item is added to the queue.
- Pop() (*v1.Pod, error)
+ Pop() (*framework.PodInfo, error)
 Update(oldPod, newPod *v1.Pod) error
 Delete(pod *v1.Pod) error
 MoveAllToActiveQueue()
@@ -350,14 +350,16 @@ func (p *PriorityQueue) SchedulingCycle() int64 {
 // the queue, unless it is already in the queue. Normally, PriorityQueue puts
 // unschedulable pods in `unschedulableQ`. But if there has been a recent move
 // request, then the pod is put in `podBackoffQ`.
-func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error { +func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.PodInfo, podSchedulingCycle int64) error { p.lock.Lock() defer p.lock.Unlock() + pod := pInfo.Pod if p.unschedulableQ.get(pod) != nil { return fmt.Errorf("pod is already present in unschedulableQ") } - pInfo := p.newPodInfo(pod) + // Refresh the timestamp since the pod is re-added. + pInfo.Timestamp = p.clock.Now() if _, exists, _ := p.activeQ.Get(pInfo); exists { return fmt.Errorf("pod is already present in the activeQ") } @@ -439,7 +441,7 @@ func (p *PriorityQueue) flushUnschedulableQLeftover() { // Pop removes the head of the active queue and returns it. It blocks if the // activeQ is empty and waits until a new item is added to the queue. It // increments scheduling cycle when a pod is popped. -func (p *PriorityQueue) Pop() (*v1.Pod, error) { +func (p *PriorityQueue) Pop() (*framework.PodInfo, error) { p.lock.Lock() defer p.lock.Unlock() for p.activeQ.Len() == 0 { @@ -456,8 +458,9 @@ func (p *PriorityQueue) Pop() (*v1.Pod, error) { return nil, err } pInfo := obj.(*framework.PodInfo) + pInfo.Attempts++ p.schedulingCycle++ - return pInfo.Pod, err + return pInfo, err } // isPodUpdated checks if the pod is updated in a way that it may have become @@ -486,19 +489,15 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { // If the pod is already in the active queue, just update it there. if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists { p.nominatedPods.update(oldPod, newPod) - newPodInfo := newPodInfoNoTimestamp(newPod) - newPodInfo.Timestamp = oldPodInfo.(*framework.PodInfo).Timestamp - err := p.activeQ.Update(newPodInfo) + err := p.activeQ.Update(updatePod(oldPodInfo, newPod)) return err } // If the pod is in the backoff queue, update it there. if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists { p.nominatedPods.update(oldPod, newPod) - p.podBackoffQ.Delete(newPodInfoNoTimestamp(oldPod)) - newPodInfo := newPodInfoNoTimestamp(newPod) - newPodInfo.Timestamp = oldPodInfo.(*framework.PodInfo).Timestamp - err := p.activeQ.Add(newPodInfo) + p.podBackoffQ.Delete(oldPodInfo) + err := p.activeQ.Add(updatePod(oldPodInfo, newPod)) if err == nil { p.cond.Broadcast() } @@ -509,20 +508,18 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { // If the pod is in the unschedulable queue, updating it may make it schedulable. if usPodInfo := p.unschedulableQ.get(newPod); usPodInfo != nil { p.nominatedPods.update(oldPod, newPod) - newPodInfo := newPodInfoNoTimestamp(newPod) - newPodInfo.Timestamp = usPodInfo.Timestamp if isPodUpdated(oldPod, newPod) { // If the pod is updated reset backoff p.clearPodBackoff(newPod) p.unschedulableQ.delete(usPodInfo.Pod) - err := p.activeQ.Add(newPodInfo) + err := p.activeQ.Add(updatePod(usPodInfo, newPod)) if err == nil { p.cond.Broadcast() } return err } // Pod is already in unschedulable queue and hasnt updated, no need to backoff again - p.unschedulableQ.addOrUpdate(newPodInfo) + p.unschedulableQ.addOrUpdate(updatePod(usPodInfo, newPod)) return nil } // If pod is not in any of the queues, we put it in the active queue. @@ -718,18 +715,20 @@ func (p *PriorityQueue) NumUnschedulablePods() int { // newPodInfo builds a PodInfo object. 
func (p *PriorityQueue) newPodInfo(pod *v1.Pod) *framework.PodInfo { - if p.clock == nil { - return &framework.PodInfo{ - Pod: pod, - } - } - + now := p.clock.Now() return &framework.PodInfo{ - Pod: pod, - Timestamp: p.clock.Now(), + Pod: pod, + Timestamp: now, + InitialAttemptTimestamp: now, } } +func updatePod(oldPodInfo interface{}, newPod *v1.Pod) *framework.PodInfo { + pInfo := oldPodInfo.(*framework.PodInfo) + pInfo.Pod = newPod + return pInfo +} + // UnschedulablePodsMap holds pods that cannot be scheduled. This data structure // is used to implement unschedulableQ. type UnschedulablePodsMap struct { @@ -876,12 +875,12 @@ func newNominatedPodMap() *nominatedPodMap { // MakeNextPodFunc returns a function to retrieve the next pod from a given // scheduling queue -func MakeNextPodFunc(queue SchedulingQueue) func() *v1.Pod { - return func() *v1.Pod { - pod, err := queue.Pop() +func MakeNextPodFunc(queue SchedulingQueue) func() *framework.PodInfo { + return func() *framework.PodInfo { + podInfo, err := queue.Pop() if err == nil { - klog.V(4).Infof("About to try and schedule pod %v/%v", pod.Namespace, pod.Name) - return pod + klog.V(4).Infof("About to try and schedule pod %v/%v", podInfo.Pod.Namespace, podInfo.Pod.Name) + return podInfo } klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err) return nil diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 1f2c8159b81..e661f58d148 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -141,14 +141,14 @@ func TestPriorityQueue_Add(t *testing.T) { if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) { t.Errorf("Unexpected nominated map after adding pods. 
Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods) } - if p, err := q.Pop(); err != nil || p != &highPriorityPod { - t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name) + if p, err := q.Pop(); err != nil || p.Pod != &highPriorityPod { + t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name) } - if p, err := q.Pop(); err != nil || p != &medPriorityPod { - t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name) + if p, err := q.Pop(); err != nil || p.Pod != &medPriorityPod { + t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Pod.Name) } - if p, err := q.Pop(); err != nil || p != &unschedulablePod { - t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name) + if p, err := q.Pop(); err != nil || p.Pod != &unschedulablePod { + t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Pod.Name) } if len(q.nominatedPods.nominatedPods["node1"]) != 2 { t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods.nominatedPods["node1"]) @@ -235,11 +235,11 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { if err := q.Add(&highPriorityPod); err != nil { t.Errorf("add failed: %v", err) } - if p, err := q.Pop(); err != nil || p != &medPriorityPod { - t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name) + if p, err := q.Pop(); err != nil || p.Pod != &medPriorityPod { + t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Pod.Name) } - if p, err := q.Pop(); err != nil || p != &highPriorityPod { - t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name) + if p, err := q.Pop(); err != nil || p.Pod != &highPriorityPod { + t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name) } } @@ -261,11 +261,11 @@ func TestPriorityQueue_AddIfNotPresent(t *testing.T) { if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) { t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods) } - if p, err := q.Pop(); err != nil || p != &medPriorityPod { - t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name) + if p, err := q.Pop(); err != nil || p.Pod != &medPriorityPod { + t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Pod.Name) } - if p, err := q.Pop(); err != nil || p != &unschedulablePod { - t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name) + if p, err := q.Pop(); err != nil || p.Pod != &unschedulablePod { + t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Pod.Name) } if len(q.nominatedPods.nominatedPods["node1"]) != 2 { t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods.nominatedPods["node1"]) @@ -278,8 +278,8 @@ func TestPriorityQueue_AddIfNotPresent(t *testing.T) { func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { q := NewPriorityQueue(nil, nil) q.Add(&highPriNominatedPod) - q.AddUnschedulableIfNotPresent(&highPriNominatedPod, q.SchedulingCycle()) // Must not add anything. - q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle()) + q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&highPriNominatedPod), q.SchedulingCycle()) // Must not add anything. 
+ q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle()) expectedNominatedPods := &nominatedPodMap{ nominatedPodToNode: map[types.UID]string{ unschedulablePod.UID: "node1", @@ -292,8 +292,8 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) { t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods) } - if p, err := q.Pop(); err != nil || p != &highPriNominatedPod { - t.Errorf("Expected: %v after Pop, but got: %v", highPriNominatedPod.Name, p.Name) + if p, err := q.Pop(); err != nil || p.Pod != &highPriNominatedPod { + t.Errorf("Expected: %v after Pop, but got: %v", highPriNominatedPod.Name, p.Pod.Name) } if len(q.nominatedPods.nominatedPods) != 1 { t.Errorf("Expected nomindatePods to have one element: %v", q.nominatedPods) @@ -331,7 +331,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { // Pop all pods except for the first one for i := totalNum - 1; i > 0; i-- { p, _ := q.Pop() - if !reflect.DeepEqual(&expectedPods[i], p) { + if !reflect.DeepEqual(&expectedPods[i], p.Pod) { t.Errorf("Unexpected pod. Expected: %v, got: %v", &expectedPods[i], p) } } @@ -341,7 +341,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { oldCycle := q.SchedulingCycle() firstPod, _ := q.Pop() - if !reflect.DeepEqual(&expectedPods[0], firstPod) { + if !reflect.DeepEqual(&expectedPods[0], firstPod.Pod) { t.Errorf("Unexpected pod. Expected: %v, got: %v", &expectedPods[0], firstPod) } @@ -358,7 +358,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { }, } - if err := q.AddUnschedulableIfNotPresent(unschedulablePod, oldCycle); err != nil { + if err := q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(unschedulablePod), oldCycle); err != nil { t.Errorf("Failed to call AddUnschedulableIfNotPresent(%v): %v", unschedulablePod.Name, err) } } @@ -380,8 +380,8 @@ func TestPriorityQueue_Pop(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - if p, err := q.Pop(); err != nil || p != &medPriorityPod { - t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name) + if p, err := q.Pop(); err != nil || p.Pod != &medPriorityPod { + t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Pod.Name) } if len(q.nominatedPods.nominatedPods["node1"]) != 1 { t.Errorf("Expected medPriorityPod to be present in nomindatePods: %v", q.nominatedPods.nominatedPods["node1"]) @@ -422,8 +422,8 @@ func TestPriorityQueue_Update(t *testing.T) { if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&unschedulablePod)); !exists { t.Errorf("Expected: %v to be added to activeQ.", unschedulablePod.Name) } - if p, err := q.Pop(); err != nil || p != &highPriNominatedPod { - t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name) + if p, err := q.Pop(); err != nil || p.Pod != &highPriNominatedPod { + t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name) } } @@ -523,8 +523,8 @@ func TestPriorityQueue_NominatedPodsForNode(t *testing.T) { q.Add(&medPriorityPod) q.Add(&unschedulablePod) q.Add(&highPriorityPod) - if p, err := q.Pop(); err != nil || p != &highPriorityPod { - t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name) + if p, err := q.Pop(); err != nil || p.Pod != &highPriorityPod { + t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name) } expectedList 
:= []*v1.Pod{&medPriorityPod, &unschedulablePod} if !reflect.DeepEqual(expectedList, q.NominatedPodsForNode("node1")) { @@ -584,8 +584,8 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) { t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods) } - if p, err := q.Pop(); err != nil || p != &medPriorityPod { - t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name) + if p, err := q.Pop(); err != nil || p.Pod != &medPriorityPod { + t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Pod.Name) } // List of nominated pods shouldn't change after popping them from the queue. if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) { @@ -857,7 +857,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) { t.Errorf("Error while popping the head of the queue: %v", err) } // Update pod condition to unschedulable. - podutil.UpdatePodCondition(&p1.Status, &v1.PodCondition{ + podutil.UpdatePodCondition(&p1.Pod.Status, &v1.PodCondition{ Type: v1.PodScheduled, Status: v1.ConditionFalse, Reason: v1.PodReasonUnschedulable, @@ -876,7 +876,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) { t.Errorf("Error while popping pods from the queue: %v", err) } if (i == 4) != (p1 == p) { - t.Errorf("A pod tried before is not the last pod popped: i: %v, pod name: %v", i, p.Name) + t.Errorf("A pod tried before is not the last pod popped: i: %v, pod name: %v", i, p.Pod.Name) } } } @@ -914,7 +914,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { }) // Put in the unschedulable queue - q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle()) + q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle()) // Clear its backoff to simulate backoff its expiration q.clearPodBackoff(&unschedulablePod) // Move all unschedulable pods to the active queue. @@ -926,8 +926,8 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { if err != nil { t.Errorf("Error while popping the head of the queue: %v", err) } - if p1 != &unschedulablePod { - t.Errorf("Expected that test-pod-unscheduled was popped, got %v", p1.Name) + if p1.Pod != &unschedulablePod { + t.Errorf("Expected that test-pod-unscheduled was popped, got %v", p1.Pod.Name) } // Assume newer pod was added just after unschedulable pod @@ -957,7 +957,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { }) // And then, put unschedulable pod to the unschedulable queue - q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle()) + q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle()) // Clear its backoff to simulate its backoff expiration q.clearPodBackoff(&unschedulablePod) // Move all unschedulable pods to the active queue. 
@@ -969,8 +969,8 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { if err2 != nil { t.Errorf("Error while popping the head of the queue: %v", err2) } - if p2 != &newerPod { - t.Errorf("Expected that test-newer-pod was popped, got %v", p2.Name) + if p2.Pod != &newerPod { + t.Errorf("Expected that test-newer-pod was popped, got %v", p2.Pod.Name) } } @@ -1013,11 +1013,11 @@ func TestHighPriorityBackoff(t *testing.T) { if err != nil { t.Errorf("Error while popping the head of the queue: %v", err) } - if p != &highPod { + if p.Pod != &highPod { t.Errorf("Expected to get high priority pod, got: %v", p) } // Update pod condition to unschedulable. - podutil.UpdatePodCondition(&p.Status, &v1.PodCondition{ + podutil.UpdatePodCondition(&p.Pod.Status, &v1.PodCondition{ Type: v1.PodScheduled, Status: v1.ConditionFalse, Reason: v1.PodReasonUnschedulable, @@ -1032,7 +1032,7 @@ func TestHighPriorityBackoff(t *testing.T) { if err != nil { t.Errorf("Error while popping the head of the queue: %v", err) } - if p != &midPod { + if p.Pod != &midPod { t.Errorf("Expected to get mid priority pod, got: %v", p) } } @@ -1091,11 +1091,11 @@ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) { addOrUpdateUnschedulablePod(q, highPodInfo) addOrUpdateUnschedulablePod(q, midPodInfo) - if p, err := q.Pop(); err != nil || p != &highPod { - t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name) + if p, err := q.Pop(); err != nil || p.Pod != &highPod { + t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name) } - if p, err := q.Pop(); err != nil || p != &midPod { - t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name) + if p, err := q.Pop(); err != nil || p.Pod != &midPod { + t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Pod.Name) } } @@ -1405,3 +1405,79 @@ scheduler_pending_pods{queue="unschedulable"} 0 }) } } + +// TestPerPodSchedulingMetrics makes sure pod schedule attempts is updated correctly while +// initialAttemptTimestamp stays the same during multiple add/pop operations. +func TestPerPodSchedulingMetrics(t *testing.T) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "test-ns", + UID: types.UID("test-uid"), + }, + } + timestamp := time.Now() + + // Case 1: A pod is created and scheduled after 1 attempt. The queue operations are + // Add -> Pop. + c := clock.NewFakeClock(timestamp) + queue := NewPriorityQueue(nil, nil, WithClock(c)) + queue.Add(pod) + pInfo, err := queue.Pop() + if err != nil { + t.Fatalf("Failed to pop a pod %v", err) + } + checkPerPodSchedulingMetrics("Attempt once", t, pInfo, 1, timestamp) + + // Case 2: A pod is created and scheduled after 2 attempts. The queue operations are + // Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Pop. 
+ c = clock.NewFakeClock(timestamp)
+ queue = NewPriorityQueue(nil, nil, WithClock(c))
+ queue.Add(pod)
+ pInfo, err = queue.Pop()
+ if err != nil {
+ t.Fatalf("Failed to pop a pod %v", err)
+ }
+ queue.AddUnschedulableIfNotPresent(pInfo, 1)
+ // Override clock to exceed the unschedulableQTimeInterval so that unschedulable pods
+ // will be moved to activeQ
+ c.SetTime(timestamp.Add(unschedulableQTimeInterval + 1))
+ queue.flushUnschedulableQLeftover()
+ pInfo, err = queue.Pop()
+ if err != nil {
+ t.Fatalf("Failed to pop a pod %v", err)
+ }
+ checkPerPodSchedulingMetrics("Attempt twice", t, pInfo, 2, timestamp)
+
+ // Case 3: Similar to case 2, but Update is called before the second Pop. The queue operations are
+ // Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Update -> Pop.
+ c = clock.NewFakeClock(timestamp)
+ queue = NewPriorityQueue(nil, nil, WithClock(c))
+ queue.Add(pod)
+ pInfo, err = queue.Pop()
+ if err != nil {
+ t.Fatalf("Failed to pop a pod %v", err)
+ }
+ queue.AddUnschedulableIfNotPresent(pInfo, 1)
+ // Override clock to exceed the unschedulableQTimeInterval so that unschedulable pods
+ // will be moved to activeQ
+ c.SetTime(timestamp.Add(unschedulableQTimeInterval + 1))
+ queue.flushUnschedulableQLeftover()
+ newPod := pod.DeepCopy()
+ newPod.Generation = 1
+ queue.Update(pod, newPod)
+ pInfo, err = queue.Pop()
+ if err != nil {
+ t.Fatalf("Failed to pop a pod %v", err)
+ }
+ checkPerPodSchedulingMetrics("Attempt twice with update", t, pInfo, 2, timestamp)
+}
+
+func checkPerPodSchedulingMetrics(name string, t *testing.T, pInfo *framework.PodInfo, wantAttempts int, wantInitialAttemptTs time.Time) {
+ if pInfo.Attempts != wantAttempts {
+ t.Errorf("[%s] Pod schedule attempt unexpected, got %v, want %v", name, pInfo.Attempts, wantAttempts)
+ }
+ if pInfo.InitialAttemptTimestamp != wantInitialAttemptTs {
+ t.Errorf("[%s] Pod initial schedule attempt timestamp unexpected, got %v, want %v", name, pInfo.InitialAttemptTimestamp, wantInitialAttemptTs)
+ }
+}
diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go
index 6342e8694ab..2d709cb27c8 100644
--- a/pkg/scheduler/metrics/metrics.go
+++ b/pkg/scheduler/metrics/metrics.go
@@ -218,6 +218,24 @@ var (
 StabilityLevel: metrics.ALPHA,
 }, []string{"queue"})
+ PodSchedulingDuration = metrics.NewHistogram(
+ &metrics.HistogramOpts{
+ Subsystem: SchedulerSubsystem,
+ Name: "pod_scheduling_duration_seconds",
+ Help: "E2e latency for a pod being scheduled which may include multiple scheduling attempts.",
+ Buckets: metrics.ExponentialBuckets(0.001, 2, 15),
+ StabilityLevel: metrics.ALPHA,
+ })
+
+ PodSchedulingAttempts = metrics.NewHistogram(
+ &metrics.HistogramOpts{
+ Subsystem: SchedulerSubsystem,
+ Name: "pod_scheduling_attempts",
+ Help: "Number of attempts to successfully schedule a pod.",
+ Buckets: metrics.ExponentialBuckets(1, 2, 5),
+ StabilityLevel: metrics.ALPHA,
+ })
+
 metricsList = []metrics.Registerable{
 scheduleAttempts,
 SchedulingLatency,
@@ -237,6 +255,8 @@ var (
 PreemptionVictims,
 PreemptionAttempts,
 pendingPods,
+ PodSchedulingDuration,
+ PodSchedulingAttempts,
 }
 )
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index e820546d37f..2d0ae9f3559 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -85,7 +85,7 @@ type Scheduler struct {
 // is available. We don't use a channel for this, because scheduling
 // a pod may take some amount of time and we don't want pods to get
 // stale while they sit in a channel.
- NextPod func() *v1.Pod + NextPod func() *framework.PodInfo // WaitForCacheSync waits for scheduler cache to populate. // It returns true if it was successful, false if the controller should shutdown. @@ -93,7 +93,7 @@ type Scheduler struct { // Error is called if there is an error. It is passed the pod in // question, and the error - Error func(*v1.Pod, error) + Error func(*framework.PodInfo, error) // Recorder is the EventRecorder to use Recorder events.EventRecorder @@ -389,8 +389,9 @@ func (sched *Scheduler) Run() { // recordFailedSchedulingEvent records an event for the pod that indicates the // pod has failed to schedule. // NOTE: This function modifies "pod". "pod" should be copied before being passed. -func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason string, message string) { - sched.Error(pod, err) +func (sched *Scheduler) recordSchedulingFailure(podInfo *framework.PodInfo, err error, reason string, message string) { + sched.Error(podInfo, err) + pod := podInfo.Pod sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message) if err := sched.podConditionUpdater.update(pod, &v1.PodCondition{ Type: v1.PodScheduled, @@ -402,18 +403,6 @@ func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason s } } -// schedule implements the scheduling algorithm and returns the suggested result(host, -// evaluated nodes number,feasible nodes number). -func (sched *Scheduler) schedule(pod *v1.Pod, state *framework.CycleState) (core.ScheduleResult, error) { - result, err := sched.Algorithm.Schedule(state, pod) - if err != nil { - pod = pod.DeepCopy() - sched.recordSchedulingFailure(pod, err, v1.PodReasonUnschedulable, err.Error()) - return core.ScheduleResult{}, err - } - return result, nil -} - // preempt tries to create room for a pod that has failed to schedule, by preempting lower priority pods if possible. // If it succeeds, it adds the name of the node where preemption has happened to the pod spec. // It returns the node name and an error if any. @@ -474,18 +463,6 @@ func (sched *Scheduler) preempt(state *framework.CycleState, fwk framework.Frame return nodeName, err } -// assumeVolumes will update the volume cache with the chosen bindings -// -// This function modifies assumed if volume binding is required. -func (sched *Scheduler) assumeVolumes(assumed *v1.Pod, host string) (allBound bool, err error) { - allBound, err = sched.VolumeBinder.Binder.AssumePodVolumes(assumed, host) - if err != nil { - sched.recordSchedulingFailure(assumed, err, SchedulerError, - fmt.Sprintf("AssumePodVolumes failed: %v", err)) - } - return -} - // bindVolumes will make the API update with the assumed bindings and wait until // the PV controller has completely finished the binding operation. // @@ -502,7 +479,6 @@ func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error { klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) } - sched.recordSchedulingFailure(assumed, err, "VolumeBindingFailed", err.Error()) return err } @@ -521,14 +497,6 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { if err := sched.SchedulerCache.AssumePod(assumed); err != nil { klog.Errorf("scheduler cache AssumePod failed: %v", err) - - // This is most probably result of a BUG in retrying logic. - // We report an error here so that pod scheduling can be retried. 
- // This relies on the fact that Error will check if the pod has been bound - // to a node and if so will not add it back to the unscheduled pods queue - // (otherwise this would cause an infinite loop). - sched.recordSchedulingFailure(assumed, err, SchedulerError, - fmt.Sprintf("AssumePod failed: %v", err)) return err } // if "assumed" is a nominated pod, we should remove it from internal cache @@ -584,7 +552,8 @@ func (sched *Scheduler) bind(assumed *v1.Pod, targetNode string, state *framewor func (sched *Scheduler) scheduleOne() { fwk := sched.Framework - pod := sched.NextPod() + podInfo := sched.NextPod() + pod := podInfo.Pod // pod could be nil when schedulerQueue is closed if pod == nil { return @@ -600,9 +569,10 @@ func (sched *Scheduler) scheduleOne() { // Synchronously attempt to find a fit for the pod. start := time.Now() state := framework.NewCycleState() - scheduleResult, err := sched.schedule(pod, state) + scheduleResult, err := sched.Algorithm.Schedule(state, pod) if err != nil { - // schedule() may have failed because the pod would not fit on any host, so we try to + sched.recordSchedulingFailure(podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error()) + // Schedule() may have failed because the pod would not fit on any host, so we try to // preempt, with the expectation that the next time the pod is tried for scheduling it // will fit due to the preemption. It is also possible that a different pod will schedule // into the resources that were preempted, but this is harmless. @@ -633,7 +603,8 @@ func (sched *Scheduler) scheduleOne() { metrics.DeprecatedSchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start)) // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet. // This allows us to keep scheduling without waiting on binding to occur. - assumedPod := pod.DeepCopy() + assumedPodInfo := podInfo.DeepCopy() + assumedPod := assumedPodInfo.Pod // Assume volumes first before assuming the pod. // @@ -642,16 +613,17 @@ func (sched *Scheduler) scheduleOne() { // Otherwise, binding of volumes is started after the pod is assumed, but before pod binding. // // This function modifies 'assumedPod' if volume binding is required. - allBound, err := sched.assumeVolumes(assumedPod, scheduleResult.SuggestedHost) + allBound, err := sched.VolumeBinder.Binder.AssumePodVolumes(assumedPod, scheduleResult.SuggestedHost) if err != nil { - klog.Errorf("error assuming volumes: %v", err) + sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, + fmt.Sprintf("AssumePodVolumes failed: %v", err)) metrics.PodScheduleErrors.Inc() return } // Run "reserve" plugins. if sts := fwk.RunReservePlugins(state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { - sched.recordSchedulingFailure(assumedPod, sts.AsError(), SchedulerError, sts.Message()) + sched.recordSchedulingFailure(assumedPodInfo, sts.AsError(), SchedulerError, sts.Message()) metrics.PodScheduleErrors.Inc() return } @@ -659,7 +631,12 @@ func (sched *Scheduler) scheduleOne() { // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost err = sched.assume(assumedPod, scheduleResult.SuggestedHost) if err != nil { - klog.Errorf("error assuming pod: %v", err) + // This is most probably result of a BUG in retrying logic. + // We report an error here so that pod scheduling can be retried. 
+ // This relies on the fact that Error will check if the pod has been bound + // to a node and if so will not add it back to the unscheduled pods queue + // (otherwise this would cause an infinite loop). + sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("AssumePod failed: %v", err)) metrics.PodScheduleErrors.Inc() // trigger un-reserve plugins to clean up state associated with the reserved Pod fwk.RunUnreservePlugins(state, assumedPod, scheduleResult.SuggestedHost) @@ -671,7 +648,7 @@ func (sched *Scheduler) scheduleOne() { if !allBound { err := sched.bindVolumes(assumedPod) if err != nil { - klog.Errorf("error binding volumes: %v", err) + sched.recordSchedulingFailure(assumedPodInfo, err, "VolumeBindingFailed", err.Error()) metrics.PodScheduleErrors.Inc() // trigger un-reserve plugins to clean up state associated with the reserved Pod fwk.RunUnreservePlugins(state, assumedPod, scheduleResult.SuggestedHost) @@ -695,7 +672,7 @@ func (sched *Scheduler) scheduleOne() { } // trigger un-reserve plugins to clean up state associated with the reserved Pod fwk.RunUnreservePlugins(state, assumedPod, scheduleResult.SuggestedHost) - sched.recordSchedulingFailure(assumedPod, permitStatus.AsError(), reason, permitStatus.Message()) + sched.recordSchedulingFailure(assumedPodInfo, permitStatus.AsError(), reason, permitStatus.Message()) return } @@ -710,7 +687,7 @@ func (sched *Scheduler) scheduleOne() { } // trigger un-reserve plugins to clean up state associated with the reserved Pod fwk.RunUnreservePlugins(state, assumedPod, scheduleResult.SuggestedHost) - sched.recordSchedulingFailure(assumedPod, preBindStatus.AsError(), reason, preBindStatus.Message()) + sched.recordSchedulingFailure(assumedPodInfo, preBindStatus.AsError(), reason, preBindStatus.Message()) return } @@ -718,11 +695,10 @@ func (sched *Scheduler) scheduleOne() { metrics.E2eSchedulingLatency.Observe(metrics.SinceInSeconds(start)) metrics.DeprecatedE2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start)) if err != nil { - klog.Errorf("error binding pod: %v", err) metrics.PodScheduleErrors.Inc() // trigger un-reserve plugins to clean up state associated with the reserved Pod fwk.RunUnreservePlugins(state, assumedPod, scheduleResult.SuggestedHost) - sched.recordSchedulingFailure(assumedPod, err, SchedulerError, fmt.Sprintf("Binding rejected: %v", err)) + sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("Binding rejected: %v", err)) } else { // Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2. if klog.V(2) { @@ -731,6 +707,8 @@ func (sched *Scheduler) scheduleOne() { } metrics.PodScheduleSuccesses.Inc() + metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts)) + metrics.PodSchedulingDuration.Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp)) // Run "postbind" plugins. 
fwk.RunPostBindPlugins(state, assumedPod, scheduleResult.SuggestedHost) diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 9f7591f25c0..044253fdb9e 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -292,12 +292,12 @@ func TestScheduler(t *testing.T) { }} }, podConditionUpdater: fakePodConditionUpdater{}, - Error: func(p *v1.Pod, err error) { - gotPod = p + Error: func(p *framework.PodInfo, err error) { + gotPod = p.Pod gotError = err }, - NextPod: func() *v1.Pod { - return item.sendPod + NextPod: func() *framework.PodInfo { + return &framework.PodInfo{Pod: item.sendPod} }, Framework: emptyFramework, Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"), @@ -675,10 +675,10 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C return nil }} }, - NextPod: func() *v1.Pod { - return clientcache.Pop(queuedPodStore).(*v1.Pod) + NextPod: func() *framework.PodInfo { + return &framework.PodInfo{Pod: clientcache.Pop(queuedPodStore).(*v1.Pod)} }, - Error: func(p *v1.Pod, err error) { + Error: func(p *framework.PodInfo, err error) { errChan <- err }, Recorder: &events.FakeRecorder{}, @@ -696,7 +696,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C } func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) { - framework, _ := framework.NewFramework(emptyPluginRegistry, nil, []kubeschedulerconfig.PluginConfig{}) + fwk, _ := framework.NewFramework(emptyPluginRegistry, nil, []kubeschedulerconfig.PluginConfig{}) algo := core.NewGenericScheduler( scache, internalqueue.NewSchedulingQueue(nil, nil), @@ -704,7 +704,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc predicates.EmptyPredicateMetadataProducer, []priorities.PriorityConfig{}, priorities.EmptyPriorityMetadataProducer, - framework, + fwk, []algorithm.SchedulerExtender{}, nil, informerFactory.Core().V1().PersistentVolumeClaims().Lister(), @@ -729,17 +729,17 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc WaitForCacheSync: func() bool { return true }, - NextPod: func() *v1.Pod { - return clientcache.Pop(queuedPodStore).(*v1.Pod) + NextPod: func() *framework.PodInfo { + return &framework.PodInfo{Pod: clientcache.Pop(queuedPodStore).(*v1.Pod)} }, - Error: func(p *v1.Pod, err error) { + Error: func(p *framework.PodInfo, err error) { queuedPodStore.AddIfNotPresent(p) }, Recorder: &events.FakeRecorder{}, podConditionUpdater: fakePodConditionUpdater{}, PodPreemptor: fakePodPreemptor{}, StopEverything: stop, - Framework: framework, + Framework: fwk, VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}), }
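
Editor's note on the overall shape of the change: the scheduling queue now hands the scheduler a *framework.PodInfo instead of a bare *v1.Pod, so per-pod bookkeeping (Attempts, Timestamp, InitialAttemptTimestamp) survives requeueing and can feed the two new histograms. As a reading aid only, the standalone sketch below mirrors that contract; podInfo, toyQueue, and their methods are simplified stand-ins invented for illustration, not the real scheduler types. The authoritative behaviour is in PriorityQueue.Pop, PriorityQueue.AddUnschedulableIfNotPresent, and scheduleOne in the hunks above.

package main

import (
	"fmt"
	"time"
)

// podInfo mirrors only the framework.PodInfo fields relevant to per-pod metrics.
type podInfo struct {
	Name                    string
	Timestamp               time.Time // refreshed every time the pod is (re-)queued
	Attempts                int       // incremented on every Pop
	InitialAttemptTimestamp time.Time // set once, when the pod first enters the queue
}

// toyQueue models just the behaviour exercised by TestPerPodSchedulingMetrics.
type toyQueue struct {
	items []*podInfo
	clock func() time.Time
}

// Add enqueues a brand-new pod and stamps both timestamps.
func (q *toyQueue) Add(name string) {
	now := q.clock()
	q.items = append(q.items, &podInfo{
		Name:                    name,
		Timestamp:               now,
		InitialAttemptTimestamp: now,
	})
}

// Pop returns the head of the queue and counts the scheduling attempt,
// matching the pInfo.Attempts++ added to PriorityQueue.Pop in this patch.
func (q *toyQueue) Pop() *podInfo {
	p := q.items[0]
	q.items = q.items[1:]
	p.Attempts++
	return p
}

// AddUnschedulableIfNotPresent re-queues an existing podInfo: the queue
// timestamp is refreshed, while Attempts and InitialAttemptTimestamp survive.
func (q *toyQueue) AddUnschedulableIfNotPresent(p *podInfo) {
	p.Timestamp = q.clock()
	q.items = append(q.items, p)
}

func main() {
	q := &toyQueue{clock: time.Now}
	q.Add("test-pod")

	p := q.Pop()                      // attempt 1 fails...
	q.AddUnschedulableIfNotPresent(p) // ...so the pod goes back with its history intact
	p = q.Pop()                       // attempt 2 succeeds

	fmt.Printf("attempts=%d, e2e=%v\n", p.Attempts, time.Since(p.InitialAttemptTimestamp))
	// The real scheduler observes PodSchedulingAttempts and PodSchedulingDuration
	// from exactly these two fields after a successful bind.
}

The sketch also shows why pod_scheduling_duration_seconds is observed from InitialAttemptTimestamp rather than Timestamp: the latter is refreshed on every requeue, while the e2e latency is meant to span from the first enqueue to a successful bind. For reference, metrics.ExponentialBuckets(1, 2, 5) gives the attempts histogram buckets of 1, 2, 4, 8, and 16, and ExponentialBuckets(0.001, 2, 15) buckets the duration from 1ms doubling up to roughly 16s.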