mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-07 11:13:48 +00:00
Merge pull request #119077 from sanposhiho/follow-up-hint
clean up the implementation around QueueingHintFn
This commit is contained in:
commit
aeed7da616
@ -569,17 +569,7 @@ func (p *defaultEnqueueExtension) EventsToRegister() []framework.ClusterEventWit
|
|||||||
// because the returning values are used to register event handlers.
|
// because the returning values are used to register event handlers.
|
||||||
// If we return the wildcard here, it won't affect the event handlers registered by the plugin
|
// If we return the wildcard here, it won't affect the event handlers registered by the plugin
|
||||||
// and some events may not be registered in the event handlers.
|
// and some events may not be registered in the event handlers.
|
||||||
return []framework.ClusterEventWithHint{
|
return framework.UnrollWildCardResource()
|
||||||
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}},
|
|
||||||
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.All}},
|
|
||||||
{Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.All}},
|
|
||||||
{Event: framework.ClusterEvent{Resource: framework.CSIDriver, ActionType: framework.All}},
|
|
||||||
{Event: framework.ClusterEvent{Resource: framework.CSIStorageCapacity, ActionType: framework.All}},
|
|
||||||
{Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.All}},
|
|
||||||
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}},
|
|
||||||
{Event: framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.All}},
|
|
||||||
{Event: framework.ClusterEvent{Resource: framework.PodSchedulingContext, ActionType: framework.All}},
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func updatePluginList(pluginList interface{}, pluginSet config.PluginSet, pluginsMap map[string]framework.Plugin) error {
|
func updatePluginList(pluginList interface{}, pluginSet config.PluginSet, pluginsMap map[string]framework.Plugin) error {
|
||||||
|
@ -107,7 +107,7 @@ const (
|
|||||||
QueueSkip QueueingHint = iota
|
QueueSkip QueueingHint = iota
|
||||||
|
|
||||||
// QueueAfterBackoff implies that the Pod may be schedulable by the event,
|
// QueueAfterBackoff implies that the Pod may be schedulable by the event,
|
||||||
// and worth retring the scheduling again after backoff.
|
// and worth retrying the scheduling again after backoff.
|
||||||
QueueAfterBackoff
|
QueueAfterBackoff
|
||||||
|
|
||||||
// QueueImmediately is returned only when it is highly possible that the Pod gets scheduled in the next scheduling.
|
// QueueImmediately is returned only when it is highly possible that the Pod gets scheduled in the next scheduling.
|
||||||
@ -147,6 +147,20 @@ func (ce ClusterEvent) IsWildCard() bool {
|
|||||||
return ce.Resource == WildCard && ce.ActionType == All
|
return ce.Resource == WildCard && ce.ActionType == All
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func UnrollWildCardResource() []ClusterEventWithHint {
|
||||||
|
return []ClusterEventWithHint{
|
||||||
|
{Event: ClusterEvent{Resource: Pod, ActionType: All}},
|
||||||
|
{Event: ClusterEvent{Resource: Node, ActionType: All}},
|
||||||
|
{Event: ClusterEvent{Resource: CSINode, ActionType: All}},
|
||||||
|
{Event: ClusterEvent{Resource: CSIDriver, ActionType: All}},
|
||||||
|
{Event: ClusterEvent{Resource: CSIStorageCapacity, ActionType: All}},
|
||||||
|
{Event: ClusterEvent{Resource: PersistentVolume, ActionType: All}},
|
||||||
|
{Event: ClusterEvent{Resource: PersistentVolumeClaim, ActionType: All}},
|
||||||
|
{Event: ClusterEvent{Resource: StorageClass, ActionType: All}},
|
||||||
|
{Event: ClusterEvent{Resource: PodSchedulingContext, ActionType: All}},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// QueuedPodInfo is a Pod wrapper with additional information related to
|
// QueuedPodInfo is a Pod wrapper with additional information related to
|
||||||
// the pod's status in the scheduling queue, such as the timestamp when
|
// the pod's status in the scheduling queue, such as the timestamp when
|
||||||
// it's added to the queue.
|
// it's added to the queue.
|
||||||
|
@ -389,18 +389,17 @@ func (p *PriorityQueue) isPodWorthRequeuing(logger klog.Logger, pInfo *framework
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
h := hintfn.QueueingHintFn(pod, oldObj, newObj)
|
switch h := hintfn.QueueingHintFn(pod, oldObj, newObj); h {
|
||||||
if h == framework.QueueSkip {
|
case framework.QueueSkip:
|
||||||
continue
|
continue
|
||||||
}
|
case framework.QueueImmediately:
|
||||||
|
|
||||||
if h == framework.QueueImmediately {
|
|
||||||
return h
|
return h
|
||||||
|
case framework.QueueAfterBackoff:
|
||||||
|
// replace queueHint with the returned value,
|
||||||
|
// but continue to other queueHintFn to check because other plugins may want to return QueueImmediately.
|
||||||
|
queueHint = h
|
||||||
}
|
}
|
||||||
|
|
||||||
// replace queueHint with the returned value,
|
|
||||||
// but continue to other queueHintFn to check because other plugins may want to return QueueImmediately.
|
|
||||||
queueHint = h
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -855,7 +854,7 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra
|
|||||||
}
|
}
|
||||||
|
|
||||||
pod := pInfo.Pod
|
pod := pInfo.Pod
|
||||||
if p.isPodBackingoff(pInfo) && schedulingHint == framework.QueueAfterBackoff {
|
if schedulingHint == framework.QueueAfterBackoff && p.isPodBackingoff(pInfo) {
|
||||||
if err := p.podBackoffQ.Add(pInfo); err != nil {
|
if err := p.podBackoffQ.Add(pInfo); err != nil {
|
||||||
logger.Error(err, "Error adding pod to the backoff queue, queue this Pod to unschedulable pod pool", "pod", klog.KObj(pod))
|
logger.Error(err, "Error adding pod to the backoff queue, queue this Pod to unschedulable pod pool", "pod", klog.KObj(pod))
|
||||||
p.unschedulablePods.addOrUpdate(pInfo)
|
p.unschedulablePods.addOrUpdate(pInfo)
|
||||||
@ -866,10 +865,20 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra
|
|||||||
return backoffQ
|
return backoffQ
|
||||||
}
|
}
|
||||||
|
|
||||||
if added, _ := p.addToActiveQ(logger, pInfo); added {
|
// Reach here if schedulingHint is QueueImmediately, or schedulingHint is QueueAfterBackoff but the pod is not backing off.
|
||||||
|
|
||||||
|
added, err := p.addToActiveQ(logger, pInfo)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(err, "Error adding pod to the active queue, queue this Pod to unschedulable pod pool", "pod", klog.KObj(pod))
|
||||||
|
}
|
||||||
|
if added {
|
||||||
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
|
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
|
||||||
return activeQ
|
return activeQ
|
||||||
}
|
}
|
||||||
|
if pInfo.Gated {
|
||||||
|
// In case the pod is gated, the Pod is pushed back to unschedulable Pods pool in addToActiveQ.
|
||||||
|
return unschedulablePods
|
||||||
|
}
|
||||||
|
|
||||||
p.unschedulablePods.addOrUpdate(pInfo)
|
p.unschedulablePods.addOrUpdate(pInfo)
|
||||||
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
|
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
|
||||||
|
@ -366,9 +366,15 @@ var defaultQueueingHintFn = func(_ *v1.Pod, _, _ interface{}) framework.Queueing
|
|||||||
}
|
}
|
||||||
|
|
||||||
func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.QueueingHintMap {
|
func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.QueueingHintMap {
|
||||||
queueingHintMap := make(map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction)
|
queueingHintMap := make(internalqueue.QueueingHintMap)
|
||||||
for _, e := range es {
|
for _, e := range es {
|
||||||
events := e.EventsToRegister()
|
events := e.EventsToRegister()
|
||||||
|
|
||||||
|
// Note: Rarely, a plugin implements EnqueueExtensions but returns nil.
|
||||||
|
// We treat it as: the plugin is not interested in any event, and hence pod failed by that plugin
|
||||||
|
// cannot be moved by any regular cluster event.
|
||||||
|
// So, we can just ignore such EventsToRegister here.
|
||||||
|
|
||||||
for _, event := range events {
|
for _, event := range events {
|
||||||
fn := event.QueueingHintFn
|
fn := event.QueueingHintFn
|
||||||
if fn == nil {
|
if fn == nil {
|
||||||
|
@ -657,10 +657,9 @@ const (
|
|||||||
|
|
||||||
func Test_buildQueueingHintMap(t *testing.T) {
|
func Test_buildQueueingHintMap(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
plugins []framework.Plugin
|
plugins []framework.Plugin
|
||||||
want map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction
|
want map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction
|
||||||
assertFn func(t *testing.T, got map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction) bool
|
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "no-op plugin",
|
name: "no-op plugin",
|
||||||
|
Loading…
Reference in New Issue
Block a user