mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-04 23:17:50 +00:00
feature(scheduling_queue): track events per Pods (#118438)
* feature(sscheduling_queue): track events per Pods * fix typos * record events in one slice and make each in-flight Pod to refer it * fix: use Pop() in test before AddUnschedulableIfNotPresent to register in-flight Pods * eliminate MakeNextPodFuncs * call Done inside the scheduling queue * fix comment * implement done() not to require lock in it * fix UTs * improve the receivedEvents implementation based on suggestions * call DonePod when we don't call AddUnschedulableIfNotPresent * fix UT * use queuehint to filter out events for in-flight Pods * fix based on suggestion from aldo * fix based on suggestion from Wei * rename lastEventBefore → previousEvent * fix based on suggestion * address comments from aldo * fix based on the suggestion from Abdullah * gate in-flight Pods logic by the SchedulingQueueHints feature gate
This commit is contained in:
@@ -27,6 +27,7 @@ limitations under the License.
|
||||
package queue
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
@@ -104,6 +105,9 @@ type SchedulingQueue interface {
|
||||
// Pop removes the head of the queue and returns it. It blocks if the
|
||||
// queue is empty and waits until a new item is added to the queue.
|
||||
Pop() (*framework.QueuedPodInfo, error)
|
||||
// Done must be called for pod returned by Pop. This allows the queue to
|
||||
// keep track of which pods are currently being processed.
|
||||
Done(types.UID)
|
||||
Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
|
||||
Delete(pod *v1.Pod) error
|
||||
// TODO(sanposhiho): move all PreEnqueueCkeck to Requeue and delete it from this parameter eventually.
|
||||
@@ -158,6 +162,13 @@ type PriorityQueue struct {
|
||||
|
||||
cond sync.Cond
|
||||
|
||||
// inFlightPods holds the UID of all pods which have been popped out for which Done
|
||||
// hasn't been called yet - in other words, all pods that are currently being
|
||||
// processed (being scheduled, in permit, or in the binding cycle).
|
||||
inFlightPods map[types.UID]inFlightPod
|
||||
// receivedEvents holds the events received by the scheduling queue.
|
||||
receivedEvents *list.List
|
||||
|
||||
// activeQ is heap structure that scheduler actively looks at to find pods to
|
||||
// schedule. Head of heap is the highest priority pod.
|
||||
activeQ *heap.Heap
|
||||
@@ -173,6 +184,7 @@ type PriorityQueue struct {
|
||||
// received a move request. Unschedulable pods in and before this scheduling
|
||||
// cycle will be put back to activeQueue if we were trying to schedule them
|
||||
// when we received move request.
|
||||
// TODO: this will be removed after SchedulingQueueHint goes to stable and the feature gate is removed.
|
||||
moveRequestCycle int64
|
||||
|
||||
// preEnqueuePluginMap is keyed with profile name, valued with registered preEnqueue plugins.
|
||||
@@ -189,6 +201,9 @@ type PriorityQueue struct {
|
||||
metricsRecorder metrics.MetricAsyncRecorder
|
||||
// pluginMetricsSamplePercent is the percentage of plugin metrics to be sampled.
|
||||
pluginMetricsSamplePercent int
|
||||
|
||||
// isSchedulingQueueHintEnabled indicates whether the feature gate for the scheduling queue is enabled.
|
||||
isSchedulingQueueHintEnabled bool
|
||||
}
|
||||
|
||||
// QueueingHintFunction is the wrapper of QueueingHintFn that has PluginName.
|
||||
@@ -197,6 +212,25 @@ type QueueingHintFunction struct {
|
||||
QueueingHintFn framework.QueueingHintFn
|
||||
}
|
||||
|
||||
type inFlightPod struct {
|
||||
// previousEvent is the latest observed event when the pod is popped.
|
||||
previousEvent *list.Element
|
||||
}
|
||||
|
||||
// clusterEvent has the event and involved objects.
|
||||
type clusterEvent struct {
|
||||
event framework.ClusterEvent
|
||||
// oldObj is the object that involved this event.
|
||||
oldObj interface{}
|
||||
// newObj is the object that involved this event.
|
||||
newObj interface{}
|
||||
|
||||
// inFlightPodsNum is the counter of pods referring to this cluster event.
|
||||
// It is initialized with the number of Pods being scheduled when the event is received,
|
||||
// and is decremented when the scheduling for those Pods are Done().
|
||||
inFlightPodsNum int
|
||||
}
|
||||
|
||||
type priorityQueueOptions struct {
|
||||
clock clock.Clock
|
||||
podInitialBackoffDuration time.Duration
|
||||
@@ -330,11 +364,14 @@ func NewPriorityQueue(
|
||||
podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
|
||||
activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
|
||||
unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),
|
||||
moveRequestCycle: -1,
|
||||
inFlightPods: make(map[types.UID]inFlightPod),
|
||||
receivedEvents: list.New(),
|
||||
preEnqueuePluginMap: options.preEnqueuePluginMap,
|
||||
queueingHintMap: options.queueingHintMap,
|
||||
metricsRecorder: options.metricsRecorder,
|
||||
pluginMetricsSamplePercent: options.pluginMetricsSamplePercent,
|
||||
moveRequestCycle: -1,
|
||||
isSchedulingQueueHintEnabled: utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints),
|
||||
}
|
||||
pq.cond.L = &pq.lock
|
||||
pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
|
||||
@@ -567,25 +604,67 @@ func (p *PriorityQueue) SchedulingCycle() int64 {
|
||||
return p.schedulingCycle
|
||||
}
|
||||
|
||||
// AddUnschedulableIfNotPresent inserts a pod that cannot be scheduled into
|
||||
// determineSchedulingHintForInFlightPod looks at the unschedulable plugins of the given Pod
|
||||
// and determines the scheduling hint for this Pod while checking the events that happened during in-flight.
|
||||
func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger, pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) framework.QueueingHint {
|
||||
if len(pInfo.UnschedulablePlugins) == 0 {
|
||||
// When there is no unschedulable plugin, we cannot have a guess which event makes this Pod schedulable.
|
||||
// Here, we use the latest requestCycle so that this Pod won't be stuck in the unschedulable pod pool for a long time.
|
||||
if p.receivedEvents.Len() != 0 {
|
||||
return framework.QueueAfterBackoff
|
||||
}
|
||||
return framework.QueueSkip
|
||||
}
|
||||
|
||||
inFlightPod, ok := p.inFlightPods[pInfo.Pod.UID]
|
||||
if !ok {
|
||||
// It shouldn't reach here unless there is a bug somewhere.
|
||||
// But, set podSchedulingCycle to moveRequestCycle
|
||||
// so that this Pod won't stuck in the unschedulable pod pool.
|
||||
logger.Error(nil, "In flight Pod isn't found in the scheduling queue. If you see this error log, it's likely a bug in the scheduler.", "pod", klog.KObj(pInfo.Pod))
|
||||
return framework.QueueAfterBackoff
|
||||
}
|
||||
|
||||
// AddUnschedulableIfNotPresent is called with the Pod at the end of scheduling or binding.
|
||||
// So, given pInfo should have been Pop()ed before,
|
||||
// we can assume pInfo must be recorded in inFlightPods.
|
||||
// check if there is an event that makes this Pod schedulable based on pInfo.UnschedulablePlugins.
|
||||
event := p.receivedEvents.Front()
|
||||
if inFlightPod.previousEvent != nil {
|
||||
// only check events that happened after the Pod was popped.
|
||||
event = inFlightPod.previousEvent.Next()
|
||||
}
|
||||
schedulingHint := framework.QueueSkip
|
||||
for ; event != nil; event = event.Next() {
|
||||
e := event.Value.(*clusterEvent)
|
||||
|
||||
hint := p.isPodWorthRequeuing(logger, pInfo, e.event, e.oldObj, e.newObj)
|
||||
if hint == framework.QueueSkip {
|
||||
continue
|
||||
}
|
||||
|
||||
if hint == framework.QueueImmediately {
|
||||
// QueueImmediately is the strongest opinion, we don't need to check other events.
|
||||
schedulingHint = framework.QueueImmediately
|
||||
break
|
||||
}
|
||||
if hint == framework.QueueAfterBackoff {
|
||||
// replace schedulingHint with QueueAfterBackoff,
|
||||
// but continue to check other events because we may find it QueueImmediately with other events.
|
||||
schedulingHint = framework.QueueAfterBackoff
|
||||
}
|
||||
}
|
||||
return schedulingHint
|
||||
}
|
||||
|
||||
// addUnschedulableIfNotPresentWithoutQueueingHint inserts a pod that cannot be scheduled into
|
||||
// the queue, unless it is already in the queue. Normally, PriorityQueue puts
|
||||
// unschedulable pods in `unschedulablePods`. But if there has been a recent move
|
||||
// request, then the pod is put in `podBackoffQ`.
|
||||
func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
// TODO: This function is called only when p.isSchedulingQueueHintEnabled is false,
|
||||
// and this will be removed after SchedulingQueueHint goes to stable and the feature gate is removed.
|
||||
func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger, pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
|
||||
pod := pInfo.Pod
|
||||
if p.unschedulablePods.get(pod) != nil {
|
||||
return fmt.Errorf("Pod %v is already present in unschedulable queue", klog.KObj(pod))
|
||||
}
|
||||
|
||||
if _, exists, _ := p.activeQ.Get(pInfo); exists {
|
||||
return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod))
|
||||
}
|
||||
if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
|
||||
return fmt.Errorf("Pod %v is already present in the backoff queue", klog.KObj(pod))
|
||||
}
|
||||
|
||||
// Refresh the timestamp since the pod is re-added.
|
||||
pInfo.Timestamp = p.clock.Now()
|
||||
|
||||
@@ -610,6 +689,54 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddUnschedulableIfNotPresent inserts a pod that cannot be scheduled into
|
||||
// the queue, unless it is already in the queue. Normally, PriorityQueue puts
|
||||
// unschedulable pods in `unschedulablePods`. But if there has been a recent move
|
||||
// request, then the pod is put in `podBackoffQ`.
|
||||
func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
// In any case, this Pod will be moved back to the queue and we should call Done.
|
||||
defer p.done(pInfo.Pod.UID)
|
||||
|
||||
pod := pInfo.Pod
|
||||
if p.unschedulablePods.get(pod) != nil {
|
||||
return fmt.Errorf("Pod %v is already present in unschedulable queue", klog.KObj(pod))
|
||||
}
|
||||
|
||||
if _, exists, _ := p.activeQ.Get(pInfo); exists {
|
||||
return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod))
|
||||
}
|
||||
if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
|
||||
return fmt.Errorf("Pod %v is already present in the backoff queue", klog.KObj(pod))
|
||||
}
|
||||
|
||||
if !p.isSchedulingQueueHintEnabled {
|
||||
// fall back to the old behavior which doesn't depend on the queueing hint.
|
||||
return p.addUnschedulableWithoutQueueingHint(logger, pInfo, podSchedulingCycle)
|
||||
}
|
||||
|
||||
// Refresh the timestamp since the pod is re-added.
|
||||
pInfo.Timestamp = p.clock.Now()
|
||||
|
||||
// If a move request has been received, move it to the BackoffQ, otherwise move
|
||||
// it to unschedulablePods.
|
||||
for plugin := range pInfo.UnschedulablePlugins {
|
||||
metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Inc()
|
||||
}
|
||||
|
||||
// Based on isPodWorthRequeuing(), we check whether this Pod may change its scheduling result by any of events that happened during scheduling.
|
||||
schedulingHint := p.determineSchedulingHintForInFlightPod(logger, pInfo, podSchedulingCycle)
|
||||
|
||||
// In this case, we try to requeue this Pod to activeQ/backoffQ.
|
||||
queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, ScheduleAttemptFailure)
|
||||
logger.V(6).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", queue, "schedulingCycle", podSchedulingCycle)
|
||||
|
||||
p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil)
|
||||
return nil
|
||||
}
|
||||
|
||||
// flushBackoffQCompleted Moves all pods from backoffQ which have completed backoff in to activeQ
|
||||
func (p *PriorityQueue) flushBackoffQCompleted(logger klog.Logger) {
|
||||
p.lock.Lock()
|
||||
@@ -685,9 +812,74 @@ func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
|
||||
pInfo := obj.(*framework.QueuedPodInfo)
|
||||
pInfo.Attempts++
|
||||
p.schedulingCycle++
|
||||
// In flight, no move request yet.
|
||||
if p.isSchedulingQueueHintEnabled {
|
||||
p.inFlightPods[pInfo.Pod.UID] = inFlightPod{
|
||||
previousEvent: p.receivedEvents.Back(),
|
||||
}
|
||||
}
|
||||
|
||||
for plugin := range pInfo.UnschedulablePlugins {
|
||||
metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Dec()
|
||||
}
|
||||
|
||||
return pInfo, nil
|
||||
}
|
||||
|
||||
// Done must be called for pod returned by Pop. This allows the queue to
|
||||
// keep track of which pods are currently being processed.
|
||||
func (p *PriorityQueue) Done(pod types.UID) {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
p.done(pod)
|
||||
}
|
||||
|
||||
func (p *PriorityQueue) done(pod types.UID) {
|
||||
if !p.isSchedulingQueueHintEnabled {
|
||||
// do nothing if schedulingQueueHint is disabled.
|
||||
// In that case, we don't have inFlightPods and receivedEvents.
|
||||
return
|
||||
}
|
||||
inFlightPod, ok := p.inFlightPods[pod]
|
||||
if !ok {
|
||||
// This Pod is already done()ed.
|
||||
return
|
||||
}
|
||||
delete(p.inFlightPods, pod)
|
||||
|
||||
// remove events which is only referred from this Pod
|
||||
// so that the receivedEvents map doesn't grow infinitely.
|
||||
|
||||
// Find the event that we should start.
|
||||
// case1. If the previousEvent is nil, it means no receivedEvents when this Pod's scheduling started.
|
||||
// We start from the first event in the receivedEvents.
|
||||
// case2. If the previousEvent is not nil, but the inFlightPodsNum is 0,
|
||||
// this previousEvent is removed from the list already.
|
||||
// We start from the first event in the receivedEvents.
|
||||
event := p.receivedEvents.Front()
|
||||
if inFlightPod.previousEvent != nil && inFlightPod.previousEvent.Value.(*clusterEvent).inFlightPodsNum != 0 {
|
||||
// case3. If the previousEvent is not nil, and the inFlightPodsNum is not 0,
|
||||
// we can start from the next event of the previousEvent.
|
||||
event = inFlightPod.previousEvent.Next()
|
||||
}
|
||||
|
||||
for event != nil {
|
||||
e := event.Value.(*clusterEvent)
|
||||
// decrement inFlightPodsNum on events that happened after the Pod is popped.
|
||||
e.inFlightPodsNum--
|
||||
if e.inFlightPodsNum <= 0 {
|
||||
// remove the event from the list if no Pod refers to it.
|
||||
eventToDelete := event
|
||||
// we need to take next event before removal.
|
||||
event = event.Next()
|
||||
p.receivedEvents.Remove(eventToDelete)
|
||||
continue
|
||||
}
|
||||
event = event.Next()
|
||||
}
|
||||
}
|
||||
|
||||
// isPodUpdated checks if the pod is updated in a way that it may have become
|
||||
// schedulable. It drops status of the pod and compares it with old version,
|
||||
// except for pod.status.resourceClaimStatuses: changing that may have an
|
||||
@@ -853,7 +1045,7 @@ func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(logger klog.Logger, event
|
||||
func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *framework.QueuedPodInfo, schedulingHint framework.QueueingHint, event string) string {
|
||||
if schedulingHint == framework.QueueSkip {
|
||||
p.unschedulablePods.addOrUpdate(pInfo)
|
||||
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
|
||||
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", event).Inc()
|
||||
return unschedulablePods
|
||||
}
|
||||
|
||||
@@ -907,7 +1099,22 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn
|
||||
activated = true
|
||||
}
|
||||
}
|
||||
|
||||
p.moveRequestCycle = p.schedulingCycle
|
||||
|
||||
// (no need to check the feature gate because there is always no p.inFlightPods when the feature is disabled.)
|
||||
if len(p.inFlightPods) != 0 {
|
||||
// AddUnschedulableIfNotPresent might get called for in-flight Pods later, and in
|
||||
// AddUnschedulableIfNotPresent we need to know whether events were
|
||||
// observed while scheduling them.
|
||||
p.receivedEvents.PushBack(&clusterEvent{
|
||||
event: event,
|
||||
inFlightPodsNum: len(p.inFlightPods),
|
||||
oldObj: oldObj,
|
||||
newObj: newObj,
|
||||
})
|
||||
}
|
||||
|
||||
if activated {
|
||||
p.cond.Broadcast()
|
||||
}
|
||||
@@ -1233,24 +1440,6 @@ func newPodNominator(podLister listersv1.PodLister) *nominator {
|
||||
}
|
||||
}
|
||||
|
||||
// MakeNextPodFunc returns a function to retrieve the next pod from a given
|
||||
// scheduling queue
|
||||
func MakeNextPodFunc(logger klog.Logger, queue SchedulingQueue) func() *framework.QueuedPodInfo {
|
||||
return func() *framework.QueuedPodInfo {
|
||||
podInfo, err := queue.Pop()
|
||||
if err == nil && podInfo != nil {
|
||||
logger.V(4).Info("About to try and schedule pod", "pod", klog.KObj(podInfo.Pod))
|
||||
for plugin := range podInfo.UnschedulablePlugins {
|
||||
metrics.UnschedulableReason(plugin, podInfo.Pod.Spec.SchedulerName).Dec()
|
||||
}
|
||||
return podInfo
|
||||
} else if err != nil {
|
||||
logger.Error(err, "Error while retrieving next pod from scheduling queue")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func podInfoKeyFunc(obj interface{}) (string, error) {
|
||||
return cache.MetaNamespaceKeyFunc(obj.(*framework.QueuedPodInfo).Pod)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user