Add per-pod metrics for scheduler.

This commit is contained in:
Cong Liu
2019-10-07 19:06:00 -04:00
parent 46dd075bab
commit 085852160a
8 changed files with 242 additions and 147 deletions

View File

@@ -73,14 +73,14 @@ type SchedulingQueue interface {
// AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
// The podSchedulingCycle represents the current scheduling cycle number which can be
// returned by calling SchedulingCycle().
AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error
AddUnschedulableIfNotPresent(pod *framework.PodInfo, podSchedulingCycle int64) error
// SchedulingCycle returns the current number of scheduling cycle which is
// cached by scheduling queue. Normally, incrementing this number whenever
// a pod is popped (e.g. called Pop()) is enough.
SchedulingCycle() int64
// Pop removes the head of the queue and returns it. It blocks if the
// queue is empty and waits until a new item is added to the queue.
Pop() (*v1.Pod, error)
Pop() (*framework.PodInfo, error)
Update(oldPod, newPod *v1.Pod) error
Delete(pod *v1.Pod) error
MoveAllToActiveQueue()
@@ -350,14 +350,16 @@ func (p *PriorityQueue) SchedulingCycle() int64 {
// the queue, unless it is already in the queue. Normally, PriorityQueue puts
// unschedulable pods in `unschedulableQ`. But if there has been a recent move
// request, then the pod is put in `podBackoffQ`.
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.PodInfo, podSchedulingCycle int64) error {
p.lock.Lock()
defer p.lock.Unlock()
pod := pInfo.Pod
if p.unschedulableQ.get(pod) != nil {
return fmt.Errorf("pod is already present in unschedulableQ")
}
pInfo := p.newPodInfo(pod)
// Refresh the timestamp since the pod is re-added.
pInfo.Timestamp = p.clock.Now()
if _, exists, _ := p.activeQ.Get(pInfo); exists {
return fmt.Errorf("pod is already present in the activeQ")
}
@@ -439,7 +441,7 @@ func (p *PriorityQueue) flushUnschedulableQLeftover() {
// Pop removes the head of the active queue and returns it. It blocks if the
// activeQ is empty and waits until a new item is added to the queue. It
// increments scheduling cycle when a pod is popped.
func (p *PriorityQueue) Pop() (*v1.Pod, error) {
func (p *PriorityQueue) Pop() (*framework.PodInfo, error) {
p.lock.Lock()
defer p.lock.Unlock()
for p.activeQ.Len() == 0 {
@@ -456,8 +458,9 @@ func (p *PriorityQueue) Pop() (*v1.Pod, error) {
return nil, err
}
pInfo := obj.(*framework.PodInfo)
pInfo.Attempts++
p.schedulingCycle++
return pInfo.Pod, err
return pInfo, err
}
// isPodUpdated checks if the pod is updated in a way that it may have become
@@ -486,19 +489,15 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
// If the pod is already in the active queue, just update it there.
if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
p.nominatedPods.update(oldPod, newPod)
newPodInfo := newPodInfoNoTimestamp(newPod)
newPodInfo.Timestamp = oldPodInfo.(*framework.PodInfo).Timestamp
err := p.activeQ.Update(newPodInfo)
err := p.activeQ.Update(updatePod(oldPodInfo, newPod))
return err
}
// If the pod is in the backoff queue, update it there.
if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists {
p.nominatedPods.update(oldPod, newPod)
p.podBackoffQ.Delete(newPodInfoNoTimestamp(oldPod))
newPodInfo := newPodInfoNoTimestamp(newPod)
newPodInfo.Timestamp = oldPodInfo.(*framework.PodInfo).Timestamp
err := p.activeQ.Add(newPodInfo)
p.podBackoffQ.Delete(oldPodInfo)
err := p.activeQ.Add(updatePod(oldPodInfo, newPod))
if err == nil {
p.cond.Broadcast()
}
@@ -509,20 +508,18 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
// If the pod is in the unschedulable queue, updating it may make it schedulable.
if usPodInfo := p.unschedulableQ.get(newPod); usPodInfo != nil {
p.nominatedPods.update(oldPod, newPod)
newPodInfo := newPodInfoNoTimestamp(newPod)
newPodInfo.Timestamp = usPodInfo.Timestamp
if isPodUpdated(oldPod, newPod) {
// If the pod is updated reset backoff
p.clearPodBackoff(newPod)
p.unschedulableQ.delete(usPodInfo.Pod)
err := p.activeQ.Add(newPodInfo)
err := p.activeQ.Add(updatePod(usPodInfo, newPod))
if err == nil {
p.cond.Broadcast()
}
return err
}
// Pod is already in unschedulable queue and hasnt updated, no need to backoff again
p.unschedulableQ.addOrUpdate(newPodInfo)
p.unschedulableQ.addOrUpdate(updatePod(usPodInfo, newPod))
return nil
}
// If pod is not in any of the queues, we put it in the active queue.
@@ -718,18 +715,20 @@ func (p *PriorityQueue) NumUnschedulablePods() int {
// newPodInfo builds a PodInfo object.
func (p *PriorityQueue) newPodInfo(pod *v1.Pod) *framework.PodInfo {
if p.clock == nil {
return &framework.PodInfo{
Pod: pod,
}
}
now := p.clock.Now()
return &framework.PodInfo{
Pod: pod,
Timestamp: p.clock.Now(),
Pod: pod,
Timestamp: now,
InitialAttemptTimestamp: now,
}
}
func updatePod(oldPodInfo interface{}, newPod *v1.Pod) *framework.PodInfo {
pInfo := oldPodInfo.(*framework.PodInfo)
pInfo.Pod = newPod
return pInfo
}
// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
// is used to implement unschedulableQ.
type UnschedulablePodsMap struct {
@@ -876,12 +875,12 @@ func newNominatedPodMap() *nominatedPodMap {
// MakeNextPodFunc returns a function to retrieve the next pod from a given
// scheduling queue
func MakeNextPodFunc(queue SchedulingQueue) func() *v1.Pod {
return func() *v1.Pod {
pod, err := queue.Pop()
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.PodInfo {
return func() *framework.PodInfo {
podInfo, err := queue.Pop()
if err == nil {
klog.V(4).Infof("About to try and schedule pod %v/%v", pod.Namespace, pod.Name)
return pod
klog.V(4).Infof("About to try and schedule pod %v/%v", podInfo.Pod.Namespace, podInfo.Pod.Name)
return podInfo
}
klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
return nil