diff --git a/pkg/scheduler/core/scheduling_queue.go b/pkg/scheduler/core/scheduling_queue.go index 2c17a5443ee..9145416658b 100644 --- a/pkg/scheduler/core/scheduling_queue.go +++ b/pkg/scheduler/core/scheduling_queue.go @@ -144,18 +144,6 @@ func NominatedNodeName(pod *v1.Pod) string { return pod.Status.NominatedNodeName } -// UnschedulablePods is an interface for a queue that is used to keep unschedulable -// pods. These pods are not actively reevaluated for scheduling. They are moved -// to the active scheduling queue on certain events, such as termination of a pod -// in the cluster, addition of nodes, etc. -type UnschedulablePods interface { - Add(pod *v1.Pod) - Delete(pod *v1.Pod) - Update(pod *v1.Pod) - Get(pod *v1.Pod) *v1.Pod - Clear() -} - // PriorityQueue implements a scheduling queue. It is an alternative to FIFO. // The head of PriorityQueue is the highest priority pending pod. This structure // has two sub queues. One sub-queue holds pods that are being considered for @@ -245,10 +233,10 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error { if err != nil { glog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err) } else { - if p.unschedulableQ.Get(pod) != nil { + if p.unschedulableQ.get(pod) != nil { glog.Errorf("Error: pod %v is already in the unschedulable queue.", pod.Name) p.deleteNominatedPodIfExists(pod) - p.unschedulableQ.Delete(pod) + p.unschedulableQ.delete(pod) } p.addNominatedPodIfNeeded(pod) p.cond.Broadcast() @@ -261,7 +249,7 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error { func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error { p.lock.Lock() defer p.lock.Unlock() - if p.unschedulableQ.Get(pod) != nil { + if p.unschedulableQ.get(pod) != nil { return nil } if _, exists, _ := p.activeQ.Get(pod); exists { @@ -288,14 +276,14 @@ func isPodUnschedulable(pod *v1.Pod) bool { func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error { p.lock.Lock() defer p.lock.Unlock() - if p.unschedulableQ.Get(pod) != nil { + if p.unschedulableQ.get(pod) != nil { return fmt.Errorf("pod is already present in unschedulableQ") } if _, exists, _ := p.activeQ.Get(pod); exists { return fmt.Errorf("pod is already present in the activeQ") } if !p.receivedMoveRequest && isPodUnschedulable(pod) { - p.unschedulableQ.Add(pod) + p.unschedulableQ.addOrUpdate(pod) p.addNominatedPodIfNeeded(pod) return nil } @@ -352,17 +340,17 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { return err } // If the pod is in the unschedulable queue, updating it may make it schedulable. - if usPod := p.unschedulableQ.Get(newPod); usPod != nil { + if usPod := p.unschedulableQ.get(newPod); usPod != nil { p.updateNominatedPod(oldPod, newPod) if isPodUpdated(oldPod, newPod) { - p.unschedulableQ.Delete(usPod) + p.unschedulableQ.delete(usPod) err := p.activeQ.Add(newPod) if err == nil { p.cond.Broadcast() } return err } - p.unschedulableQ.Update(newPod) + p.unschedulableQ.addOrUpdate(newPod) return nil } // If pod is not in any of the two queue, we put it in the active queue. @@ -382,7 +370,7 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error { p.deleteNominatedPodIfExists(pod) err := p.activeQ.Delete(pod) if err != nil { // The item was probably not found in the activeQ. - p.unschedulableQ.Delete(pod) + p.unschedulableQ.delete(pod) } return nil } @@ -415,7 +403,7 @@ func (p *PriorityQueue) MoveAllToActiveQueue() { glog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err) } } - p.unschedulableQ.Clear() + p.unschedulableQ.clear() p.receivedMoveRequest = true p.cond.Broadcast() } @@ -425,7 +413,7 @@ func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) { defer p.lock.Unlock() for _, pod := range pods { if err := p.activeQ.Add(pod); err == nil { - p.unschedulableQ.Delete(pod) + p.unschedulableQ.delete(pod) } else { glog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err) } @@ -439,7 +427,7 @@ func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) { func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*v1.Pod { p.lock.RLock() defer p.lock.RUnlock() - podsToMove := []*v1.Pod{} + var podsToMove []*v1.Pod for _, up := range p.unschedulableQ.pods { affinity := up.Spec.Affinity if affinity != nil && affinity.PodAffinity != nil { @@ -480,38 +468,19 @@ type UnschedulablePodsMap struct { keyFunc func(*v1.Pod) string } -var _ = UnschedulablePods(&UnschedulablePodsMap{}) - // Add adds a pod to the unschedulable pods. -func (u *UnschedulablePodsMap) Add(pod *v1.Pod) { - podKey := u.keyFunc(pod) - if _, exists := u.pods[podKey]; !exists { - u.pods[podKey] = pod - } +func (u *UnschedulablePodsMap) addOrUpdate(pod *v1.Pod) { + u.pods[u.keyFunc(pod)] = pod } // Delete deletes a pod from the unschedulable pods. -func (u *UnschedulablePodsMap) Delete(pod *v1.Pod) { - podKey := u.keyFunc(pod) - if _, exists := u.pods[podKey]; exists { - delete(u.pods, podKey) - } -} - -// Update updates a pod in the unschedulable pods. -func (u *UnschedulablePodsMap) Update(pod *v1.Pod) { - podKey := u.keyFunc(pod) - _, exists := u.pods[podKey] - if !exists { - u.Add(pod) - return - } - u.pods[podKey] = pod +func (u *UnschedulablePodsMap) delete(pod *v1.Pod) { + delete(u.pods, u.keyFunc(pod)) } // Get returns the pod if a pod with the same key as the key of the given "pod" // is found in the map. It returns nil otherwise. -func (u *UnschedulablePodsMap) Get(pod *v1.Pod) *v1.Pod { +func (u *UnschedulablePodsMap) get(pod *v1.Pod) *v1.Pod { podKey := u.keyFunc(pod) if p, exists := u.pods[podKey]; exists { return p @@ -520,7 +489,7 @@ func (u *UnschedulablePodsMap) Get(pod *v1.Pod) *v1.Pod { } // Clear removes all the entries from the unschedulable maps. -func (u *UnschedulablePodsMap) Clear() { +func (u *UnschedulablePodsMap) clear() { u.pods = make(map[string]*v1.Pod) } diff --git a/pkg/scheduler/core/scheduling_queue_test.go b/pkg/scheduler/core/scheduling_queue_test.go index 05c293fde03..2c62963555d 100644 --- a/pkg/scheduler/core/scheduling_queue_test.go +++ b/pkg/scheduler/core/scheduling_queue_test.go @@ -117,7 +117,7 @@ func TestPriorityQueue_Add(t *testing.T) { func TestPriorityQueue_AddIfNotPresent(t *testing.T) { q := NewPriorityQueue() - q.unschedulableQ.Add(&highPriNominatedPod) + q.unschedulableQ.addOrUpdate(&highPriNominatedPod) q.AddIfNotPresent(&highPriNominatedPod) // Must not add anything. q.AddIfNotPresent(&medPriorityPod) q.AddIfNotPresent(&unschedulablePod) @@ -136,7 +136,7 @@ func TestPriorityQueue_AddIfNotPresent(t *testing.T) { if len(q.nominatedPods) != 0 { t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods) } - if q.unschedulableQ.Get(&highPriNominatedPod) != &highPriNominatedPod { + if q.unschedulableQ.get(&highPriNominatedPod) != &highPriNominatedPod { t.Errorf("Pod %v was not found in the unschedulableQ.", highPriNominatedPod.Name) } } @@ -162,7 +162,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { if len(q.nominatedPods) != 1 { t.Errorf("Expected nomindatePods to have one element: %v", q.nominatedPods) } - if q.unschedulableQ.Get(&unschedulablePod) != &unschedulablePod { + if q.unschedulableQ.get(&unschedulablePod) != &unschedulablePod { t.Errorf("Pod %v was not found in the unschedulableQ.", unschedulablePod.Name) } } @@ -243,8 +243,8 @@ func TestPriorityQueue_Delete(t *testing.T) { func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) { q := NewPriorityQueue() q.Add(&medPriorityPod) - q.unschedulableQ.Add(&unschedulablePod) - q.unschedulableQ.Add(&highPriorityPod) + q.unschedulableQ.addOrUpdate(&unschedulablePod) + q.unschedulableQ.addOrUpdate(&highPriorityPod) q.MoveAllToActiveQueue() if q.activeQ.data.Len() != 3 { t.Error("Expected all items to be in activeQ.") @@ -290,19 +290,19 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { q := NewPriorityQueue() q.Add(&medPriorityPod) // Add a couple of pods to the unschedulableQ. - q.unschedulableQ.Add(&unschedulablePod) - q.unschedulableQ.Add(affinityPod) + q.unschedulableQ.addOrUpdate(&unschedulablePod) + q.unschedulableQ.addOrUpdate(affinityPod) // Simulate addition of an assigned pod. The pod has matching labels for // affinityPod. So, affinityPod should go to activeQ. q.AssignedPodAdded(&labelPod) - if q.unschedulableQ.Get(affinityPod) != nil { + if q.unschedulableQ.get(affinityPod) != nil { t.Error("affinityPod is still in the unschedulableQ.") } if _, exists, _ := q.activeQ.Get(affinityPod); !exists { t.Error("affinityPod is not moved to activeQ.") } // Check that the other pod is still in the unschedulableQ. - if q.unschedulableQ.Get(&unschedulablePod) == nil { + if q.unschedulableQ.get(&unschedulablePod) == nil { t.Error("unschedulablePod is not in the unschedulableQ.") } } @@ -438,7 +438,7 @@ func TestUnschedulablePodsMap(t *testing.T) { for i, test := range tests { upm := newUnschedulablePodsMap() for _, p := range test.podsToAdd { - upm.Add(p) + upm.addOrUpdate(p) } if !reflect.DeepEqual(upm.pods, test.expectedMapAfterAdd) { t.Errorf("#%d: Unexpected map after adding pods. Expected: %v, got: %v", @@ -447,7 +447,7 @@ func TestUnschedulablePodsMap(t *testing.T) { if len(test.podsToUpdate) > 0 { for _, p := range test.podsToUpdate { - upm.Update(p) + upm.addOrUpdate(p) } if !reflect.DeepEqual(upm.pods, test.expectedMapAfterUpdate) { t.Errorf("#%d: Unexpected map after updating pods. Expected: %v, got: %v", @@ -455,13 +455,13 @@ func TestUnschedulablePodsMap(t *testing.T) { } } for _, p := range test.podsToDelete { - upm.Delete(p) + upm.delete(p) } if !reflect.DeepEqual(upm.pods, test.expectedMapAfterDelete) { t.Errorf("#%d: Unexpected map after deleting pods. Expected: %v, got: %v", i, test.expectedMapAfterDelete, upm.pods) } - upm.Clear() + upm.clear() if len(upm.pods) != 0 { t.Errorf("Expected the map to be empty, but has %v elements.", len(upm.pods)) }