diff --git a/cmd/kube-scheduler/app/config/config.go b/cmd/kube-scheduler/app/config/config.go index 927a71b21b9..e007f846098 100644 --- a/cmd/kube-scheduler/app/config/config.go +++ b/cmd/kube-scheduler/app/config/config.go @@ -17,6 +17,8 @@ limitations under the License. package config import ( + "time" + apiserver "k8s.io/apiserver/pkg/server" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" @@ -49,6 +51,12 @@ type Config struct { // LeaderElection is optional. LeaderElection *leaderelection.LeaderElectionConfig + + // PodMaxUnschedulableQDuration is the maximum time a pod can stay in + // unschedulableQ. If a pod stays in unschedulableQ for longer than this + // value, the pod will be moved from unschedulableQ to backoffQ or activeQ. + // If this value is empty, the default value (60s) will be used. + PodMaxUnschedulableQDuration time.Duration } type completedConfig struct { diff --git a/cmd/kube-scheduler/app/options/deprecated.go b/cmd/kube-scheduler/app/options/deprecated.go index 6693655d08e..89c725c4b48 100644 --- a/cmd/kube-scheduler/app/options/deprecated.go +++ b/cmd/kube-scheduler/app/options/deprecated.go @@ -17,6 +17,8 @@ limitations under the License. package options import ( + "time" + "github.com/spf13/pflag" componentbaseconfig "k8s.io/component-base/config" ) @@ -28,6 +30,11 @@ type DeprecatedOptions struct { componentbaseconfig.ClientConnectionConfiguration // Note that only the deprecated options (lock-object-name and lock-object-namespace) are populated here. componentbaseconfig.LeaderElectionConfiguration + // PodMaxUnschedulableQDuration is the maximum time a pod can stay in + // unschedulableQ. If a pod stays in unschedulableQ for longer than this + // value, the pod will be moved from unschedulableQ to backoffQ or activeQ. + // If this value is empty, the default value (60s) will be used. + PodMaxUnschedulableQDuration time.Duration } // AddFlags adds flags for the deprecated options. @@ -44,4 +51,5 @@ func (o *DeprecatedOptions) AddFlags(fs *pflag.FlagSet) { fs.Int32Var(&o.Burst, "kube-api-burst", 100, "DEPRECATED: burst to use while talking with kubernetes apiserver. This parameter is ignored if a config file is specified in --config.") fs.StringVar(&o.ResourceNamespace, "lock-object-namespace", "kube-system", "DEPRECATED: define the namespace of the lock object. Will be removed in favor of leader-elect-resource-namespace. This parameter is ignored if a config file is specified in --config.") fs.StringVar(&o.ResourceName, "lock-object-name", "kube-scheduler", "DEPRECATED: define the name of the lock object. Will be removed in favor of leader-elect-resource-name. This parameter is ignored if a config file is specified in --config.") + fs.DurationVar(&o.PodMaxUnschedulableQDuration, "pod-max-unschedulableq-duration", 60*time.Second, "DEPRECATED: the maximum time a pod can stay in unschedulableQ. If a pod stays in unschedulableQ for longer than this value, the pod will be moved from unschedulableQ to backoffQ or activeQ. This flag is deprecated and will be removed in 1.26") } diff --git a/cmd/kube-scheduler/app/options/options.go b/cmd/kube-scheduler/app/options/options.go index ab313266f60..82c2bca81f9 100644 --- a/cmd/kube-scheduler/app/options/options.go +++ b/cmd/kube-scheduler/app/options/options.go @@ -79,7 +79,9 @@ func NewOptions() *Options { SecureServing: apiserveroptions.NewSecureServingOptions().WithLoopback(), Authentication: apiserveroptions.NewDelegatingAuthenticationOptions(), Authorization: apiserveroptions.NewDelegatingAuthorizationOptions(), - Deprecated: &DeprecatedOptions{}, + Deprecated: &DeprecatedOptions{ + PodMaxUnschedulableQDuration: 60 * time.Second, + }, LeaderElection: &componentbaseconfig.LeaderElectionConfiguration{ LeaderElect: true, LeaseDuration: metav1.Duration{Duration: 15 * time.Second}, @@ -231,6 +233,12 @@ func (o *Options) ApplyTo(c *schedulerappconfig.Config) error { } } o.Metrics.Apply() + + // Apply value independently instead of using ApplyDeprecated() because it can't be configured via ComponentConfig. + if o.Deprecated != nil { + c.PodMaxUnschedulableQDuration = o.Deprecated.PodMaxUnschedulableQDuration + } + return nil } diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index fe065c80cc2..64ad506777a 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -328,6 +328,7 @@ func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry), scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds), scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds), + scheduler.WithPodMaxUnschedulableQDuration(cc.PodMaxUnschedulableQDuration), scheduler.WithExtenders(cc.ComponentConfig.Extenders...), scheduler.WithParallelism(cc.ComponentConfig.Parallelism), scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) { diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index f598a16d4c6..2e70f6a14c5 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -73,6 +73,8 @@ type Configurator struct { podMaxBackoffSeconds int64 + podMaxUnschedulableQDuration time.Duration + profiles []schedulerapi.KubeSchedulerProfile registry frameworkruntime.Registry nodeInfoSnapshot *internalcache.Snapshot @@ -168,6 +170,7 @@ func (c *Configurator) create() (*Scheduler, error) { internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second), internalqueue.WithPodNominator(nominator), internalqueue.WithClusterEventMap(c.clusterEventMap), + internalqueue.WithPodMaxUnschedulableQDuration(c.podMaxUnschedulableQDuration), ) // Setup cache debugger. diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index d2edcfc861c..8f76d81dcfb 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -48,9 +48,12 @@ import ( ) const ( - // If a pod stays in unschedulableQ longer than unschedulableQTimeInterval, - // the pod will be moved from unschedulableQ to backoffQ or activeQ. - unschedulableQTimeInterval = 60 * time.Second + // DefaultPodMaxUnschedulableQDuration is the default value for the maximum + // time a pod can stay in unschedulableQ. If a pod stays in unschedulableQ + // for longer than this value, the pod will be moved from unschedulableQ to + // backoffQ or activeQ. If this value is empty, the default value (60s) + // will be used. + DefaultPodMaxUnschedulableQDuration time.Duration = 60 * time.Second queueClosed = "scheduling queue is closed" ) @@ -136,6 +139,8 @@ type PriorityQueue struct { podInitialBackoffDuration time.Duration // pod maximum backoff duration. podMaxBackoffDuration time.Duration + // the maximum time a pod can stay in the unschedulableQ. + podMaxUnschedulableQDuration time.Duration lock sync.RWMutex cond sync.Cond @@ -167,11 +172,12 @@ type PriorityQueue struct { } type priorityQueueOptions struct { - clock util.Clock - podInitialBackoffDuration time.Duration - podMaxBackoffDuration time.Duration - podNominator framework.PodNominator - clusterEventMap map[framework.ClusterEvent]sets.String + clock util.Clock + podInitialBackoffDuration time.Duration + podMaxBackoffDuration time.Duration + podMaxUnschedulableQDuration time.Duration + podNominator framework.PodNominator + clusterEventMap map[framework.ClusterEvent]sets.String } // Option configures a PriorityQueue @@ -212,10 +218,18 @@ func WithClusterEventMap(m map[framework.ClusterEvent]sets.String) Option { } } +// WithPodMaxUnschedulableQDuration sets podMaxUnschedulableQDuration for PriorityQueue. +func WithPodMaxUnschedulableQDuration(duration time.Duration) Option { + return func(o *priorityQueueOptions) { + o.podMaxUnschedulableQDuration = duration + } +} + var defaultPriorityQueueOptions = priorityQueueOptions{ - clock: util.RealClock{}, - podInitialBackoffDuration: DefaultPodInitialBackoffDuration, - podMaxBackoffDuration: DefaultPodMaxBackoffDuration, + clock: util.RealClock{}, + podInitialBackoffDuration: DefaultPodInitialBackoffDuration, + podMaxBackoffDuration: DefaultPodMaxBackoffDuration, + podMaxUnschedulableQDuration: DefaultPodMaxUnschedulableQDuration, } // Making sure that PriorityQueue implements SchedulingQueue. @@ -253,15 +267,16 @@ func NewPriorityQueue( } pq := &PriorityQueue{ - PodNominator: options.podNominator, - clock: options.clock, - stop: make(chan struct{}), - podInitialBackoffDuration: options.podInitialBackoffDuration, - podMaxBackoffDuration: options.podMaxBackoffDuration, - activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()), - unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()), - moveRequestCycle: -1, - clusterEventMap: options.clusterEventMap, + PodNominator: options.podNominator, + clock: options.clock, + stop: make(chan struct{}), + podInitialBackoffDuration: options.podInitialBackoffDuration, + podMaxBackoffDuration: options.podMaxBackoffDuration, + podMaxUnschedulableQDuration: options.podMaxUnschedulableQDuration, + activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()), + unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()), + moveRequestCycle: -1, + clusterEventMap: options.clusterEventMap, } pq.cond.L = &pq.lock pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder()) @@ -437,7 +452,7 @@ func (p *PriorityQueue) flushBackoffQCompleted() { } } -// flushUnschedulableQLeftover moves pods which stay in unschedulableQ longer than unschedulableQTimeInterval +// flushUnschedulableQLeftover moves pods which stay in unschedulableQ longer than podMaxUnschedulableQDuration // to backoffQ or activeQ. func (p *PriorityQueue) flushUnschedulableQLeftover() { p.lock.Lock() @@ -447,7 +462,7 @@ func (p *PriorityQueue) flushUnschedulableQLeftover() { currentTime := p.clock.Now() for _, pInfo := range p.unschedulableQ.podInfoMap { lastScheduleTime := pInfo.Timestamp - if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval { + if currentTime.Sub(lastScheduleTime) > p.podMaxUnschedulableQDuration { podsToMove = append(podsToMove, pInfo) } } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 168e04df83e..3f5e3a42513 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -1374,7 +1374,7 @@ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) { q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&highPod, "fakePlugin"), q.SchedulingCycle()) q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&midPod, "fakePlugin"), q.SchedulingCycle()) - c.Step(unschedulableQTimeInterval + time.Second) + c.Step(DefaultPodMaxUnschedulableQDuration + time.Second) q.flushUnschedulableQLeftover() if p, err := q.Pop(); err != nil || p.Pod != &highPod { @@ -1385,6 +1385,109 @@ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) { } } +func TestPriorityQueue_initPodMaxUnschedulableQDuration(t *testing.T) { + pod1 := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-1", + Namespace: "ns1", + UID: types.UID("tp-1"), + }, + Status: v1.PodStatus{ + NominatedNodeName: "node1", + }, + } + + pod2 := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-2", + Namespace: "ns2", + UID: types.UID("tp-2"), + }, + Status: v1.PodStatus{ + NominatedNodeName: "node2", + }, + } + + var timestamp = time.Now() + pInfo1 := &framework.QueuedPodInfo{ + PodInfo: framework.NewPodInfo(pod1), + Timestamp: timestamp.Add(-time.Second), + } + pInfo2 := &framework.QueuedPodInfo{ + PodInfo: framework.NewPodInfo(pod2), + Timestamp: timestamp.Add(-2 * time.Second), + } + + tests := []struct { + name string + podMaxUnschedulableQDuration time.Duration + operations []operation + operands []*framework.QueuedPodInfo + expected []*framework.QueuedPodInfo + }{ + { + name: "New priority queue by the default value of podMaxUnschedulableQDuration", + operations: []operation{ + addPodUnschedulableQ, + addPodUnschedulableQ, + flushUnschedulerQ, + }, + operands: []*framework.QueuedPodInfo{pInfo1, pInfo2, nil}, + expected: []*framework.QueuedPodInfo{pInfo2, pInfo1}, + }, + { + name: "New priority queue by user-defined value of podMaxUnschedulableQDuration", + podMaxUnschedulableQDuration: 30 * time.Second, + operations: []operation{ + addPodUnschedulableQ, + addPodUnschedulableQ, + flushUnschedulerQ, + }, + operands: []*framework.QueuedPodInfo{pInfo1, pInfo2, nil}, + expected: []*framework.QueuedPodInfo{pInfo2, pInfo1}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var queue *PriorityQueue + if test.podMaxUnschedulableQDuration > 0 { + queue = NewTestQueue(ctx, newDefaultQueueSort(), + WithClock(testingclock.NewFakeClock(timestamp)), + WithPodMaxUnschedulableQDuration(test.podMaxUnschedulableQDuration)) + } else { + queue = NewTestQueue(ctx, newDefaultQueueSort(), + WithClock(testingclock.NewFakeClock(timestamp))) + } + + var podInfoList []*framework.QueuedPodInfo + + for i, op := range test.operations { + op(queue, test.operands[i]) + } + + expectedLen := len(test.expected) + if queue.activeQ.Len() != expectedLen { + t.Fatalf("Expected %v items to be in activeQ, but got: %v", expectedLen, queue.activeQ.Len()) + } + + for i := 0; i < expectedLen; i++ { + if pInfo, err := queue.activeQ.Pop(); err != nil { + t.Errorf("Error while popping the head of the queue: %v", err) + } else { + podInfoList = append(podInfoList, pInfo.(*framework.QueuedPodInfo)) + } + } + + if diff := cmp.Diff(test.expected, podInfoList); diff != "" { + t.Errorf("Unexpected QueuedPodInfo list (-want, +got):\n%s", diff) + } + }) + } +} + type operation func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) var ( @@ -1426,6 +1529,10 @@ var ( moveClockForward = func(queue *PriorityQueue, _ *framework.QueuedPodInfo) { queue.clock.(*testingclock.FakeClock).Step(2 * time.Second) } + flushUnschedulerQ = func(queue *PriorityQueue, _ *framework.QueuedPodInfo) { + queue.clock.(*testingclock.FakeClock).Step(queue.podMaxUnschedulableQDuration) + queue.flushUnschedulableQLeftover() + } ) // TestPodTimestamp tests the operations related to QueuedPodInfo. @@ -1727,9 +1834,9 @@ func TestPerPodSchedulingMetrics(t *testing.T) { t.Fatalf("Failed to pop a pod %v", err) } queue.AddUnschedulableIfNotPresent(pInfo, 1) - // Override clock to exceed the unschedulableQTimeInterval so that unschedulable pods + // Override clock to exceed the DefaultPodMaxUnschedulableQDuration so that unschedulable pods // will be moved to activeQ - c.SetTime(timestamp.Add(unschedulableQTimeInterval + 1)) + c.SetTime(timestamp.Add(DefaultPodMaxUnschedulableQDuration + 1)) queue.flushUnschedulableQLeftover() pInfo, err = queue.Pop() if err != nil { @@ -1747,9 +1854,9 @@ func TestPerPodSchedulingMetrics(t *testing.T) { t.Fatalf("Failed to pop a pod %v", err) } queue.AddUnschedulableIfNotPresent(pInfo, 1) - // Override clock to exceed the unschedulableQTimeInterval so that unschedulable pods + // Override clock to exceed the DefaultPodMaxUnschedulableQDuration so that unschedulable pods // will be moved to activeQ - c.SetTime(timestamp.Add(unschedulableQTimeInterval + 1)) + c.SetTime(timestamp.Add(DefaultPodMaxUnschedulableQDuration + 1)) queue.flushUnschedulableQLeftover() newPod := pod.DeepCopy() newPod.Generation = 1 diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index fca985150cc..dc78a5c804e 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -96,11 +96,12 @@ type Scheduler struct { } type schedulerOptions struct { - componentConfigVersion string - kubeConfig *restclient.Config - percentageOfNodesToScore int32 - podInitialBackoffSeconds int64 - podMaxBackoffSeconds int64 + componentConfigVersion string + kubeConfig *restclient.Config + percentageOfNodesToScore int32 + podInitialBackoffSeconds int64 + podMaxBackoffSeconds int64 + podMaxUnschedulableQDuration time.Duration // Contains out-of-tree plugins to be merged with the in-tree registry. frameworkOutOfTreeRegistry frameworkruntime.Registry profiles []schedulerapi.KubeSchedulerProfile @@ -175,6 +176,13 @@ func WithPodMaxBackoffSeconds(podMaxBackoffSeconds int64) Option { } } +// WithPodMaxUnschedulableQDuration sets PodMaxUnschedulableQDuration for PriorityQueue. +func WithPodMaxUnschedulableQDuration(duration time.Duration) Option { + return func(o *schedulerOptions) { + o.podMaxUnschedulableQDuration = duration + } +} + // WithExtenders sets extenders for the Scheduler func WithExtenders(e ...schedulerapi.Extender) Option { return func(o *schedulerOptions) { @@ -193,10 +201,11 @@ func WithBuildFrameworkCapturer(fc FrameworkCapturer) Option { } var defaultSchedulerOptions = schedulerOptions{ - percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, - podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()), - podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()), - parallelism: int32(parallelize.DefaultParallelism), + percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, + podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()), + podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()), + podMaxUnschedulableQDuration: internalqueue.DefaultPodMaxUnschedulableQDuration, + parallelism: int32(parallelize.DefaultParallelism), // Ideally we would statically set the default profile here, but we can't because // creating the default profile may require testing feature gates, which may get // set dynamically in tests. Therefore, we delay creating it until New is actually @@ -242,23 +251,24 @@ func New(client clientset.Interface, clusterEventMap := make(map[framework.ClusterEvent]sets.String) configurator := &Configurator{ - componentConfigVersion: options.componentConfigVersion, - client: client, - kubeConfig: options.kubeConfig, - recorderFactory: recorderFactory, - informerFactory: informerFactory, - schedulerCache: schedulerCache, - StopEverything: stopEverything, - percentageOfNodesToScore: options.percentageOfNodesToScore, - podInitialBackoffSeconds: options.podInitialBackoffSeconds, - podMaxBackoffSeconds: options.podMaxBackoffSeconds, - profiles: append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...), - registry: registry, - nodeInfoSnapshot: snapshot, - extenders: options.extenders, - frameworkCapturer: options.frameworkCapturer, - parallellism: options.parallelism, - clusterEventMap: clusterEventMap, + componentConfigVersion: options.componentConfigVersion, + client: client, + kubeConfig: options.kubeConfig, + recorderFactory: recorderFactory, + informerFactory: informerFactory, + schedulerCache: schedulerCache, + StopEverything: stopEverything, + percentageOfNodesToScore: options.percentageOfNodesToScore, + podInitialBackoffSeconds: options.podInitialBackoffSeconds, + podMaxBackoffSeconds: options.podMaxBackoffSeconds, + podMaxUnschedulableQDuration: options.podMaxUnschedulableQDuration, + profiles: append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...), + registry: registry, + nodeInfoSnapshot: snapshot, + extenders: options.extenders, + frameworkCapturer: options.frameworkCapturer, + parallellism: options.parallelism, + clusterEventMap: clusterEventMap, } metrics.Register()