Merge pull request #60062 from bsalamat/sched_q_imprv

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Improve scheduling queue's logic

**What this PR does / why we need it**:
Improves scheduling queue's code based on some recent comments on [the original PR](https://github.com/kubernetes/kubernetes/pull/55109).
This PR does not fix any bugs or make any change of behavior.

**Release note**:

```release-note
NONE
```

/sig scheduling
This commit is contained in:
Kubernetes Submit Queue 2018-02-20 20:00:25 -08:00 committed by GitHub
commit fe4b28cdf0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -380,10 +380,10 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
p.deleteNominatedPodIfExists(pod)
if _, exists, _ := p.activeQ.Get(pod); exists {
return p.activeQ.Delete(pod)
err := p.activeQ.Delete(pod)
if err != nil { // The item was probably not found in the activeQ.
p.unschedulableQ.Delete(pod)
}
p.unschedulableQ.Delete(pod)
return nil
}
@ -410,11 +410,11 @@ func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
func (p *PriorityQueue) MoveAllToActiveQueue() {
p.lock.Lock()
defer p.lock.Unlock()
var unschedulablePods []interface{}
for _, pod := range p.unschedulableQ.pods {
unschedulablePods = append(unschedulablePods, pod)
if err := p.activeQ.Add(pod); err != nil {
glog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
}
}
p.activeQ.BulkAdd(unschedulablePods)
p.unschedulableQ.Clear()
p.receivedMoveRequest = true
p.cond.Broadcast()
@ -424,8 +424,11 @@ func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
p.lock.Lock()
defer p.lock.Unlock()
for _, pod := range pods {
p.activeQ.Add(pod)
p.unschedulableQ.Delete(pod)
if err := p.activeQ.Add(pod); err == nil {
p.unschedulableQ.Delete(pod)
} else {
glog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
}
}
p.receivedMoveRequest = true
p.cond.Broadcast()
@ -449,6 +452,7 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod
}
if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
podsToMove = append(podsToMove, up)
break
}
}
}
@ -645,23 +649,6 @@ func (h *Heap) Add(obj interface{}) error {
return nil
}
// BulkAdd adds all the items in the list to the queue.
func (h *Heap) BulkAdd(list []interface{}) error {
for _, obj := range list {
key, err := h.data.keyFunc(obj)
if err != nil {
return cache.KeyError{Obj: obj, Err: err}
}
if _, exists := h.data.items[key]; exists {
h.data.items[key].obj = obj
heap.Fix(h.data, h.data.items[key].index)
} else {
heap.Push(h.data, &itemKeyValue{key, obj})
}
}
return nil
}
// AddIfNotPresent inserts an item, and puts it in the queue. If an item with
// the key is present in the map, no changes is made to the item.
func (h *Heap) AddIfNotPresent(obj interface{}) error {