clean up the implementation around QueueingHintFn

This commit is contained in:
Kensei Nakada 2023-07-04 15:00:12 +00:00
parent 2f563464bc
commit be0db3f93d
5 changed files with 45 additions and 27 deletions

View File

@ -551,17 +551,7 @@ func (p *defaultEnqueueExtension) EventsToRegister() []framework.ClusterEventWit
// because the returning values are used to register event handlers. // because the returning values are used to register event handlers.
// If we return the wildcard here, it won't affect the event handlers registered by the plugin // If we return the wildcard here, it won't affect the event handlers registered by the plugin
// and some events may not be registered in the event handlers. // and some events may not be registered in the event handlers.
return []framework.ClusterEventWithHint{ return framework.UnrollWildCardResource()
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}},
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.All}},
{Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.All}},
{Event: framework.ClusterEvent{Resource: framework.CSIDriver, ActionType: framework.All}},
{Event: framework.ClusterEvent{Resource: framework.CSIStorageCapacity, ActionType: framework.All}},
{Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.All}},
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}},
{Event: framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.All}},
{Event: framework.ClusterEvent{Resource: framework.PodSchedulingContext, ActionType: framework.All}},
}
} }
func updatePluginList(pluginList interface{}, pluginSet config.PluginSet, pluginsMap map[string]framework.Plugin) error { func updatePluginList(pluginList interface{}, pluginSet config.PluginSet, pluginsMap map[string]framework.Plugin) error {

View File

@ -107,7 +107,7 @@ const (
QueueSkip QueueingHint = iota QueueSkip QueueingHint = iota
// QueueAfterBackoff implies that the Pod may be schedulable by the event, // QueueAfterBackoff implies that the Pod may be schedulable by the event,
// and worth retring the scheduling again after backoff. // and worth retrying the scheduling again after backoff.
QueueAfterBackoff QueueAfterBackoff
// QueueImmediately is returned only when it is highly possible that the Pod gets scheduled in the next scheduling. // QueueImmediately is returned only when it is highly possible that the Pod gets scheduled in the next scheduling.
@ -147,6 +147,20 @@ func (ce ClusterEvent) IsWildCard() bool {
return ce.Resource == WildCard && ce.ActionType == All return ce.Resource == WildCard && ce.ActionType == All
} }
func UnrollWildCardResource() []ClusterEventWithHint {
return []ClusterEventWithHint{
{Event: ClusterEvent{Resource: Pod, ActionType: All}},
{Event: ClusterEvent{Resource: Node, ActionType: All}},
{Event: ClusterEvent{Resource: CSINode, ActionType: All}},
{Event: ClusterEvent{Resource: CSIDriver, ActionType: All}},
{Event: ClusterEvent{Resource: CSIStorageCapacity, ActionType: All}},
{Event: ClusterEvent{Resource: PersistentVolume, ActionType: All}},
{Event: ClusterEvent{Resource: PersistentVolumeClaim, ActionType: All}},
{Event: ClusterEvent{Resource: StorageClass, ActionType: All}},
{Event: ClusterEvent{Resource: PodSchedulingContext, ActionType: All}},
}
}
// QueuedPodInfo is a Pod wrapper with additional information related to // QueuedPodInfo is a Pod wrapper with additional information related to
// the pod's status in the scheduling queue, such as the timestamp when // the pod's status in the scheduling queue, such as the timestamp when
// it's added to the queue. // it's added to the queue.

View File

@ -389,18 +389,17 @@ func (p *PriorityQueue) isPodWorthRequeuing(logger klog.Logger, pInfo *framework
continue continue
} }
h := hintfn.QueueingHintFn(pod, oldObj, newObj) switch h := hintfn.QueueingHintFn(pod, oldObj, newObj); h {
if h == framework.QueueSkip { case framework.QueueSkip:
continue continue
} case framework.QueueImmediately:
if h == framework.QueueImmediately {
return h return h
case framework.QueueAfterBackoff:
// replace queueHint with the returned value,
// but continue to other queueHintFn to check because other plugins may want to return QueueImmediately.
queueHint = h
} }
// replace queueHint with the returned value,
// but continue to other queueHintFn to check because other plugins may want to return QueueImmediately.
queueHint = h
} }
} }
@ -855,7 +854,7 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra
} }
pod := pInfo.Pod pod := pInfo.Pod
if p.isPodBackingoff(pInfo) && schedulingHint == framework.QueueAfterBackoff { if schedulingHint == framework.QueueAfterBackoff && p.isPodBackingoff(pInfo) {
if err := p.podBackoffQ.Add(pInfo); err != nil { if err := p.podBackoffQ.Add(pInfo); err != nil {
logger.Error(err, "Error adding pod to the backoff queue, queue this Pod to unschedulable pod pool", "pod", klog.KObj(pod)) logger.Error(err, "Error adding pod to the backoff queue, queue this Pod to unschedulable pod pool", "pod", klog.KObj(pod))
p.unschedulablePods.addOrUpdate(pInfo) p.unschedulablePods.addOrUpdate(pInfo)
@ -866,10 +865,20 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra
return backoffQ return backoffQ
} }
if added, _ := p.addToActiveQ(logger, pInfo); added { // Reach here if schedulingHint is QueueImmediately, or schedulingHint is QueueAfterBackoff but the pod is not backing off.
added, err := p.addToActiveQ(logger, pInfo)
if err != nil {
logger.Error(err, "Error adding pod to the active queue, queue this Pod to unschedulable pod pool", "pod", klog.KObj(pod))
}
if added {
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc() metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
return activeQ return activeQ
} }
if pInfo.Gated {
// In case the pod is gated, the Pod is pushed back to unschedulable Pods pool in addToActiveQ.
return unschedulablePods
}
p.unschedulablePods.addOrUpdate(pInfo) p.unschedulablePods.addOrUpdate(pInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc() metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()

View File

@ -366,9 +366,15 @@ var defaultQueueingHintFn = func(_ *v1.Pod, _, _ interface{}) framework.Queueing
} }
func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.QueueingHintMap { func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.QueueingHintMap {
queueingHintMap := make(map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction) queueingHintMap := make(internalqueue.QueueingHintMap)
for _, e := range es { for _, e := range es {
events := e.EventsToRegister() events := e.EventsToRegister()
// Note: Rarely, a plugin implements EnqueueExtensions but returns nil.
// We treat it as: the plugin is not interested in any event, and hence pod failed by that plugin
// cannot be moved by any regular cluster event.
// So, we can just ignore such EventsToRegister here.
for _, event := range events { for _, event := range events {
fn := event.QueueingHintFn fn := event.QueueingHintFn
if fn == nil { if fn == nil {

View File

@ -657,10 +657,9 @@ const (
func Test_buildQueueingHintMap(t *testing.T) { func Test_buildQueueingHintMap(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
plugins []framework.Plugin plugins []framework.Plugin
want map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction want map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction
assertFn func(t *testing.T, got map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction) bool
}{ }{
{ {
name: "no-op plugin", name: "no-op plugin",