From f3885340826f45ddafa515b84fa6136aad687281 Mon Sep 17 00:00:00 2001 From: Abdullah Gharaibeh Date: Mon, 2 Dec 2019 14:35:09 -0500 Subject: [PATCH] make scheduling queue start before the scheduler starts and stop after the scheduler stops --- pkg/scheduler/core/extender_test.go | 2 +- pkg/scheduler/core/generic_scheduler_test.go | 10 ++-- pkg/scheduler/factory.go | 6 -- pkg/scheduler/factory_test.go | 2 +- .../internal/queue/scheduling_queue.go | 18 +++--- .../internal/queue/scheduling_queue_test.go | 59 ++++++++++--------- pkg/scheduler/scheduler.go | 3 +- pkg/scheduler/scheduler_test.go | 6 +- 8 files changed, 54 insertions(+), 52 deletions(-) diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go index 1525c1c529a..f22af0f2407 100644 --- a/pkg/scheduler/core/extender_test.go +++ b/pkg/scheduler/core/extender_test.go @@ -539,7 +539,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { for _, name := range test.nodes { cache.AddNode(createNode(name)) } - queue := internalqueue.NewSchedulingQueue(nil, nil) + queue := internalqueue.NewSchedulingQueue(nil) scheduler := NewGenericScheduler( cache, queue, diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index becf8df04a3..c0779790a91 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -662,7 +662,7 @@ func TestGenericScheduler(t *testing.T) { } scheduler := NewGenericScheduler( cache, - internalqueue.NewSchedulingQueue(nil, nil), + internalqueue.NewSchedulingQueue(nil), test.predicates, predMetaProducer, test.prioritizers, @@ -702,7 +702,7 @@ func makeScheduler(predicates map[string]algorithmpredicates.FitPredicate, nodes s := NewGenericScheduler( cache, - internalqueue.NewSchedulingQueue(nil, nil), + internalqueue.NewSchedulingQueue(nil), predicates, algorithmpredicates.EmptyMetadataProducer, nil, @@ -819,7 +819,7 @@ func TestFindFitPredicateCallCounts(t 
*testing.T) { cache.AddNode(n) } - queue := internalqueue.NewSchedulingQueue(nil, nil) + queue := internalqueue.NewSchedulingQueue(nil) scheduler := NewGenericScheduler( cache, queue, @@ -1412,7 +1412,7 @@ func TestSelectNodesForPreemption(t *testing.T) { filterPlugin.failedNodeReturnCodeMap = filterFailedNodeReturnCodeMap scheduler := NewGenericScheduler( nil, - internalqueue.NewSchedulingQueue(nil, nil), + internalqueue.NewSchedulingQueue(nil), test.predicates, factory.GetPredicateMetadata, nil, @@ -2160,7 +2160,7 @@ func TestPreempt(t *testing.T) { } scheduler := NewGenericScheduler( cache, - internalqueue.NewSchedulingQueue(nil, nil), + internalqueue.NewSchedulingQueue(nil), map[string]algorithmpredicates.FitPredicate{"matches": predicate}, predMetaProducer, []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}}, diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index 934c131797c..acf68a651f7 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -266,7 +266,6 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e } podQueue := internalqueue.NewSchedulingQueue( - c.StopEverything, framework, internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second), internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second), @@ -281,11 +280,6 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e ) debugger.ListenForSignal(c.StopEverything) - go func() { - <-c.StopEverything - podQueue.Close() - }() - algo := core.NewGenericScheduler( c.schedulerCache, podQueue, diff --git a/pkg/scheduler/factory_test.go b/pkg/scheduler/factory_test.go index df2995ed829..881bd8411c9 100644 --- a/pkg/scheduler/factory_test.go +++ b/pkg/scheduler/factory_test.go @@ -319,7 +319,7 @@ func TestDefaultErrorFunc(t *testing.T) { defer close(stopCh) timestamp := time.Now() - queue := internalqueue.NewPriorityQueue(nil, nil, 
internalqueue.WithClock(clock.NewFakeClock(timestamp))) + queue := internalqueue.NewPriorityQueue(nil, internalqueue.WithClock(clock.NewFakeClock(timestamp))) schedulerCache := internalcache.New(30*time.Second, stopCh) errFunc := MakeDefaultErrorFunc(client, queue, schedulerCache) diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 74e25c1b0cd..de3a8cf1a6c 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -97,11 +97,13 @@ type SchedulingQueue interface { DeleteNominatedPodIfExists(pod *v1.Pod) // NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue. NumUnschedulablePods() int + // Run starts the goroutines managing the queue. + Run() } // NewSchedulingQueue initializes a priority queue as a new scheduling queue. -func NewSchedulingQueue(stop <-chan struct{}, fwk framework.Framework, opts ...Option) SchedulingQueue { - return NewPriorityQueue(stop, fwk, opts...) +func NewSchedulingQueue(fwk framework.Framework, opts ...Option) SchedulingQueue { + return NewPriorityQueue(fwk, opts...) } // NominatedNodeName returns nominated node name of a Pod. @@ -117,7 +119,7 @@ func NominatedNodeName(pod *v1.Pod) string { // is called unschedulableQ. The third queue holds pods that are moved from // unschedulable queues and will be moved to active queue when backoff are completed. type PriorityQueue struct { - stop <-chan struct{} + stop chan struct{} clock util.Clock // podBackoff tracks backoff for pods attempting to be rescheduled podBackoff *PodBackoffMap @@ -209,7 +211,6 @@ func activeQComp(podInfo1, podInfo2 interface{}) bool { // NewPriorityQueue creates a PriorityQueue object. 
func NewPriorityQueue( - stop <-chan struct{}, fwk framework.Framework, opts ...Option, ) *PriorityQueue { @@ -232,7 +233,7 @@ func NewPriorityQueue( pq := &PriorityQueue{ clock: options.clock, - stop: stop, + stop: make(chan struct{}), podBackoff: NewPodBackoffMap(options.podInitialBackoffDuration, options.podMaxBackoffDuration), activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()), unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()), @@ -242,13 +243,11 @@ func NewPriorityQueue( pq.cond.L = &pq.lock pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder()) - pq.run() - return pq } -// run starts the goroutine to pump from podBackoffQ to activeQ -func (p *PriorityQueue) run() { +// Run starts the goroutines that periodically flush pods from podBackoffQ and unschedulableQ to activeQ +func (p *PriorityQueue) Run() { go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop) go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop) } @@ -636,6 +635,7 @@ func (p *PriorityQueue) PendingPods() []*v1.Pod { func (p *PriorityQueue) Close() { p.lock.Lock() defer p.lock.Unlock() + close(p.stop) p.closed = true p.cond.Broadcast() } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index f5dd01604a9..aa634a96f0c 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -129,7 +129,7 @@ func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod { } func TestPriorityQueue_Add(t *testing.T) { - q := NewPriorityQueue(nil, nil) + q := createAndRunPriorityQueue(nil) if err := q.Add(&medPriorityPod); err != nil { t.Errorf("add failed: %v", err) } @@ -259,7 +259,7 @@ func (*fakeFramework) SnapshotSharedLister() schedulerlisters.SharedLister { } func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { - q := NewPriorityQueue(nil, 
&fakeFramework{}) + q := createAndRunPriorityQueue(&fakeFramework{}) if err := q.Add(&medPriorityPod); err != nil { t.Errorf("add failed: %v", err) } @@ -275,7 +275,7 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { } func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { - q := NewPriorityQueue(nil, nil) + q := createAndRunPriorityQueue(nil) q.Add(&highPriNominatedPod) q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&highPriNominatedPod), q.SchedulingCycle()) // Must not add anything. q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle()) @@ -307,7 +307,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { // Pods in and before current scheduling cycle will be put back to activeQueue // if we were trying to schedule them when we received move request. func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { - q := NewPriorityQueue(nil, nil, WithClock(clock.NewFakeClock(time.Now()))) + q := createAndRunPriorityQueue(nil, WithClock(clock.NewFakeClock(time.Now()))) totalNum := 10 expectedPods := make([]v1.Pod, 0, totalNum) for i := 0; i < totalNum; i++ { @@ -374,7 +374,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { } func TestPriorityQueue_Pop(t *testing.T) { - q := NewPriorityQueue(nil, nil) + q := createAndRunPriorityQueue(nil) wg := sync.WaitGroup{} wg.Add(1) go func() { @@ -391,7 +391,7 @@ func TestPriorityQueue_Pop(t *testing.T) { } func TestPriorityQueue_Update(t *testing.T) { - q := NewPriorityQueue(nil, nil) + q := createAndRunPriorityQueue(nil) q.Update(nil, &highPriorityPod) if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&highPriorityPod)); !exists { t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name) @@ -427,7 +427,7 @@ func TestPriorityQueue_Update(t *testing.T) { } func TestPriorityQueue_Delete(t *testing.T) { - q := NewPriorityQueue(nil, nil) + q := 
createAndRunPriorityQueue(nil) q.Update(&highPriorityPod, &highPriNominatedPod) q.Add(&unschedulablePod) if err := q.Delete(&highPriNominatedPod); err != nil { @@ -451,7 +451,7 @@ func TestPriorityQueue_Delete(t *testing.T) { } func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) { - q := NewPriorityQueue(nil, nil) + q := createAndRunPriorityQueue(nil) q.Add(&medPriorityPod) addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod)) addOrUpdateUnschedulablePod(q, q.newPodInfo(&highPriorityPod)) @@ -497,7 +497,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { Spec: v1.PodSpec{NodeName: "machine1"}, } - q := NewPriorityQueue(nil, nil) + q := createAndRunPriorityQueue(nil) q.Add(&medPriorityPod) // Add a couple of pods to the unschedulableQ. addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod)) @@ -518,7 +518,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { } func TestPriorityQueue_NominatedPodsForNode(t *testing.T) { - q := NewPriorityQueue(nil, nil) + q := createAndRunPriorityQueue(nil) q.Add(&medPriorityPod) q.Add(&unschedulablePod) q.Add(&highPriorityPod) @@ -543,7 +543,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) { return pendingSet } - q := NewPriorityQueue(nil, nil) + q := createAndRunPriorityQueue(nil) q.Add(&medPriorityPod) addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod)) addOrUpdateUnschedulablePod(q, q.newPodInfo(&highPriorityPod)) @@ -559,7 +559,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) { } func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { - q := NewPriorityQueue(nil, nil) + q := createAndRunPriorityQueue(nil) if err := q.Add(&medPriorityPod); err != nil { t.Errorf("add failed: %v", err) } @@ -628,8 +628,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { } func TestPriorityQueue_NewWithOptions(t *testing.T) { - q := NewPriorityQueue( - nil, + q := createAndRunPriorityQueue( nil, WithPodInitialBackoffDuration(2*time.Second), 
WithPodMaxBackoffDuration(20*time.Second), @@ -802,7 +801,7 @@ func TestSchedulingQueue_Close(t *testing.T) { }{ { name: "PriorityQueue close", - q: NewPriorityQueue(nil, nil), + q: createAndRunPriorityQueue(nil), expectedErr: fmt.Errorf(queueClosed), }, } @@ -831,7 +830,7 @@ func TestSchedulingQueue_Close(t *testing.T) { // ensures that an unschedulable pod does not block head of the queue when there // are frequent events that move pods to the active queue. func TestRecentlyTriedPodsGoBack(t *testing.T) { - q := NewPriorityQueue(nil, nil) + q := createAndRunPriorityQueue(nil) // Add a few pods to priority queue. for i := 0; i < 5; i++ { p := v1.Pod{ @@ -885,7 +884,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) { // This behavior ensures that an unschedulable pod does not block head of the queue when there // are frequent events that move pods to the active queue. func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { - q := NewPriorityQueue(nil, nil) + q := createAndRunPriorityQueue(nil) // Add an unschedulable pod to a priority queue. 
// This makes a situation that the pod was tried to schedule @@ -976,7 +975,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { // TestHighPriorityBackoff tests that a high priority pod does not block // other pods if it is unschedulable func TestHighPriorityBackoff(t *testing.T) { - q := NewPriorityQueue(nil, nil) + q := createAndRunPriorityQueue(nil) midPod := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -1039,7 +1038,7 @@ func TestHighPriorityBackoff(t *testing.T) { // TestHighPriorityFlushUnschedulableQLeftover tests that pods will be moved to // activeQ after one minutes if it is in unschedulableQ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) { - q := NewPriorityQueue(nil, nil) + q := createAndRunPriorityQueue(nil) midPod := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "test-midpod", @@ -1236,7 +1235,7 @@ func TestPodTimestamp(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - queue := NewPriorityQueue(nil, nil, WithClock(clock.NewFakeClock(timestamp))) + queue := createAndRunPriorityQueue(nil, WithClock(clock.NewFakeClock(timestamp))) var podInfoList []*framework.PodInfo for i, op := range test.operations { @@ -1403,7 +1402,7 @@ scheduler_pending_pods{queue="unschedulable"} 0 for _, test := range tests { t.Run(test.name, func(t *testing.T) { resetMetrics() - queue := NewPriorityQueue(nil, nil, WithClock(clock.NewFakeClock(timestamp))) + queue := createAndRunPriorityQueue(nil, WithClock(clock.NewFakeClock(timestamp))) for i, op := range test.operations { for _, pInfo := range test.operands[i] { op(queue, pInfo) @@ -1432,7 +1431,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) { // Case 1: A pod is created and scheduled after 1 attempt. The queue operations are // Add -> Pop. 
c := clock.NewFakeClock(timestamp) - queue := NewPriorityQueue(nil, nil, WithClock(c)) + queue := createAndRunPriorityQueue(nil, WithClock(c)) queue.Add(pod) pInfo, err := queue.Pop() if err != nil { @@ -1443,7 +1442,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) { // Case 2: A pod is created and scheduled after 2 attempts. The queue operations are // Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Pop. c = clock.NewFakeClock(timestamp) - queue = NewPriorityQueue(nil, nil, WithClock(c)) + queue = createAndRunPriorityQueue(nil, WithClock(c)) queue.Add(pod) pInfo, err = queue.Pop() if err != nil { @@ -1463,7 +1462,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) { // Case 3: Similar to case 2, but before the second pop, call update, the queue operations are // Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Update -> Pop. c = clock.NewFakeClock(timestamp) - queue = NewPriorityQueue(nil, nil, WithClock(c)) + queue = createAndRunPriorityQueue(nil, WithClock(c)) queue.Add(pod) pInfo, err = queue.Pop() if err != nil { @@ -1561,9 +1560,9 @@ func TestIncomingPodsMetrics(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { metrics.SchedulerQueueIncomingPods.Reset() - stop := make(chan struct{}) - close(stop) // Stop the periodic flush - queue := NewPriorityQueue(stop, nil, WithClock(clock.NewFakeClock(timestamp))) + queue := NewPriorityQueue(nil, WithClock(clock.NewFakeClock(timestamp))) + queue.Close() + queue.Run() for _, op := range test.operations { for _, pInfo := range pInfos { op(queue, pInfo) @@ -1586,3 +1585,9 @@ func checkPerPodSchedulingMetrics(name string, t *testing.T, pInfo *framework.Po t.Errorf("[%s] Pod initial schedule attempt timestamp unexpected, got %v, want %v", name, pInfo.InitialAttemptTimestamp, wantInitialAttemptTs) } } + +func createAndRunPriorityQueue(fwk framework.Framework, opts ...Option) *PriorityQueue { + q := NewPriorityQueue(fwk, opts...) 
+ q.Run() + return q +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 4e1d522145c..3d8afb6277c 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -420,8 +420,9 @@ func (sched *Scheduler) Run(ctx context.Context) { if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) { return } - + sched.SchedulingQueue.Run() wait.UntilWithContext(ctx, sched.scheduleOne, 0) + sched.SchedulingQueue.Close() } // recordFailedSchedulingEvent records an event for the pod that indicates the diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 5075710359a..855ad601a2c 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -646,7 +646,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, recorder events.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) { algo := core.NewGenericScheduler( scache, - internalqueue.NewSchedulingQueue(nil, nil), + internalqueue.NewSchedulingQueue(nil), predicateMap, predicates.EmptyMetadataProducer, []priorities.PriorityConfig{}, @@ -695,9 +695,10 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C } func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) { + queue := internalqueue.NewSchedulingQueue(nil) algo := core.NewGenericScheduler( scache, - internalqueue.NewSchedulingQueue(nil, nil), + queue, predicateMap, predicates.EmptyMetadataProducer, []priorities.PriorityConfig{}, @@ -740,6 +741,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc 
StopEverything: stop, Framework: emptyFramework, VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}), + SchedulingQueue: queue, } return sched, bindingChan