ensure scheduler preemptor behaves in an efficient/correct path

- don't update nominatedMap cache when Pop() an element from activeQ
- instead, delete the nominated info from cache when it's "assumed"
- unit test behavior adjusted
- expose SchedulingQueue in factory.Config
This commit is contained in:
Wei Huang
2018-11-16 14:22:15 -08:00
parent 843a67b215
commit b4fd11512a
5 changed files with 40 additions and 18 deletions

View File

@@ -67,6 +67,8 @@ type SchedulingQueue interface {
// Close closes the SchedulingQueue so that the goroutine which is
// waiting to pop items can exit gracefully.
Close()
// DeleteNominatedPodIfExists deletes nominatedPod from internal cache
DeleteNominatedPodIfExists(pod *v1.Pod)
}
// NewSchedulingQueue initializes a new scheduling queue. If pod priority is
@@ -157,6 +159,9 @@ func (f *FIFO) Close() {
f.FIFO.Close()
}
// DeleteNominatedPodIfExists does nothing in FIFO.
func (f *FIFO) DeleteNominatedPodIfExists(pod *v1.Pod) {}
// NewFIFO creates a FIFO object.
func NewFIFO() *FIFO {
return &FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
@@ -219,7 +224,7 @@ func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) {
if len(nnn) > 0 {
for _, np := range p.nominatedPods[nnn] {
if np.UID == pod.UID {
klog.Errorf("Pod %v/%v already exists in the nominated map!", pod.Namespace, pod.Name)
klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", pod.Namespace, pod.Name)
return
}
}
@@ -228,6 +233,7 @@ func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) {
}
// deleteNominatedPodIfExists deletes a pod from the nominatedPods.
// NOTE: this function assumes lock has been acquired in caller.
func (p *PriorityQueue) deleteNominatedPodIfExists(pod *v1.Pod) {
nnn := NominatedNodeName(pod)
if len(nnn) > 0 {
@@ -342,7 +348,6 @@ func (p *PriorityQueue) Pop() (*v1.Pod, error) {
return nil, err
}
pod := obj.(*v1.Pod)
p.deleteNominatedPodIfExists(pod)
p.receivedMoveRequest = false
return pod, err
}
@@ -411,13 +416,17 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
// AssignedPodAdded is called when a bound pod is added. Creation of this pod
// may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) {
p.lock.Lock()
p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
p.lock.Unlock()
}
// AssignedPodUpdated is called when a bound pod is updated. Change of labels
// may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
p.lock.Lock()
p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
p.lock.Unlock()
}
// MoveAllToActiveQueue moves all pods from unschedulableQ to activeQ. This
@@ -441,9 +450,8 @@ func (p *PriorityQueue) MoveAllToActiveQueue() {
p.cond.Broadcast()
}
// NOTE: this function assumes lock has been acquired in caller
func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
p.lock.Lock()
defer p.lock.Unlock()
for _, pod := range pods {
if err := p.activeQ.Add(pod); err == nil {
p.unschedulableQ.delete(pod)
@@ -457,9 +465,8 @@ func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
// getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have
// any affinity term that matches "pod".
// NOTE: this function assumes lock has been acquired in caller.
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*v1.Pod {
p.lock.RLock()
defer p.lock.RUnlock()
var podsToMove []*v1.Pod
for _, up := range p.unschedulableQ.pods {
affinity := up.Spec.Affinity
@@ -516,6 +523,13 @@ func (p *PriorityQueue) Close() {
p.cond.Broadcast()
}
// DeleteNominatedPodIfExists deletes pod from internal cache if it's a nominatedPod
func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) {
p.lock.Lock()
p.deleteNominatedPodIfExists(pod)
p.lock.Unlock()
}
// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
// is used to implement unschedulableQ.
type UnschedulablePodsMap struct {