not update timestamp for each scheduling attempt

wangqingcan
2019-02-22 09:46:18 +08:00
parent bf20886f4e
commit ea9e1a4118
3 changed files with 355 additions and 163 deletions


@@ -260,15 +260,29 @@ func podTimestamp(pod *v1.Pod) *metav1.Time {
return &condition.LastProbeTime
}
// podInfo is the minimum cell in the scheduling queue.
type podInfo struct {
pod *v1.Pod
// The time pod added to the scheduling queue.
timestamp time.Time
}
// newPodInfoNoTimestamp builds a podInfo object without timestamp.
func newPodInfoNoTimestamp(pod *v1.Pod) *podInfo {
return &podInfo{
pod: pod,
}
}
// activeQComp is the function used by the activeQ heap algorithm to sort pods.
// It sorts pods based on their priority. When priorities are equal, it uses
// podTimestamp.
func activeQComp(pod1, pod2 interface{}) bool {
p1 := pod1.(*v1.Pod)
p2 := pod2.(*v1.Pod)
prio1 := util.GetPodPriority(p1)
prio2 := util.GetPodPriority(p2)
return (prio1 > prio2) || (prio1 == prio2 && podTimestamp(p1).Before(podTimestamp(p2)))
// podInfo.timestamp.
func activeQComp(podInfo1, podInfo2 interface{}) bool {
pInfo1 := podInfo1.(*podInfo)
pInfo2 := podInfo2.(*podInfo)
prio1 := util.GetPodPriority(pInfo1.pod)
prio2 := util.GetPodPriority(pInfo2.pod)
return (prio1 > prio2) || (prio1 == prio2 && pInfo1.timestamp.Before(pInfo2.timestamp))
}
// NewPriorityQueue creates a PriorityQueue object.
@@ -282,13 +296,13 @@ func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *Priority
clock: clock,
stop: stop,
podBackoff: util.CreatePodBackoffWithClock(1*time.Second, 10*time.Second, clock),
activeQ: util.NewHeap(cache.MetaNamespaceKeyFunc, activeQComp),
unschedulableQ: newUnschedulablePodsMap(),
activeQ: util.NewHeap(podInfoKeyFunc, activeQComp),
unschedulableQ: newUnschedulablePodsMap(clock),
nominatedPods: newNominatedPodMap(),
moveRequestCycle: -1,
}
pq.cond.L = &pq.lock
pq.podBackoffQ = util.NewHeap(cache.MetaNamespaceKeyFunc, pq.podsCompareBackoffCompleted)
pq.podBackoffQ = util.NewHeap(podInfoKeyFunc, pq.podsCompareBackoffCompleted)
pq.run()
@@ -306,7 +320,8 @@ func (p *PriorityQueue) run() {
func (p *PriorityQueue) Add(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
if err := p.activeQ.Add(pod); err != nil {
pInfo := p.newPodInfo(pod)
if err := p.activeQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
return err
}
@@ -315,7 +330,7 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
p.unschedulableQ.delete(pod)
}
// Delete pod from backoffQ if it is backing off
if err := p.podBackoffQ.Delete(pod); err == nil {
if err := p.podBackoffQ.Delete(pInfo); err == nil {
klog.Errorf("Error: pod %v/%v is already in the podBackoff queue.", pod.Namespace, pod.Name)
}
p.nominatedPods.add(pod, "")
@@ -332,13 +347,15 @@ func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error {
if p.unschedulableQ.get(pod) != nil {
return nil
}
if _, exists, _ := p.activeQ.Get(pod); exists {
pInfo := p.newPodInfo(pod)
if _, exists, _ := p.activeQ.Get(pInfo); exists {
return nil
}
if _, exists, _ := p.podBackoffQ.Get(pod); exists {
if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
return nil
}
err := p.activeQ.Add(pod)
err := p.activeQ.Add(pInfo)
if err != nil {
klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
} else {
@@ -405,22 +422,24 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingC
if p.unschedulableQ.get(pod) != nil {
return fmt.Errorf("pod is already present in unschedulableQ")
}
if _, exists, _ := p.activeQ.Get(pod); exists {
pInfo := p.newPodInfo(pod)
if _, exists, _ := p.activeQ.Get(pInfo); exists {
return fmt.Errorf("pod is already present in the activeQ")
}
if _, exists, _ := p.podBackoffQ.Get(pod); exists {
if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
return fmt.Errorf("pod is already present in the backoffQ")
}
if podSchedulingCycle > p.moveRequestCycle && isPodUnschedulable(pod) {
p.backoffPod(pod)
p.unschedulableQ.addOrUpdate(pod)
p.unschedulableQ.addOrUpdate(pInfo)
p.nominatedPods.add(pod, "")
return nil
}
// If a move request has been received and the pod is subject to backoff, move it to the BackoffQ.
if p.isPodBackingOff(pod) && isPodUnschedulable(pod) {
err := p.podBackoffQ.Add(pod)
err := p.podBackoffQ.Add(pInfo)
if err != nil {
klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
} else {
@@ -429,7 +448,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingC
return err
}
err := p.activeQ.Add(pod)
err := p.activeQ.Add(pInfo)
if err == nil {
p.nominatedPods.add(pod, "")
p.cond.Broadcast()
@@ -443,16 +462,16 @@ func (p *PriorityQueue) flushBackoffQCompleted() {
defer p.lock.Unlock()
for {
rawPod := p.podBackoffQ.Peek()
if rawPod == nil {
rawPodInfo := p.podBackoffQ.Peek()
if rawPodInfo == nil {
return
}
pod := rawPod.(*v1.Pod)
pod := rawPodInfo.(*podInfo).pod
boTime, found := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
if !found {
klog.Errorf("Unable to find backoff value for pod %v in backoffQ", nsNameForPod(pod))
p.podBackoffQ.Pop()
p.activeQ.Add(pod)
p.activeQ.Add(rawPodInfo)
defer p.cond.Broadcast()
continue
}
@@ -465,7 +484,7 @@ func (p *PriorityQueue) flushBackoffQCompleted() {
klog.Errorf("Unable to pop pod %v from backoffQ despite backoff completion.", nsNameForPod(pod))
return
}
p.activeQ.Add(pod)
p.activeQ.Add(rawPodInfo)
defer p.cond.Broadcast()
}
}
@@ -476,12 +495,12 @@ func (p *PriorityQueue) flushUnschedulableQLeftover() {
p.lock.Lock()
defer p.lock.Unlock()
var podsToMove []*v1.Pod
var podsToMove []*podInfo
currentTime := p.clock.Now()
for _, pod := range p.unschedulableQ.pods {
lastScheduleTime := podTimestamp(pod)
if !lastScheduleTime.IsZero() && currentTime.Sub(lastScheduleTime.Time) > unschedulableQTimeInterval {
podsToMove = append(podsToMove, pod)
for _, pInfo := range p.unschedulableQ.podInfoMap {
lastScheduleTime := pInfo.timestamp
if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
podsToMove = append(podsToMove, pInfo)
}
}
@@ -509,9 +528,9 @@ func (p *PriorityQueue) Pop() (*v1.Pod, error) {
if err != nil {
return nil, err
}
pod := obj.(*v1.Pod)
pInfo := obj.(*podInfo)
p.schedulingCycle++
return pod, err
return pInfo.pod, err
}
// isPodUpdated checks if the pod is updated in a way that it may have become
@@ -536,18 +555,23 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
defer p.lock.Unlock()
if oldPod != nil {
oldPodInfo := newPodInfoNoTimestamp(oldPod)
// If the pod is already in the active queue, just update it there.
if _, exists, _ := p.activeQ.Get(oldPod); exists {
if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
p.nominatedPods.update(oldPod, newPod)
err := p.activeQ.Update(newPod)
newPodInfo := newPodInfoNoTimestamp(newPod)
newPodInfo.timestamp = oldPodInfo.(*podInfo).timestamp
err := p.activeQ.Update(newPodInfo)
return err
}
// If the pod is in the backoff queue, update it there.
if _, exists, _ := p.podBackoffQ.Get(oldPod); exists {
if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists {
p.nominatedPods.update(oldPod, newPod)
p.podBackoffQ.Delete(newPod)
err := p.activeQ.Add(newPod)
p.podBackoffQ.Delete(newPodInfoNoTimestamp(oldPod))
newPodInfo := newPodInfoNoTimestamp(newPod)
newPodInfo.timestamp = oldPodInfo.(*podInfo).timestamp
err := p.activeQ.Add(newPodInfo)
if err == nil {
p.cond.Broadcast()
}
@@ -556,24 +580,26 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
}
// If the pod is in the unschedulable queue, updating it may make it schedulable.
if usPod := p.unschedulableQ.get(newPod); usPod != nil {
if usPodInfo := p.unschedulableQ.get(newPod); usPodInfo != nil {
p.nominatedPods.update(oldPod, newPod)
newPodInfo := newPodInfoNoTimestamp(newPod)
newPodInfo.timestamp = usPodInfo.timestamp
if isPodUpdated(oldPod, newPod) {
// If the pod is updated, reset the backoff
p.clearPodBackoff(newPod)
p.unschedulableQ.delete(usPod)
err := p.activeQ.Add(newPod)
p.unschedulableQ.delete(usPodInfo.pod)
err := p.activeQ.Add(newPodInfo)
if err == nil {
p.cond.Broadcast()
}
return err
}
// Pod is already in unschedulable queue and hasn't updated, no need to backoff again
p.unschedulableQ.addOrUpdate(newPod)
p.unschedulableQ.addOrUpdate(newPodInfo)
return nil
}
// If pod is not in any of the queues, we put it in the active queue.
err := p.activeQ.Add(newPod)
err := p.activeQ.Add(p.newPodInfo(newPod))
if err == nil {
p.nominatedPods.add(newPod, "")
p.cond.Broadcast()
@@ -587,10 +613,10 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
p.nominatedPods.delete(pod)
err := p.activeQ.Delete(pod)
err := p.activeQ.Delete(newPodInfoNoTimestamp(pod))
if err != nil { // The item was probably not found in the activeQ.
p.clearPodBackoff(pod)
p.podBackoffQ.Delete(pod)
p.podBackoffQ.Delete(newPodInfoNoTimestamp(pod))
p.unschedulableQ.delete(pod)
}
return nil
@@ -619,13 +645,14 @@ func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
func (p *PriorityQueue) MoveAllToActiveQueue() {
p.lock.Lock()
defer p.lock.Unlock()
for _, pod := range p.unschedulableQ.pods {
for _, pInfo := range p.unschedulableQ.podInfoMap {
pod := pInfo.pod
if p.isPodBackingOff(pod) {
if err := p.podBackoffQ.Add(pod); err != nil {
if err := p.podBackoffQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
}
} else {
if err := p.activeQ.Add(pod); err != nil {
if err := p.activeQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
}
}
@@ -636,14 +663,15 @@ func (p *PriorityQueue) MoveAllToActiveQueue() {
}
// NOTE: this function assumes lock has been acquired in caller
func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
for _, pod := range pods {
func (p *PriorityQueue) movePodsToActiveQueue(podInfoList []*podInfo) {
for _, pInfo := range podInfoList {
pod := pInfo.pod
if p.isPodBackingOff(pod) {
if err := p.podBackoffQ.Add(pod); err != nil {
if err := p.podBackoffQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
}
} else {
if err := p.activeQ.Add(pod); err != nil {
if err := p.activeQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
}
}
@@ -656,9 +684,10 @@ func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
// getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have
// any affinity term that matches "pod".
// NOTE: this function assumes lock has been acquired in caller.
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*v1.Pod {
var podsToMove []*v1.Pod
for _, up := range p.unschedulableQ.pods {
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*podInfo {
var podsToMove []*podInfo
for _, pInfo := range p.unschedulableQ.podInfoMap {
up := pInfo.pod
affinity := up.Spec.Affinity
if affinity != nil && affinity.PodAffinity != nil {
terms := predicates.GetPodAffinityTerms(affinity.PodAffinity)
@@ -669,7 +698,7 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod
klog.Errorf("Error getting label selectors for pod: %v.", up.Name)
}
if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
podsToMove = append(podsToMove, up)
podsToMove = append(podsToMove, pInfo)
break
}
}
@@ -692,16 +721,15 @@ func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*v1.Pod {
func (p *PriorityQueue) PendingPods() []*v1.Pod {
p.lock.Lock()
defer p.lock.Unlock()
result := []*v1.Pod{}
for _, pod := range p.activeQ.List() {
result = append(result, pod.(*v1.Pod))
for _, pInfo := range p.activeQ.List() {
result = append(result, pInfo.(*podInfo).pod)
}
for _, pod := range p.podBackoffQ.List() {
result = append(result, pod.(*v1.Pod))
for _, pInfo := range p.podBackoffQ.List() {
result = append(result, pInfo.(*podInfo).pod)
}
for _, pod := range p.unschedulableQ.pods {
result = append(result, pod)
for _, pInfo := range p.unschedulableQ.podInfoMap {
result = append(result, pInfo.pod)
}
return result
}
@@ -731,9 +759,11 @@ func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)
p.lock.Unlock()
}
func (p *PriorityQueue) podsCompareBackoffCompleted(p1, p2 interface{}) bool {
bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(p1.(*v1.Pod)))
bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(p2.(*v1.Pod)))
func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
pInfo1 := podInfo1.(*podInfo)
pInfo2 := podInfo2.(*podInfo)
bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo1.pod))
bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo2.pod))
return bo1.Before(bo2)
}
@@ -741,47 +771,61 @@ func (p *PriorityQueue) podsCompareBackoffCompleted(p1, p2 interface{}) bool {
func (p *PriorityQueue) NumUnschedulablePods() int {
p.lock.RLock()
defer p.lock.RUnlock()
return len(p.unschedulableQ.pods)
return len(p.unschedulableQ.podInfoMap)
}
// newPodInfo builds a podInfo object.
func (p *PriorityQueue) newPodInfo(pod *v1.Pod) *podInfo {
if p.clock == nil {
return &podInfo{
pod: pod,
}
}
return &podInfo{
pod: pod,
timestamp: p.clock.Now(),
}
}
// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
// is used to implement unschedulableQ.
type UnschedulablePodsMap struct {
// pods is a map keyed by a pod's full-name and the value is a pointer to the pod.
pods map[string]*v1.Pod
keyFunc func(*v1.Pod) string
// podInfoMap is a map keyed by a pod's full-name and the value is a pointer to the podInfo.
podInfoMap map[string]*podInfo
keyFunc func(*v1.Pod) string
}
// Add adds a pod to the unschedulable pods.
func (u *UnschedulablePodsMap) addOrUpdate(pod *v1.Pod) {
u.pods[u.keyFunc(pod)] = pod
// Add adds a pod to the unschedulable podInfoMap.
func (u *UnschedulablePodsMap) addOrUpdate(pInfo *podInfo) {
u.podInfoMap[u.keyFunc(pInfo.pod)] = pInfo
}
// Delete deletes a pod from the unschedulable pods.
// Delete deletes a pod from the unschedulable podInfoMap.
func (u *UnschedulablePodsMap) delete(pod *v1.Pod) {
delete(u.pods, u.keyFunc(pod))
delete(u.podInfoMap, u.keyFunc(pod))
}
// Get returns the pod if a pod with the same key as the key of the given "pod"
// Get returns the podInfo if a pod with the same key as the key of the given "pod"
// is found in the map. It returns nil otherwise.
func (u *UnschedulablePodsMap) get(pod *v1.Pod) *v1.Pod {
func (u *UnschedulablePodsMap) get(pod *v1.Pod) *podInfo {
podKey := u.keyFunc(pod)
if p, exists := u.pods[podKey]; exists {
return p
if pInfo, exists := u.podInfoMap[podKey]; exists {
return pInfo
}
return nil
}
// Clear removes all the entries from the unschedulable maps.
// Clear removes all the entries from the unschedulable podInfoMap.
func (u *UnschedulablePodsMap) clear() {
u.pods = make(map[string]*v1.Pod)
u.podInfoMap = make(map[string]*podInfo)
}
// newUnschedulablePodsMap initializes a new object of UnschedulablePodsMap.
func newUnschedulablePodsMap() *UnschedulablePodsMap {
func newUnschedulablePodsMap(clock util.Clock) *UnschedulablePodsMap {
return &UnschedulablePodsMap{
pods: make(map[string]*v1.Pod),
keyFunc: util.GetPodFullName,
podInfoMap: make(map[string]*podInfo),
keyFunc: util.GetPodFullName,
}
}
@@ -872,3 +916,7 @@ func MakeNextPodFunc(queue SchedulingQueue) func() *v1.Pod {
return nil
}
}
func podInfoKeyFunc(obj interface{}) (string, error) {
return cache.MetaNamespaceKeyFunc(obj.(*podInfo).pod)
}
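
The behavior this change targets, keeping a pod's original enqueue time when the pod is updated so that its place among equal-priority pods is not reset, can be illustrated with a minimal, self-contained Go sketch. It uses simplified stand-in types rather than the scheduler's own *v1.Pod and util.Heap; only the podInfo shape and the activeQComp ordering from the diff are mirrored.

package main

import (
	"fmt"
	"sort"
	"time"
)

// Simplified stand-in for a pod; only the fields relevant to queue ordering are kept.
type pod struct {
	name     string
	priority int32
}

// podInfo mirrors the new queue cell from the diff: the pod plus the time it
// was first added to the scheduling queue.
type podInfo struct {
	pod       *pod
	timestamp time.Time
}

// less reproduces activeQComp: higher priority first; for equal priorities,
// the pod that has been queued longer comes first.
func less(a, b *podInfo) bool {
	return a.pod.priority > b.pod.priority ||
		(a.pod.priority == b.pod.priority && a.timestamp.Before(b.timestamp))
}

func main() {
	now := time.Now()
	older := &podInfo{pod: &pod{name: "older", priority: 5}, timestamp: now.Add(-30 * time.Second)}
	newer := &podInfo{pod: &pod{name: "newer", priority: 5}, timestamp: now}

	// Simulate Update(): the replacement podInfo copies the existing entry's
	// timestamp, so an updated pod keeps its place among equal-priority peers.
	updated := &podInfo{pod: &pod{name: "older-updated", priority: 5}, timestamp: older.timestamp}

	queue := []*podInfo{newer, updated}
	sort.Slice(queue, func(i, j int) bool { return less(queue[i], queue[j]) })
	for _, pi := range queue {
		fmt.Println(pi.pod.name) // prints "older-updated" before "newer"
	}
}

In the diff above, the same idea appears in Update(), which copies the timestamp from the existing podInfo instead of stamping a new one, and in activeQComp, which now orders equal-priority pods by podInfo.timestamp rather than by the condition-based podTimestamp.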