Pop from the backoffQ when the activeQ is empty
parent 6b8e5a9457
commit c7919f5e22
@@ -113,14 +113,22 @@ func (uaq *unlockedActiveQueue) has(pInfo *framework.QueuedPodInfo) bool {
return uaq.queue.Has(pInfo)
}

// backoffQPopper defines method that is used to pop from the backoffQ when the activeQ is empty.
type backoffQPopper interface {
// popBackoff pops the pInfo from the podBackoffQ.
popBackoff() (*framework.QueuedPodInfo, error)
// len returns length of the podBackoffQ queue.
lenBackoff() int
}

// activeQueue implements activeQueuer. All of the fields have to be protected using the lock.
type activeQueue struct {
// lock synchronizes all operations related to activeQ.
// It protects activeQ, inFlightPods, inFlightEvents, schedulingCycle and closed fields.
// Caution: DO NOT take "SchedulingQueue.lock" after taking "lock".
// You should always take "SchedulingQueue.lock" first, otherwise the queue could end up in deadlock.
// "lock" should not be taken after taking "nLock".
// Correct locking order is: SchedulingQueue.lock > lock > nominator.nLock.
// "lock" should not be taken after taking "backoffQueue.lock" or "nominator.nLock".
// Correct locking order is: SchedulingQueue.lock > lock > backoffQueue.lock > nominator.nLock.
lock sync.RWMutex

// activeQ is heap structure that scheduler actively looks at to find pods to
@@ -132,6 +140,8 @@ type activeQueue struct {
unlockedQueue *unlockedActiveQueue

// cond is a condition that is notified when the pod is added to activeQ.
// When SchedulerPopFromBackoffQ feature is enabled,
// condition is also notified when the pod is added to backoffQ.
// It is used with lock.
cond sync.Cond

@@ -171,9 +181,13 @@ type activeQueue struct {
isSchedulingQueueHintEnabled bool

metricsRecorder metrics.MetricAsyncRecorder

// backoffQPopper is used to pop from backoffQ when activeQ is empty.
// It is non-nil only when SchedulerPopFromBackoffQ feature is enabled.
backoffQPopper backoffQPopper
}

func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueueHintEnabled bool, metricRecorder metrics.MetricAsyncRecorder) *activeQueue {
func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueueHintEnabled bool, metricRecorder metrics.MetricAsyncRecorder, backoffQPopper backoffQPopper) *activeQueue {
aq := &activeQueue{
queue: queue,
inFlightPods: make(map[types.UID]*list.Element),
@@ -181,6 +195,7 @@ func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueu
isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled,
metricsRecorder: metricRecorder,
unlockedQueue: newUnlockedActiveQueue(queue),
backoffQPopper: backoffQPopper,
}
aq.cond.L = &aq.lock

@@ -238,7 +253,13 @@ func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error)
}

func (aq *activeQueue) unlockedPop(logger klog.Logger) (*framework.QueuedPodInfo, error) {
var pInfo *framework.QueuedPodInfo
for aq.queue.Len() == 0 {
// backoffQPopper is non-nil only if SchedulerPopFromBackoffQ feature is enabled.
// In case of non-empty backoffQ, try popping from there.
if aq.backoffQPopper != nil && aq.backoffQPopper.lenBackoff() != 0 {
break
}
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the p.closed is set and the condition is broadcast,
// which causes this loop to continue and return from the Pop().
@@ -250,7 +271,15 @@ func (aq *activeQueue) unlockedPop(logger klog.Logger) (*framework.QueuedPodInfo
}
pInfo, err := aq.queue.Pop()
if err != nil {
return nil, err
if aq.backoffQPopper == nil {
return nil, err
}
// Try to pop from backoffQ when activeQ is empty.
pInfo, err = aq.backoffQPopper.popBackoff()
if err != nil {
return nil, err
}
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", framework.PopFromBackoffQ).Inc()
}
pInfo.Attempts++
pInfo.BackoffExpiration = time.Time{}
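The hunks above are the core of the change: when the activeQ is empty but the backoffQ is not, unlockedPop breaks out of the wait loop and takes a pod from the backoffQ instead of blocking on the condition variable. A minimal, self-contained sketch of that pattern follows, using plain slices and a sync.Cond instead of the scheduler's heaps and types; all names here are illustrative, not the real API.

```go
package main

import (
	"fmt"
	"sync"
)

// twoQueues is a toy stand-in for activeQ plus backoffQ guarded by one lock.
type twoQueues struct {
	mu      sync.Mutex
	cond    *sync.Cond
	active  []string
	backoff []string
	closed  bool
}

func newTwoQueues() *twoQueues {
	q := &twoQueues{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// addBackoff enqueues to the backoff queue and wakes waiters, mirroring how
// the commit also broadcasts the activeQ condition on backoffQ additions.
func (q *twoQueues) addBackoff(item string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.backoff = append(q.backoff, item)
	q.cond.Broadcast()
}

// pop prefers the active queue; when it is empty it takes from the backoff
// queue instead of sleeping on the condition variable.
func (q *twoQueues) pop() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.active) == 0 {
		if len(q.backoff) != 0 {
			item := q.backoff[0]
			q.backoff = q.backoff[1:]
			return item, true
		}
		if q.closed {
			return "", false
		}
		q.cond.Wait()
	}
	item := q.active[0]
	q.active = q.active[1:]
	return item, true
}

func main() {
	q := newTwoQueues()
	q.addBackoff("pod-a") // activeQ is empty, so pop falls back to backoffQ
	fmt.Println(q.pop())
}
```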
@@ -30,7 +30,7 @@ import (
func TestClose(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
rr := metrics.NewMetricsAsyncRecorder(10, time.Second, ctx.Done())
aq := newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](newDefaultQueueSort()), metrics.NewActivePodsRecorder()), true, *rr)
aq := newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](newDefaultQueueSort()), metrics.NewActivePodsRecorder()), true, *rr, nil)

aq.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.add(&framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("foo").Name("p1").UID("p1").Obj()}}, framework.EventUnscheduledPodAdd.Label())
@@ -17,6 +17,7 @@ limitations under the License.
package queue

import (
"sync"
"time"

v1 "k8s.io/api/core/v1"
@@ -35,13 +36,14 @@ import (
const backoffQOrderingWindowDuration = time.Second

// backoffQueuer is a wrapper for backoffQ related operations.
// Its methods that relies on the queues, take the lock inside.
type backoffQueuer interface {
// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
// If this returns true, the pod should not be re-tried.
// If the pod backoff time is in the actual ordering window, it should still be backing off.
isPodBackingoff(podInfo *framework.QueuedPodInfo) bool
// popEachBackoffCompleted run fn for all pods from podBackoffQ and podErrorBackoffQ that completed backoff while popping them.
popEachBackoffCompleted(logger klog.Logger, fn func(pInfo *framework.QueuedPodInfo))
// popAllBackoffCompleted pops all pods from podBackoffQ and podErrorBackoffQ that completed backoff.
popAllBackoffCompleted(logger klog.Logger) []*framework.QueuedPodInfo

// podInitialBackoffDuration returns initial backoff duration that pod can get.
podInitialBackoffDuration() time.Duration
@@ -61,7 +63,8 @@ type backoffQueuer interface {
// It returns new pod info if updated, nil otherwise.
update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo
// delete deletes the pInfo from backoffQueue.
delete(pInfo *framework.QueuedPodInfo)
// It returns true if the pod was deleted.
delete(pInfo *framework.QueuedPodInfo) bool
// get returns the pInfo matching given pInfoLookup, if exists.
get(pInfoLookup *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool)
// has inform if pInfo exists in the queue.
@@ -75,6 +78,14 @@ type backoffQueuer interface {
// backoffQueue implements backoffQueuer and wraps two queues inside,
// providing seamless access as if it were one queue.
type backoffQueue struct {
// lock synchronizes all operations related to backoffQ.
// It protects both podBackoffQ and podErrorBackoffQ.
// Caution: DO NOT take "SchedulingQueue.lock" or "activeQueue.lock" after taking "lock".
// You should always take "SchedulingQueue.lock" and "activeQueue.lock" first, otherwise the queue could end up in deadlock.
// "lock" should not be taken after taking "nominator.nLock".
// Correct locking order is: SchedulingQueue.lock > activeQueue.lock > lock > nominator.nLock.
lock sync.RWMutex

clock clock.WithTicker

// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
@@ -239,7 +250,8 @@ func (bq *backoffQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInf
return duration
}

func (bq *backoffQueue) popEachBackoffCompletedWithQueue(logger klog.Logger, fn func(pInfo *framework.QueuedPodInfo), queue *heap.Heap[*framework.QueuedPodInfo]) {
func (bq *backoffQueue) popAllBackoffCompletedWithQueue(logger klog.Logger, queue *heap.Heap[*framework.QueuedPodInfo]) []*framework.QueuedPodInfo {
var poppedPods []*framework.QueuedPodInfo
for {
pInfo, ok := queue.Peek()
if !ok || pInfo == nil {
@@ -254,23 +266,27 @@ func (bq *backoffQueue) popEachBackoffCompletedWithQueue(logger klog.Logger, fn
logger.Error(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
break
}
if fn != nil {
fn(pInfo)
}
poppedPods = append(poppedPods, pInfo)
}
return poppedPods
}

// popEachBackoffCompleted run fn for all pods from podBackoffQ and podErrorBackoffQ that completed backoff while popping them.
func (bq *backoffQueue) popEachBackoffCompleted(logger klog.Logger, fn func(pInfo *framework.QueuedPodInfo)) {
// popAllBackoffCompleted pops all pods from podBackoffQ and podErrorBackoffQ that completed backoff.
func (bq *backoffQueue) popAllBackoffCompleted(logger klog.Logger) []*framework.QueuedPodInfo {
bq.lock.Lock()
defer bq.lock.Unlock()

// Ensure both queues are called
bq.popEachBackoffCompletedWithQueue(logger, fn, bq.podBackoffQ)
bq.popEachBackoffCompletedWithQueue(logger, fn, bq.podErrorBackoffQ)
return append(bq.popAllBackoffCompletedWithQueue(logger, bq.podBackoffQ), bq.popAllBackoffCompletedWithQueue(logger, bq.podErrorBackoffQ)...)
}

// add adds the pInfo to backoffQueue.
// The event should show which event triggered this addition and is used for the metric recording.
// It also ensures that pInfo is not in both queues.
func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) {
bq.lock.Lock()
defer bq.lock.Unlock()

// If pod has empty both unschedulable plugins and pending plugins,
// it means that it failed because of error and should be moved to podErrorBackoffQ.
if pInfo.UnschedulablePlugins.Len() == 0 && pInfo.PendingPlugins.Len() == 0 {
@@ -297,6 +313,9 @@ func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo,
// update updates the pod in backoffQueue if oldPodInfo is already in the queue.
// It returns new pod info if updated, nil otherwise.
func (bq *backoffQueue) update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo {
bq.lock.Lock()
defer bq.lock.Unlock()

// If the pod is in the backoff queue, update it there.
if pInfo, exists := bq.podBackoffQ.Get(oldPodInfo); exists {
_ = pInfo.Update(newPod)
@@ -313,13 +332,32 @@ func (bq *backoffQueue) update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodIn
}

// delete deletes the pInfo from backoffQueue.
func (bq *backoffQueue) delete(pInfo *framework.QueuedPodInfo) {
_ = bq.podBackoffQ.Delete(pInfo)
_ = bq.podErrorBackoffQ.Delete(pInfo)
// It returns true if the pod was deleted.
func (bq *backoffQueue) delete(pInfo *framework.QueuedPodInfo) bool {
bq.lock.Lock()
defer bq.lock.Unlock()

if bq.podBackoffQ.Delete(pInfo) == nil {
return true
}
return bq.podErrorBackoffQ.Delete(pInfo) == nil
}

// popBackoff pops the pInfo from the podBackoffQ.
// It returns error if the queue is empty.
// This doesn't pop the pods from the podErrorBackoffQ.
func (bq *backoffQueue) popBackoff() (*framework.QueuedPodInfo, error) {
bq.lock.Lock()
defer bq.lock.Unlock()

return bq.podBackoffQ.Pop()
}

// get returns the pInfo matching given pInfoLookup, if exists.
func (bq *backoffQueue) get(pInfoLookup *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool) {
bq.lock.RLock()
defer bq.lock.RUnlock()

pInfo, exists := bq.podBackoffQ.Get(pInfoLookup)
if exists {
return pInfo, true
@@ -329,11 +367,17 @@ func (bq *backoffQueue) get(pInfoLookup *framework.QueuedPodInfo) (*framework.Qu

// has inform if pInfo exists in the queue.
func (bq *backoffQueue) has(pInfo *framework.QueuedPodInfo) bool {
bq.lock.RLock()
defer bq.lock.RUnlock()

return bq.podBackoffQ.Has(pInfo) || bq.podErrorBackoffQ.Has(pInfo)
}

// list returns all pods that are in the queue.
func (bq *backoffQueue) list() []*v1.Pod {
bq.lock.RLock()
defer bq.lock.RUnlock()

var result []*v1.Pod
for _, pInfo := range bq.podBackoffQ.List() {
result = append(result, pInfo.Pod)
@@ -346,5 +390,16 @@ func (bq *backoffQueue) list() []*v1.Pod {

// len returns length of the queue.
func (bq *backoffQueue) len() int {
bq.lock.RLock()
defer bq.lock.RUnlock()

return bq.podBackoffQ.Len() + bq.podErrorBackoffQ.Len()
}

// lenBackoff returns length of the podBackoffQ.
func (bq *backoffQueue) lenBackoff() int {
bq.lock.RLock()
defer bq.lock.RUnlock()

return bq.podBackoffQ.Len()
}
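A compact illustration of the split that the new popBackoff and lenBackoff methods rely on: the wrapper owns two queues, but only the regular backoff queue is exposed to the popper, so pods that failed because of an error keep waiting out their backoff. Plain slices stand in for the heaps below, and every name is illustrative rather than the scheduler's actual type.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// backoffPair mimics the two-queue layout: one queue for ordinary backoff,
// one for pods that failed due to an error.
type backoffPair struct {
	mu           sync.RWMutex
	backoff      []string // analogous to podBackoffQ
	errorBackoff []string // analogous to podErrorBackoffQ
}

// popBackoff removes from the regular backoff queue only; the error queue is
// deliberately left alone, matching the commit's popBackoff.
func (b *backoffPair) popBackoff() (string, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.backoff) == 0 {
		return "", errors.New("backoff queue is empty")
	}
	item := b.backoff[0]
	b.backoff = b.backoff[1:]
	return item, nil
}

// lenBackoff counts only the regular backoff queue, which is what the
// activeQ's emptiness check consults.
func (b *backoffPair) lenBackoff() int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return len(b.backoff)
}

// len counts both queues, keeping the wrapper's "one logical queue" view.
func (b *backoffPair) len() int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return len(b.backoff) + len(b.errorBackoff)
}

func main() {
	b := &backoffPair{backoff: []string{"pod-a"}, errorBackoff: []string{"pod-b"}}
	fmt.Println(b.len(), b.lenBackoff()) // 2 1
	fmt.Println(b.popBackoff())          // pod-a <nil>
}
```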
@@ -78,7 +78,7 @@ func TestBackoffQueue_calculateBackoffDuration(t *testing.T) {
}
}

func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) {
func TestBackoffQueue_popAllBackoffCompleted(t *testing.T) {
fakeClock := testingclock.NewFakeClock(time.Now())
podInfos := map[string]*framework.QueuedPodInfo{
"pod0": {
@@ -156,10 +156,11 @@ func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) {
for _, podName := range tt.podsInBackoff {
bq.add(logger, podInfos[podName], framework.EventUnscheduledPodAdd.Label())
}
gotPodInfos := bq.popAllBackoffCompleted(logger)
var gotPods []string
bq.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) {
for _, pInfo := range gotPodInfos {
gotPods = append(gotPods, pInfo.Pod.Name)
})
}
if diff := cmp.Diff(tt.wantPods, gotPods); diff != "" {
t.Errorf("Unexpected pods moved (-want, +got):\n%s", diff)
}
@@ -248,10 +249,11 @@ func TestBackoffQueueOrdering(t *testing.T) {
for _, podInfo := range podInfos {
bq.add(logger, podInfo, framework.EventUnscheduledPodAdd.Label())
}
gotPodInfos := bq.popAllBackoffCompleted(logger)
var gotPods []string
bq.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) {
for _, pInfo := range gotPodInfos {
gotPods = append(gotPods, pInfo.Pod.Name)
})
}
if diff := cmp.Diff(tt.wantPods, gotPods); diff != "" {
t.Errorf("Unexpected pods moved (-want, +got):\n%s", diff)
}
@@ -35,10 +35,10 @@ import (
type nominator struct {
// nLock synchronizes all operations related to nominator.
// It should not be used anywhere else.
// Caution: DO NOT take ("SchedulingQueue.lock" or "activeQueue.lock") after taking "nLock".
// You should always take "SchedulingQueue.lock" and "activeQueue.lock" first,
// Caution: DO NOT take ("SchedulingQueue.lock" or "activeQueue.lock" or "backoffQueue.lock") after taking "nLock".
// You should always take "SchedulingQueue.lock" and "activeQueue.lock" and "backoffQueue.lock" first,
// otherwise the nominator could end up in deadlock.
// Correct locking order is: SchedulingQueue.lock > activeQueue.lock > nLock.
// Correct locking order is: SchedulingQueue.lock > activeQueue.lock = backoffQueue.lock > nLock.
nLock sync.RWMutex

// podLister is used to verify if the given pod is alive.
@@ -160,8 +160,8 @@ type PriorityQueue struct {
clock clock.WithTicker

// lock takes precedence and should be taken first,
// before any other locks in the queue (activeQueue.lock or nominator.nLock).
// Correct locking order is: lock > activeQueue.lock > nominator.nLock.
// before any other locks in the queue (activeQueue.lock or backoffQueue.lock or nominator.nLock).
// Correct locking order is: lock > activeQueue.lock > backoffQueue.lock > nominator.nLock.
lock sync.RWMutex

// the maximum time a pod can stay in the unschedulablePods.
@@ -331,12 +331,12 @@ func NewPriorityQueue(
isSchedulingQueueHintEnabled := utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints)
isPopFromBackoffQEnabled := utilfeature.DefaultFeatureGate.Enabled(features.SchedulerPopFromBackoffQ)

backoffQ := newBackoffQueue(options.clock, options.podInitialBackoffDuration, options.podMaxBackoffDuration, lessFn, isPopFromBackoffQEnabled)
pq := &PriorityQueue{
clock: options.clock,
stop: make(chan struct{}),
podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
activeQ: newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled, options.metricsRecorder),
backoffQ: newBackoffQueue(options.clock, options.podInitialBackoffDuration, options.podMaxBackoffDuration, lessFn, isPopFromBackoffQEnabled),
backoffQ: backoffQ,
unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),
preEnqueuePluginMap: options.preEnqueuePluginMap,
queueingHintMap: options.queueingHintMap,
@@ -346,6 +346,11 @@ func NewPriorityQueue(
isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled,
isPopFromBackoffQEnabled: isPopFromBackoffQEnabled,
}
var backoffQPopper backoffQPopper
if isPopFromBackoffQEnabled {
backoffQPopper = backoffQ
}
pq.activeQ = newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled, options.metricsRecorder, backoffQPopper)
pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()
pq.nominator = newPodNominator(options.podLister)
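The wiring above hands the backoff queue to the active queue only when the SchedulerPopFromBackoffQ gate is on; otherwise the popper stays a nil interface and the fallback path in unlockedPop is never taken. A minimal sketch of that construction pattern follows, with hypothetical names rather than the scheduler's API.

```go
package main

import "fmt"

// popper is the minimal capability the consumer needs.
type popper interface {
	lenBackoff() int
}

type backoffStore struct{ items []string }

func (b *backoffStore) lenBackoff() int { return len(b.items) }

type consumer struct {
	// p stays nil when the feature is disabled; callers nil-check it.
	p popper
}

func newConsumer(featureEnabled bool, b *backoffStore) *consumer {
	// Assign the concrete queue only when the feature is enabled, so the
	// consumer's `p != nil` check cleanly disables the fallback path.
	// Declaring the interface variable first keeps it a true nil interface
	// in the disabled case.
	var p popper
	if featureEnabled {
		p = b
	}
	return &consumer{p: p}
}

func main() {
	b := &backoffStore{items: []string{"pod-a"}}
	fmt.Println(newConsumer(true, b).p != nil)  // true
	fmt.Println(newConsumer(false, b).p != nil) // false
}
```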
@@ -672,6 +677,12 @@ func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool {
if !exists {
return false
}
// Delete pod from the backoffQ now to make sure it won't be popped from the backoffQ
// just before moving it to the activeQ
if deleted := p.backoffQ.delete(pInfo); !deleted {
// Pod was popped from the backoffQ in the meantime. Don't activate it.
return false
}
}

if pInfo == nil {
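With the backoff queue now poppable from another goroutine, activate() treats "was the pod still there when I deleted it?" as the authoritative signal; this is why delete() was changed to return a bool. A stripped-down sketch of that check-by-delete pattern, with illustrative types rather than the scheduler's:

```go
package main

import (
	"fmt"
	"sync"
)

type store struct {
	mu    sync.Mutex
	items map[string]bool
}

// delete reports whether the item was actually removed, so the caller can
// tell "I own this item now" apart from "someone else already took it".
func (s *store) delete(name string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.items[name] {
		return false
	}
	delete(s.items, name)
	return true
}

// activate moves a pod to the active set only if the backoff store still
// held it; a false return means a concurrent pop won the race.
func activate(backoff *store, active map[string]bool, name string) bool {
	if deleted := backoff.delete(name); !deleted {
		return false
	}
	active[name] = true
	return true
}

func main() {
	backoff := &store{items: map[string]bool{"pod-a": true}}
	active := map[string]bool{}
	fmt.Println(activate(backoff, active, "pod-a")) // true: moved
	fmt.Println(activate(backoff, active, "pod-a")) // false: already gone
}
```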
@@ -756,7 +767,11 @@ func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger,
// - No unschedulable plugins are associated with this Pod,
// meaning something unusual (a temporal failure on kube-apiserver, etc) happened and this Pod gets moved back to the queue.
// In this case, we should retry scheduling it because this Pod may not be retried until the next flush.
_ = p.moveToBackoffQ(logger, pInfo, framework.ScheduleAttemptFailure)
if added := p.moveToBackoffQ(logger, pInfo, framework.ScheduleAttemptFailure); added {
if p.isPopFromBackoffQEnabled {
p.activeQ.broadcast()
}
}
} else {
p.unschedulablePods.addOrUpdate(pInfo, framework.ScheduleAttemptFailure)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", unschedulablePods)
@@ -809,7 +824,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *
// In this case, we try to requeue this Pod to activeQ/backoffQ.
queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, framework.ScheduleAttemptFailure)
logger.V(3).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", queue, "schedulingCycle", podSchedulingCycle, "hint", schedulingHint, "unschedulable plugins", rejectorPlugins)
if queue == activeQ {
if queue == activeQ || (p.isPopFromBackoffQEnabled && queue == backoffQ) {
// When the Pod is moved to activeQ, need to let p.cond know so that the Pod will be pop()ed out.
p.activeQ.broadcast()
}
@@ -822,11 +837,12 @@ func (p *PriorityQueue) flushBackoffQCompleted(logger klog.Logger) {
p.lock.Lock()
defer p.lock.Unlock()
activated := false
p.backoffQ.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) {
podsCompletedBackoff := p.backoffQ.popAllBackoffCompleted(logger)
for _, pInfo := range podsCompletedBackoff {
if added := p.moveToActiveQ(logger, pInfo, framework.BackoffComplete); added {
activated = true
}
})
}
if activated {
p.activeQ.broadcast()
}
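The switch from a callback (popEachBackoffCompleted) to a returned slice (popAllBackoffCompleted) appears to keep the backoff queue's new internal lock from being held while pods are moved into the active queue, which matters because, per the locking-order comments earlier in the diff, activeQueue.lock is taken before backoffQueue.lock. A small sketch of that collect-then-process shape, with toy types:

```go
package main

import (
	"fmt"
	"sync"
)

type timedQueue struct {
	mu    sync.Mutex
	ready []string
}

// popAllReady drains the ready items under the queue's own lock and returns
// them, so the caller processes them after the lock is released.
func (q *timedQueue) popAllReady() []string {
	q.mu.Lock()
	defer q.mu.Unlock()
	out := q.ready
	q.ready = nil
	return out
}

func main() {
	q := &timedQueue{ready: []string{"pod-a", "pod-b"}}
	activated := false
	// Processing happens outside q.mu, mirroring how flushBackoffQCompleted
	// moves pods to the active queue after popAllBackoffCompleted returns.
	for _, pod := range q.popAllReady() {
		fmt.Println("moving to activeQ:", pod)
		activated = true
	}
	if activated {
		fmt.Println("broadcast")
	}
}
```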
@@ -954,7 +970,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
logger.V(5).Info("Pod moved to an internal scheduling queue because the Pod is updated", "pod", klog.KObj(newPod), "event", evt.Label(), "queue", queue)
p.unschedulablePods.delete(pInfo.Pod, gated)
}
if queue == activeQ {
if queue == activeQ || (p.isPopFromBackoffQEnabled && queue == backoffQ) {
p.activeQ.broadcast()
break
}
@@ -967,6 +983,9 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
if p.backoffQ.isPodBackingoff(pInfo) {
if added := p.moveToBackoffQ(logger, pInfo, framework.EventUnscheduledPodUpdate.Label()); added {
p.unschedulablePods.delete(pInfo.Pod, gated)
if p.isPopFromBackoffQEnabled {
p.activeQ.broadcast()
}
}
return
}
@@ -995,12 +1014,14 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) {
defer p.lock.Unlock()
p.DeleteNominatedPodIfExists(pod)
pInfo := newQueuedPodInfoForLookup(pod)
if err := p.activeQ.delete(pInfo); err != nil {
// The item was probably not found in the activeQ.
p.backoffQ.delete(pInfo)
if pInfo = p.unschedulablePods.get(pod); pInfo != nil {
p.unschedulablePods.delete(pod, pInfo.Gated)
}
if err := p.activeQ.delete(pInfo); err == nil {
return
}
if deleted := p.backoffQ.delete(pInfo); deleted {
return
}
if pInfo = p.unschedulablePods.get(pod); pInfo != nil {
p.unschedulablePods.delete(pod, pInfo.Gated)
}
}

@@ -1127,7 +1148,7 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn
p.unschedulablePods.delete(pInfo.Pod, pInfo.Gated)
queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, event.Label())
logger.V(4).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event.Label(), "queue", queue, "hint", schedulingHint)
if queue == activeQ {
if queue == activeQ || (p.isPopFromBackoffQEnabled && queue == backoffQ) {
activated = true
}
}
@@ -1222,11 +1243,13 @@ func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) {
defer p.lock.RUnlock()
result := p.activeQ.list()
activeQLen := len(result)
result = append(result, p.backoffQ.list()...)
backoffQPods := p.backoffQ.list()
backoffQLen := len(backoffQPods)
result = append(result, backoffQPods...)
for _, pInfo := range p.unschedulablePods.podInfoMap {
result = append(result, pInfo.Pod)
}
return result, fmt.Sprintf(pendingPodsSummary, activeQLen, p.backoffQ.len(), len(p.unschedulablePods.podInfoMap))
return result, fmt.Sprintf(pendingPodsSummary, activeQLen, backoffQLen, len(p.unschedulablePods.podInfoMap))
}

// Note: this function assumes the caller locks both p.lock.RLock and p.activeQ.getLock().RLock.
@@ -1010,25 +1010,88 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
}
}

func TestPriorityQueue_Pop(t *testing.T) {
objs := []runtime.Object{medPriorityPodInfo.Pod}
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs)
wg := sync.WaitGroup{}
wg.Add(1)
// tryPop tries to pop one pod from the queue and returns it.
// It waits 5 seconds before timing out, assuming the queue is then empty.
func tryPop(t *testing.T, logger klog.Logger, q *PriorityQueue) *framework.QueuedPodInfo {
t.Helper()

var gotPod *framework.QueuedPodInfo
popped := make(chan struct{}, 1)
go func() {
defer wg.Done()
if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
pod, err := q.Pop(logger)
if err != nil {
t.Errorf("Failed to pop pod from scheduling queue: %s", err)
}
if len(q.nominator.nominatedPods["node1"]) != 1 {
t.Errorf("Expected medPriorityPodInfo to be present in nominatedPods: %v", q.nominator.nominatedPods["node1"])
if pod != nil {
gotPod = pod
}
popped <- struct{}{}
}()
q.Add(logger, medPriorityPodInfo.Pod)
wg.Wait()

timer := time.NewTimer(5 * time.Second)
select {
case <-timer.C:
q.Close()
case <-popped:
timer.Stop()
}
return gotPod
}

func TestPriorityQueue_Pop(t *testing.T) {
highPriorityPodInfo2 := mustNewPodInfo(
st.MakePod().Name("hpp2").Namespace("ns1").UID("hpp2ns1").Priority(highPriority).Obj(),
)
objs := []runtime.Object{medPriorityPodInfo.Pod, highPriorityPodInfo.Pod, highPriorityPodInfo2.Pod, unschedulablePodInfo.Pod}
tests := []struct {
name string
popFromBackoffQEnabled bool
wantPods []string
}{
{
name: "Pop pods from both activeQ and backoffQ when PopFromBackoffQ is enabled",
popFromBackoffQEnabled: true,
wantPods: []string{medPriorityPodInfo.Pod.Name, highPriorityPodInfo.Pod.Name},
},
{
name: "Pop pod only from activeQ when PopFromBackoffQ is disabled",
popFromBackoffQEnabled: false,
wantPods: []string{medPriorityPodInfo.Pod.Name},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerPopFromBackoffQ, tt.popFromBackoffQEnabled)
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs)

// Add medium priority pod to the activeQ
q.Add(logger, medPriorityPodInfo.Pod)
// Add high priority pod to the backoffQ
backoffPodInfo := q.newQueuedPodInfo(highPriorityPodInfo.Pod, "plugin")
q.backoffQ.add(logger, backoffPodInfo, framework.EventUnscheduledPodAdd.Label())
// Add high priority pod to the errorBackoffQ
errorBackoffPodInfo := q.newQueuedPodInfo(highPriorityPodInfo2.Pod)
q.backoffQ.add(logger, errorBackoffPodInfo, framework.EventUnscheduledPodAdd.Label())
// Add pod to the unschedulablePods
unschedulablePodInfo := q.newQueuedPodInfo(unschedulablePodInfo.Pod, "plugin")
q.unschedulablePods.addOrUpdate(unschedulablePodInfo, framework.EventUnscheduledPodAdd.Label())

var gotPods []string
for i := 0; i < len(tt.wantPods)+1; i++ {
gotPod := tryPop(t, logger, q)
if gotPod == nil {
break
}
gotPods = append(gotPods, gotPod.Pod.Name)
}
if diff := cmp.Diff(tt.wantPods, gotPods); diff != "" {
t.Errorf("Unexpected popped pods (-want, +got): %s", diff)
}
})
}
}
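The tryPop helper above avoids hanging the test when the queue is empty: Pop blocks, so it runs in a goroutine, signals completion on a buffered channel, and a 5-second timer closes the queue to unblock it if nothing arrives. A reduced sketch of that pattern, using a plain blocking channel receive in place of the scheduler queue; names here are illustrative only.

```go
package main

import (
	"fmt"
	"time"
)

// tryReceive waits up to timeout for a value; on timeout it closes the
// channel so the blocked receive returns, mirroring how tryPop calls
// q.Close() to unblock q.Pop().
func tryReceive(ch chan string, timeout time.Duration) (string, bool) {
	var got string
	ok := false
	done := make(chan struct{}, 1) // buffered, like tryPop's popped channel
	go func() {
		if v, open := <-ch; open {
			got, ok = v, true
		}
		done <- struct{}{}
	}()

	timer := time.NewTimer(timeout)
	select {
	case <-timer.C:
		close(ch) // unblock the receiver, then wait for it to finish
		<-done
	case <-done:
		timer.Stop()
	}
	return got, ok
}

func main() {
	ch := make(chan string)
	go func() { ch <- "pod-a" }()
	fmt.Println(tryReceive(ch, time.Second)) // pod-a true

	empty := make(chan string)
	fmt.Println(tryReceive(empty, 100*time.Millisecond)) // "" false
}
```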
func TestPriorityQueue_Update(t *testing.T) {
@@ -1951,7 +2014,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
// pop out the pods in the backoffQ.
// This doesn't make them in-flight pods.
c.Step(q.backoffQ.podMaxBackoffDuration())
q.backoffQ.popEachBackoffCompleted(logger, nil)
_ = q.backoffQ.popAllBackoffCompleted(logger)
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID)

q.Add(logger, unschedulablePodInfo.Pod)
@@ -2074,7 +2137,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi
// pop out the pods in the backoffQ.
// This doesn't make them in-flight pods.
c.Step(q.backoffQ.podMaxBackoffDuration())
q.backoffQ.popEachBackoffCompleted(logger, nil)
_ = q.backoffQ.popAllBackoffCompleted(logger)

unschedulableQueuedPodInfo := attemptQueuedPodInfo(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"))
highPriorityQueuedPodInfo := attemptQueuedPodInfo(q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"))
@@ -3883,9 +3946,10 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
q.MoveAllToActiveOrBackoffQueue(logger, tt.event, nil, nil, tt.preEnqueueCheck)
got := sets.New[string]()
c.Step(2 * q.backoffQ.podMaxBackoffDuration())
q.backoffQ.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) {
gotPodInfos := q.backoffQ.popAllBackoffCompleted(logger)
for _, pInfo := range gotPodInfos {
got.Insert(pInfo.Pod.Name)
})
}
if diff := cmp.Diff(tt.want, got); diff != "" {
t.Errorf("Unexpected diff (-want, +got):\n%s", diff)
}
@@ -31,6 +31,8 @@ const (
ScheduleAttemptFailure = "ScheduleAttemptFailure"
// BackoffComplete is the event when a pod finishes backoff.
BackoffComplete = "BackoffComplete"
// PopFromBackoffQ is the event when a pod is popped from backoffQ when activeQ is empty.
PopFromBackoffQ = "PopFromBackoffQ"
// ForceActivate is the event when a pod is moved from unschedulablePods/backoffQ
// to activeQ. Usually it's triggered by plugin implementations.
ForceActivate = "ForceActivate"
@@ -212,6 +212,8 @@ func TestUpdateNominatedNodeName(t *testing.T) {
for _, qHintEnabled := range []bool{false, true} {
t.Run(fmt.Sprintf("%s, with queuehint(%v)", tt.name, qHintEnabled), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, qHintEnabled)
// Set the SchedulerPopFromBackoffQ feature to false, because when it's enabled, we can't be sure the pod won't be popped from the backoffQ.
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerPopFromBackoffQ, false)

testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 0, true,
scheduler.WithClock(fakeClock),
@@ -309,14 +309,18 @@ var _ framework.PreFilterPlugin = &PreFilterPlugin{}
var _ framework.PostFilterPlugin = &PostFilterPlugin{}
var _ framework.ScorePlugin = &ScorePlugin{}
var _ framework.FilterPlugin = &FilterPlugin{}
var _ framework.EnqueueExtensions = &FilterPlugin{}
var _ framework.ScorePlugin = &ScorePlugin{}
var _ framework.ScorePlugin = &ScoreWithNormalizePlugin{}
var _ framework.EnqueueExtensions = &ScorePlugin{}
var _ framework.ReservePlugin = &ReservePlugin{}
var _ framework.PreScorePlugin = &PreScorePlugin{}
var _ framework.PreBindPlugin = &PreBindPlugin{}
var _ framework.EnqueueExtensions = &PreBindPlugin{}
var _ framework.BindPlugin = &BindPlugin{}
var _ framework.PostBindPlugin = &PostBindPlugin{}
var _ framework.PermitPlugin = &PermitPlugin{}
var _ framework.EnqueueExtensions = &PermitPlugin{}
var _ framework.QueueSortPlugin = &QueueSortPlugin{}

func (ep *QueueSortPlugin) Name() string {
@@ -377,6 +381,10 @@ func (sp *ScorePlugin) ScoreExtensions() framework.ScoreExtensions {
return nil
}

func (sp *ScorePlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return nil, nil
}

// Name returns name of the score plugin.
func (sp *ScoreWithNormalizePlugin) Name() string {
return scoreWithNormalizePluginName
@@ -427,6 +435,12 @@ func (fp *FilterPlugin) Filter(ctx context.Context, state *framework.CycleState,
return nil
}

func (fp *FilterPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return []framework.ClusterEventWithHint{
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
}, nil
}

// Name returns name of the plugin.
func (rp *ReservePlugin) Name() string {
return rp.name
@@ -491,6 +505,10 @@ func (pp *PreBindPlugin) PreBind(ctx context.Context, state *framework.CycleStat
return nil
}

func (pp *PreBindPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return nil, nil
}

const bindPluginAnnotation = "bindPluginName"

func (bp *BindPlugin) Name() string {
@@ -651,6 +669,10 @@ func (pp *PermitPlugin) rejectAllPods() {
pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Reject(pp.name, "rejectAllPods") })
}

func (pp *PermitPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return nil, nil
}

// TestPreFilterPlugin tests invocation of prefilter plugins.
func TestPreFilterPlugin(t *testing.T) {
testContext := testutils.InitTestAPIServer(t, "prefilter-plugin", nil)
@@ -581,3 +581,38 @@ func (p *fakePermitPlugin) EventsToRegister(_ context.Context) ([]framework.Clus
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeLabel}, QueueingHintFn: p.schedulingHint},
}, nil
}

func TestPopFromBackoffQWhenActiveQEmpty(t *testing.T) {
// Set initial backoff to 1000s to make sure pod won't go to the activeQ after being requeued.
testCtx := testutils.InitTestSchedulerWithNS(t, "pop-from-backoffq", scheduler.WithPodInitialBackoffSeconds(1000), scheduler.WithPodMaxBackoffSeconds(1000))
cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx

// Create node, so we can schedule pods.
node := st.MakeNode().Name("node").Obj()
if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
t.Fatal("Failed to create node")
}

// Create a pod that will be unschedulable.
pod := st.MakePod().Namespace(ns).Name("pod").NodeAffinityIn("foo", []string{"bar"}, st.NodeSelectorTypeMatchExpressions).Container(imageutils.GetPauseImageName()).Obj()
if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create pod: %v", err)
}

err := wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, wait.ForeverTestTimeout, false, testutils.PodUnschedulable(cs, ns, pod.Name))
if err != nil {
t.Fatalf("Expected pod to be unschedulable: %v", err)
}

// Create node with label to make the pod schedulable.
node2 := st.MakeNode().Name("node-schedulable").Label("foo", "bar").Obj()
if _, err := cs.CoreV1().Nodes().Create(ctx, node2, metav1.CreateOptions{}); err != nil {
t.Fatal("Failed to create node-schedulable")
}

// Pod should be scheduled, even if it was in the backoffQ, because PopFromBackoffQ feature is enabled.
err = wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, wait.ForeverTestTimeout, false, testutils.PodScheduled(cs, ns, pod.Name))
if err != nil {
t.Fatalf("Expected pod to be scheduled: %v", err)
}
}