make scheduling queue start before the scheduler starts and stops after the scheduler stops

This commit is contained in:
Abdullah Gharaibeh
2019-12-02 14:35:09 -05:00
parent aa67744438
commit f388534082
8 changed files with 54 additions and 52 deletions

View File

@@ -97,11 +97,13 @@ type SchedulingQueue interface {
DeleteNominatedPodIfExists(pod *v1.Pod)
// NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
NumUnschedulablePods() int
// Run starts the goroutines managing the queue.
Run()
}
// NewSchedulingQueue initializes a priority queue as a new scheduling queue.
func NewSchedulingQueue(stop <-chan struct{}, fwk framework.Framework, opts ...Option) SchedulingQueue {
return NewPriorityQueue(stop, fwk, opts...)
func NewSchedulingQueue(fwk framework.Framework, opts ...Option) SchedulingQueue {
return NewPriorityQueue(fwk, opts...)
}
// NominatedNodeName returns nominated node name of a Pod.
@@ -117,7 +119,7 @@ func NominatedNodeName(pod *v1.Pod) string {
// is called unschedulableQ. The third queue holds pods that are moved from
// unschedulable queues and will be moved to active queue when backoff are completed.
type PriorityQueue struct {
stop <-chan struct{}
stop chan struct{}
clock util.Clock
// podBackoff tracks backoff for pods attempting to be rescheduled
podBackoff *PodBackoffMap
@@ -209,7 +211,6 @@ func activeQComp(podInfo1, podInfo2 interface{}) bool {
// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(
stop <-chan struct{},
fwk framework.Framework,
opts ...Option,
) *PriorityQueue {
@@ -232,7 +233,7 @@ func NewPriorityQueue(
pq := &PriorityQueue{
clock: options.clock,
stop: stop,
stop: make(chan struct{}),
podBackoff: NewPodBackoffMap(options.podInitialBackoffDuration, options.podMaxBackoffDuration),
activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
@@ -242,13 +243,11 @@ func NewPriorityQueue(
pq.cond.L = &pq.lock
pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
pq.run()
return pq
}
// run starts the goroutine to pump from podBackoffQ to activeQ
func (p *PriorityQueue) run() {
// Run starts the goroutine to pump from podBackoffQ to activeQ
func (p *PriorityQueue) Run() {
go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}
@@ -636,6 +635,7 @@ func (p *PriorityQueue) PendingPods() []*v1.Pod {
func (p *PriorityQueue) Close() {
p.lock.Lock()
defer p.lock.Unlock()
close(p.stop)
p.closed = true
p.cond.Broadcast()
}