Merge pull request #60156 from bsalamat/sched_q_imprv

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Minor improvements to scheduling queue

**What this PR does / why we need it**:
Just minor improvements to the code of scheduling_queue.go. It shouldn't change the logic/behavior.

```release-note
NONE
```

/sig scheduling
This commit is contained in:
Kubernetes Submit Queue 2018-02-21 22:04:39 -08:00 committed by GitHub
commit 8112e3a5b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 62 deletions

View File

@ -144,18 +144,6 @@ func NominatedNodeName(pod *v1.Pod) string {
return pod.Status.NominatedNodeName return pod.Status.NominatedNodeName
} }
// UnschedulablePods is an interface for a queue that is used to keep unschedulable
// pods. These pods are not actively reevaluated for scheduling. They are moved
// to the active scheduling queue on certain events, such as termination of a pod
// in the cluster, addition of nodes, etc.
type UnschedulablePods interface {
Add(pod *v1.Pod)
Delete(pod *v1.Pod)
Update(pod *v1.Pod)
Get(pod *v1.Pod) *v1.Pod
Clear()
}
// PriorityQueue implements a scheduling queue. It is an alternative to FIFO. // PriorityQueue implements a scheduling queue. It is an alternative to FIFO.
// The head of PriorityQueue is the highest priority pending pod. This structure // The head of PriorityQueue is the highest priority pending pod. This structure
// has two sub queues. One sub-queue holds pods that are being considered for // has two sub queues. One sub-queue holds pods that are being considered for
@ -245,10 +233,10 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
if err != nil { if err != nil {
glog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err) glog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
} else { } else {
if p.unschedulableQ.Get(pod) != nil { if p.unschedulableQ.get(pod) != nil {
glog.Errorf("Error: pod %v is already in the unschedulable queue.", pod.Name) glog.Errorf("Error: pod %v is already in the unschedulable queue.", pod.Name)
p.deleteNominatedPodIfExists(pod) p.deleteNominatedPodIfExists(pod)
p.unschedulableQ.Delete(pod) p.unschedulableQ.delete(pod)
} }
p.addNominatedPodIfNeeded(pod) p.addNominatedPodIfNeeded(pod)
p.cond.Broadcast() p.cond.Broadcast()
@ -261,7 +249,7 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error { func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
if p.unschedulableQ.Get(pod) != nil { if p.unschedulableQ.get(pod) != nil {
return nil return nil
} }
if _, exists, _ := p.activeQ.Get(pod); exists { if _, exists, _ := p.activeQ.Get(pod); exists {
@ -288,14 +276,14 @@ func isPodUnschedulable(pod *v1.Pod) bool {
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error { func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
if p.unschedulableQ.Get(pod) != nil { if p.unschedulableQ.get(pod) != nil {
return fmt.Errorf("pod is already present in unschedulableQ") return fmt.Errorf("pod is already present in unschedulableQ")
} }
if _, exists, _ := p.activeQ.Get(pod); exists { if _, exists, _ := p.activeQ.Get(pod); exists {
return fmt.Errorf("pod is already present in the activeQ") return fmt.Errorf("pod is already present in the activeQ")
} }
if !p.receivedMoveRequest && isPodUnschedulable(pod) { if !p.receivedMoveRequest && isPodUnschedulable(pod) {
p.unschedulableQ.Add(pod) p.unschedulableQ.addOrUpdate(pod)
p.addNominatedPodIfNeeded(pod) p.addNominatedPodIfNeeded(pod)
return nil return nil
} }
@ -352,17 +340,17 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
return err return err
} }
// If the pod is in the unschedulable queue, updating it may make it schedulable. // If the pod is in the unschedulable queue, updating it may make it schedulable.
if usPod := p.unschedulableQ.Get(newPod); usPod != nil { if usPod := p.unschedulableQ.get(newPod); usPod != nil {
p.updateNominatedPod(oldPod, newPod) p.updateNominatedPod(oldPod, newPod)
if isPodUpdated(oldPod, newPod) { if isPodUpdated(oldPod, newPod) {
p.unschedulableQ.Delete(usPod) p.unschedulableQ.delete(usPod)
err := p.activeQ.Add(newPod) err := p.activeQ.Add(newPod)
if err == nil { if err == nil {
p.cond.Broadcast() p.cond.Broadcast()
} }
return err return err
} }
p.unschedulableQ.Update(newPod) p.unschedulableQ.addOrUpdate(newPod)
return nil return nil
} }
// If pod is not in any of the two queue, we put it in the active queue. // If pod is not in any of the two queue, we put it in the active queue.
@ -382,7 +370,7 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
p.deleteNominatedPodIfExists(pod) p.deleteNominatedPodIfExists(pod)
err := p.activeQ.Delete(pod) err := p.activeQ.Delete(pod)
if err != nil { // The item was probably not found in the activeQ. if err != nil { // The item was probably not found in the activeQ.
p.unschedulableQ.Delete(pod) p.unschedulableQ.delete(pod)
} }
return nil return nil
} }
@ -415,7 +403,7 @@ func (p *PriorityQueue) MoveAllToActiveQueue() {
glog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err) glog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
} }
} }
p.unschedulableQ.Clear() p.unschedulableQ.clear()
p.receivedMoveRequest = true p.receivedMoveRequest = true
p.cond.Broadcast() p.cond.Broadcast()
} }
@ -425,7 +413,7 @@ func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
defer p.lock.Unlock() defer p.lock.Unlock()
for _, pod := range pods { for _, pod := range pods {
if err := p.activeQ.Add(pod); err == nil { if err := p.activeQ.Add(pod); err == nil {
p.unschedulableQ.Delete(pod) p.unschedulableQ.delete(pod)
} else { } else {
glog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err) glog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
} }
@ -439,7 +427,7 @@ func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*v1.Pod { func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*v1.Pod {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
podsToMove := []*v1.Pod{} var podsToMove []*v1.Pod
for _, up := range p.unschedulableQ.pods { for _, up := range p.unschedulableQ.pods {
affinity := up.Spec.Affinity affinity := up.Spec.Affinity
if affinity != nil && affinity.PodAffinity != nil { if affinity != nil && affinity.PodAffinity != nil {
@ -480,38 +468,19 @@ type UnschedulablePodsMap struct {
keyFunc func(*v1.Pod) string keyFunc func(*v1.Pod) string
} }
var _ = UnschedulablePods(&UnschedulablePodsMap{})
// Add adds a pod to the unschedulable pods. // Add adds a pod to the unschedulable pods.
func (u *UnschedulablePodsMap) Add(pod *v1.Pod) { func (u *UnschedulablePodsMap) addOrUpdate(pod *v1.Pod) {
podKey := u.keyFunc(pod) u.pods[u.keyFunc(pod)] = pod
if _, exists := u.pods[podKey]; !exists {
u.pods[podKey] = pod
}
} }
// Delete deletes a pod from the unschedulable pods. // Delete deletes a pod from the unschedulable pods.
func (u *UnschedulablePodsMap) Delete(pod *v1.Pod) { func (u *UnschedulablePodsMap) delete(pod *v1.Pod) {
podKey := u.keyFunc(pod) delete(u.pods, u.keyFunc(pod))
if _, exists := u.pods[podKey]; exists {
delete(u.pods, podKey)
}
}
// Update updates a pod in the unschedulable pods.
func (u *UnschedulablePodsMap) Update(pod *v1.Pod) {
podKey := u.keyFunc(pod)
_, exists := u.pods[podKey]
if !exists {
u.Add(pod)
return
}
u.pods[podKey] = pod
} }
// Get returns the pod if a pod with the same key as the key of the given "pod" // Get returns the pod if a pod with the same key as the key of the given "pod"
// is found in the map. It returns nil otherwise. // is found in the map. It returns nil otherwise.
func (u *UnschedulablePodsMap) Get(pod *v1.Pod) *v1.Pod { func (u *UnschedulablePodsMap) get(pod *v1.Pod) *v1.Pod {
podKey := u.keyFunc(pod) podKey := u.keyFunc(pod)
if p, exists := u.pods[podKey]; exists { if p, exists := u.pods[podKey]; exists {
return p return p
@ -520,7 +489,7 @@ func (u *UnschedulablePodsMap) Get(pod *v1.Pod) *v1.Pod {
} }
// Clear removes all the entries from the unschedulable maps. // Clear removes all the entries from the unschedulable maps.
func (u *UnschedulablePodsMap) Clear() { func (u *UnschedulablePodsMap) clear() {
u.pods = make(map[string]*v1.Pod) u.pods = make(map[string]*v1.Pod)
} }

View File

@ -117,7 +117,7 @@ func TestPriorityQueue_Add(t *testing.T) {
func TestPriorityQueue_AddIfNotPresent(t *testing.T) { func TestPriorityQueue_AddIfNotPresent(t *testing.T) {
q := NewPriorityQueue() q := NewPriorityQueue()
q.unschedulableQ.Add(&highPriNominatedPod) q.unschedulableQ.addOrUpdate(&highPriNominatedPod)
q.AddIfNotPresent(&highPriNominatedPod) // Must not add anything. q.AddIfNotPresent(&highPriNominatedPod) // Must not add anything.
q.AddIfNotPresent(&medPriorityPod) q.AddIfNotPresent(&medPriorityPod)
q.AddIfNotPresent(&unschedulablePod) q.AddIfNotPresent(&unschedulablePod)
@ -136,7 +136,7 @@ func TestPriorityQueue_AddIfNotPresent(t *testing.T) {
if len(q.nominatedPods) != 0 { if len(q.nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods) t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
} }
if q.unschedulableQ.Get(&highPriNominatedPod) != &highPriNominatedPod { if q.unschedulableQ.get(&highPriNominatedPod) != &highPriNominatedPod {
t.Errorf("Pod %v was not found in the unschedulableQ.", highPriNominatedPod.Name) t.Errorf("Pod %v was not found in the unschedulableQ.", highPriNominatedPod.Name)
} }
} }
@ -162,7 +162,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
if len(q.nominatedPods) != 1 { if len(q.nominatedPods) != 1 {
t.Errorf("Expected nomindatePods to have one element: %v", q.nominatedPods) t.Errorf("Expected nomindatePods to have one element: %v", q.nominatedPods)
} }
if q.unschedulableQ.Get(&unschedulablePod) != &unschedulablePod { if q.unschedulableQ.get(&unschedulablePod) != &unschedulablePod {
t.Errorf("Pod %v was not found in the unschedulableQ.", unschedulablePod.Name) t.Errorf("Pod %v was not found in the unschedulableQ.", unschedulablePod.Name)
} }
} }
@ -243,8 +243,8 @@ func TestPriorityQueue_Delete(t *testing.T) {
func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) { func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) {
q := NewPriorityQueue() q := NewPriorityQueue()
q.Add(&medPriorityPod) q.Add(&medPriorityPod)
q.unschedulableQ.Add(&unschedulablePod) q.unschedulableQ.addOrUpdate(&unschedulablePod)
q.unschedulableQ.Add(&highPriorityPod) q.unschedulableQ.addOrUpdate(&highPriorityPod)
q.MoveAllToActiveQueue() q.MoveAllToActiveQueue()
if q.activeQ.data.Len() != 3 { if q.activeQ.data.Len() != 3 {
t.Error("Expected all items to be in activeQ.") t.Error("Expected all items to be in activeQ.")
@ -290,19 +290,19 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
q := NewPriorityQueue() q := NewPriorityQueue()
q.Add(&medPriorityPod) q.Add(&medPriorityPod)
// Add a couple of pods to the unschedulableQ. // Add a couple of pods to the unschedulableQ.
q.unschedulableQ.Add(&unschedulablePod) q.unschedulableQ.addOrUpdate(&unschedulablePod)
q.unschedulableQ.Add(affinityPod) q.unschedulableQ.addOrUpdate(affinityPod)
// Simulate addition of an assigned pod. The pod has matching labels for // Simulate addition of an assigned pod. The pod has matching labels for
// affinityPod. So, affinityPod should go to activeQ. // affinityPod. So, affinityPod should go to activeQ.
q.AssignedPodAdded(&labelPod) q.AssignedPodAdded(&labelPod)
if q.unschedulableQ.Get(affinityPod) != nil { if q.unschedulableQ.get(affinityPod) != nil {
t.Error("affinityPod is still in the unschedulableQ.") t.Error("affinityPod is still in the unschedulableQ.")
} }
if _, exists, _ := q.activeQ.Get(affinityPod); !exists { if _, exists, _ := q.activeQ.Get(affinityPod); !exists {
t.Error("affinityPod is not moved to activeQ.") t.Error("affinityPod is not moved to activeQ.")
} }
// Check that the other pod is still in the unschedulableQ. // Check that the other pod is still in the unschedulableQ.
if q.unschedulableQ.Get(&unschedulablePod) == nil { if q.unschedulableQ.get(&unschedulablePod) == nil {
t.Error("unschedulablePod is not in the unschedulableQ.") t.Error("unschedulablePod is not in the unschedulableQ.")
} }
} }
@ -438,7 +438,7 @@ func TestUnschedulablePodsMap(t *testing.T) {
for i, test := range tests { for i, test := range tests {
upm := newUnschedulablePodsMap() upm := newUnschedulablePodsMap()
for _, p := range test.podsToAdd { for _, p := range test.podsToAdd {
upm.Add(p) upm.addOrUpdate(p)
} }
if !reflect.DeepEqual(upm.pods, test.expectedMapAfterAdd) { if !reflect.DeepEqual(upm.pods, test.expectedMapAfterAdd) {
t.Errorf("#%d: Unexpected map after adding pods. Expected: %v, got: %v", t.Errorf("#%d: Unexpected map after adding pods. Expected: %v, got: %v",
@ -447,7 +447,7 @@ func TestUnschedulablePodsMap(t *testing.T) {
if len(test.podsToUpdate) > 0 { if len(test.podsToUpdate) > 0 {
for _, p := range test.podsToUpdate { for _, p := range test.podsToUpdate {
upm.Update(p) upm.addOrUpdate(p)
} }
if !reflect.DeepEqual(upm.pods, test.expectedMapAfterUpdate) { if !reflect.DeepEqual(upm.pods, test.expectedMapAfterUpdate) {
t.Errorf("#%d: Unexpected map after updating pods. Expected: %v, got: %v", t.Errorf("#%d: Unexpected map after updating pods. Expected: %v, got: %v",
@ -455,13 +455,13 @@ func TestUnschedulablePodsMap(t *testing.T) {
} }
} }
for _, p := range test.podsToDelete { for _, p := range test.podsToDelete {
upm.Delete(p) upm.delete(p)
} }
if !reflect.DeepEqual(upm.pods, test.expectedMapAfterDelete) { if !reflect.DeepEqual(upm.pods, test.expectedMapAfterDelete) {
t.Errorf("#%d: Unexpected map after deleting pods. Expected: %v, got: %v", t.Errorf("#%d: Unexpected map after deleting pods. Expected: %v, got: %v",
i, test.expectedMapAfterDelete, upm.pods) i, test.expectedMapAfterDelete, upm.pods)
} }
upm.Clear() upm.clear()
if len(upm.pods) != 0 { if len(upm.pods) != 0 {
t.Errorf("Expected the map to be empty, but has %v elements.", len(upm.pods)) t.Errorf("Expected the map to be empty, but has %v elements.", len(upm.pods))
} }