From be0db3f93d752ee157355e0fbde837ae5256ca76 Mon Sep 17 00:00:00 2001 From: Kensei Nakada Date: Tue, 4 Jul 2023 15:00:12 +0000 Subject: [PATCH] clean up the implementation around QueueingHintFn --- pkg/scheduler/framework/runtime/framework.go | 12 +------- pkg/scheduler/framework/types.go | 16 +++++++++- .../internal/queue/scheduling_queue.go | 29 ++++++++++++------- pkg/scheduler/scheduler.go | 8 ++++- pkg/scheduler/scheduler_test.go | 7 ++--- 5 files changed, 45 insertions(+), 27 deletions(-) diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 9986abf3850..a9b7e639a82 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -551,17 +551,7 @@ func (p *defaultEnqueueExtension) EventsToRegister() []framework.ClusterEventWit // because the returning values are used to register event handlers. // If we return the wildcard here, it won't affect the event handlers registered by the plugin // and some events may not be registered in the event handlers. - return []framework.ClusterEventWithHint{ - {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}}, - {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.All}}, - {Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.All}}, - {Event: framework.ClusterEvent{Resource: framework.CSIDriver, ActionType: framework.All}}, - {Event: framework.ClusterEvent{Resource: framework.CSIStorageCapacity, ActionType: framework.All}}, - {Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.All}}, - {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}}, - {Event: framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.All}}, - {Event: framework.ClusterEvent{Resource: framework.PodSchedulingContext, ActionType: framework.All}}, - } + return framework.UnrollWildCardResource() } func updatePluginList(pluginList interface{}, pluginSet config.PluginSet, pluginsMap map[string]framework.Plugin) error { diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index acfd16f120c..0026691177d 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -107,7 +107,7 @@ const ( QueueSkip QueueingHint = iota // QueueAfterBackoff implies that the Pod may be schedulable by the event, - // and worth retring the scheduling again after backoff. + // and worth retrying the scheduling again after backoff. QueueAfterBackoff // QueueImmediately is returned only when it is highly possible that the Pod gets scheduled in the next scheduling. @@ -147,6 +147,20 @@ func (ce ClusterEvent) IsWildCard() bool { return ce.Resource == WildCard && ce.ActionType == All } +func UnrollWildCardResource() []ClusterEventWithHint { + return []ClusterEventWithHint{ + {Event: ClusterEvent{Resource: Pod, ActionType: All}}, + {Event: ClusterEvent{Resource: Node, ActionType: All}}, + {Event: ClusterEvent{Resource: CSINode, ActionType: All}}, + {Event: ClusterEvent{Resource: CSIDriver, ActionType: All}}, + {Event: ClusterEvent{Resource: CSIStorageCapacity, ActionType: All}}, + {Event: ClusterEvent{Resource: PersistentVolume, ActionType: All}}, + {Event: ClusterEvent{Resource: PersistentVolumeClaim, ActionType: All}}, + {Event: ClusterEvent{Resource: StorageClass, ActionType: All}}, + {Event: ClusterEvent{Resource: PodSchedulingContext, ActionType: All}}, + } +} + // QueuedPodInfo is a Pod wrapper with additional information related to // the pod's status in the scheduling queue, such as the timestamp when // it's added to the queue. diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 55cc52c4c3a..b15cc1cd54a 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -389,18 +389,17 @@ func (p *PriorityQueue) isPodWorthRequeuing(logger klog.Logger, pInfo *framework continue } - h := hintfn.QueueingHintFn(pod, oldObj, newObj) - if h == framework.QueueSkip { + switch h := hintfn.QueueingHintFn(pod, oldObj, newObj); h { + case framework.QueueSkip: continue - } - - if h == framework.QueueImmediately { + case framework.QueueImmediately: return h + case framework.QueueAfterBackoff: + // replace queueHint with the returned value, + // but continue to other queueHintFn to check because other plugins may want to return QueueImmediately. + queueHint = h } - // replace queueHint with the returned value, - // but continue to other queueHintFn to check because other plugins may want to return QueueImmediately. - queueHint = h } } @@ -855,7 +854,7 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra } pod := pInfo.Pod - if p.isPodBackingoff(pInfo) && schedulingHint == framework.QueueAfterBackoff { + if schedulingHint == framework.QueueAfterBackoff && p.isPodBackingoff(pInfo) { if err := p.podBackoffQ.Add(pInfo); err != nil { logger.Error(err, "Error adding pod to the backoff queue, queue this Pod to unschedulable pod pool", "pod", klog.KObj(pod)) p.unschedulablePods.addOrUpdate(pInfo) @@ -866,10 +865,20 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra return backoffQ } - if added, _ := p.addToActiveQ(logger, pInfo); added { + // Reach here if schedulingHint is QueueImmediately, or schedulingHint is QueueAfterBackoff but the pod is not backing off. + + added, err := p.addToActiveQ(logger, pInfo) + if err != nil { + logger.Error(err, "Error adding pod to the active queue, queue this Pod to unschedulable pod pool", "pod", klog.KObj(pod)) + } + if added { metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc() return activeQ } + if pInfo.Gated { + // In case the pod is gated, the Pod is pushed back to unschedulable Pods pool in addToActiveQ. + return unschedulablePods + } p.unschedulablePods.addOrUpdate(pInfo) metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc() diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 9809d548cd2..010851aac16 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -366,9 +366,15 @@ var defaultQueueingHintFn = func(_ *v1.Pod, _, _ interface{}) framework.Queueing } func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.QueueingHintMap { - queueingHintMap := make(map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction) + queueingHintMap := make(internalqueue.QueueingHintMap) for _, e := range es { events := e.EventsToRegister() + + // Note: Rarely, a plugin implements EnqueueExtensions but returns nil. + // We treat it as: the plugin is not interested in any event, and hence pod failed by that plugin + // cannot be moved by any regular cluster event. + // So, we can just ignore such EventsToRegister here. + for _, event := range events { fn := event.QueueingHintFn if fn == nil { diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 46114b8ec48..c3a37a694b7 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -657,10 +657,9 @@ const ( func Test_buildQueueingHintMap(t *testing.T) { tests := []struct { - name string - plugins []framework.Plugin - want map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction - assertFn func(t *testing.T, got map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction) bool + name string + plugins []framework.Plugin + want map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction }{ { name: "no-op plugin",