Merge pull request #108017 from denkensk/add-flush-flag

Add a deprecated cmd flag for the time interval between flushing pods from unschedualbeQ to activeQ or backoffQ.
This commit is contained in:
Kubernetes Prow Robot 2022-02-16 07:56:38 -08:00 committed by GitHub
commit f538b0b105
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 214 additions and 54 deletions

View File

@ -17,6 +17,8 @@ limitations under the License.
package config
import (
"time"
apiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
@ -49,6 +51,12 @@ type Config struct {
// LeaderElection is optional.
LeaderElection *leaderelection.LeaderElectionConfig
// PodMaxUnschedulableQDuration is the maximum time a pod can stay in
// unschedulableQ. If a pod stays in unschedulableQ for longer than this
// value, the pod will be moved from unschedulableQ to backoffQ or activeQ.
// If this value is empty, the default value (60s) will be used.
PodMaxUnschedulableQDuration time.Duration
}
type completedConfig struct {

View File

@ -17,6 +17,8 @@ limitations under the License.
package options
import (
"time"
"github.com/spf13/pflag"
componentbaseconfig "k8s.io/component-base/config"
)
@ -28,6 +30,11 @@ type DeprecatedOptions struct {
componentbaseconfig.ClientConnectionConfiguration
// Note that only the deprecated options (lock-object-name and lock-object-namespace) are populated here.
componentbaseconfig.LeaderElectionConfiguration
// PodMaxUnschedulableQDuration is the maximum time a pod can stay in
// unschedulableQ. If a pod stays in unschedulableQ for longer than this
// value, the pod will be moved from unschedulableQ to backoffQ or activeQ.
// If this value is empty, the default value (60s) will be used.
PodMaxUnschedulableQDuration time.Duration
}
// AddFlags adds flags for the deprecated options.
@ -44,4 +51,5 @@ func (o *DeprecatedOptions) AddFlags(fs *pflag.FlagSet) {
fs.Int32Var(&o.Burst, "kube-api-burst", 100, "DEPRECATED: burst to use while talking with kubernetes apiserver. This parameter is ignored if a config file is specified in --config.")
fs.StringVar(&o.ResourceNamespace, "lock-object-namespace", "kube-system", "DEPRECATED: define the namespace of the lock object. Will be removed in favor of leader-elect-resource-namespace. This parameter is ignored if a config file is specified in --config.")
fs.StringVar(&o.ResourceName, "lock-object-name", "kube-scheduler", "DEPRECATED: define the name of the lock object. Will be removed in favor of leader-elect-resource-name. This parameter is ignored if a config file is specified in --config.")
fs.DurationVar(&o.PodMaxUnschedulableQDuration, "pod-max-unschedulableq-duration", 60*time.Second, "DEPRECATED: the maximum time a pod can stay in unschedulableQ. If a pod stays in unschedulableQ for longer than this value, the pod will be moved from unschedulableQ to backoffQ or activeQ. This flag is deprecated and will be removed in 1.26")
}

View File

@ -79,7 +79,9 @@ func NewOptions() *Options {
SecureServing: apiserveroptions.NewSecureServingOptions().WithLoopback(),
Authentication: apiserveroptions.NewDelegatingAuthenticationOptions(),
Authorization: apiserveroptions.NewDelegatingAuthorizationOptions(),
Deprecated: &DeprecatedOptions{},
Deprecated: &DeprecatedOptions{
PodMaxUnschedulableQDuration: 60 * time.Second,
},
LeaderElection: &componentbaseconfig.LeaderElectionConfiguration{
LeaderElect: true,
LeaseDuration: metav1.Duration{Duration: 15 * time.Second},
@ -231,6 +233,12 @@ func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
}
}
o.Metrics.Apply()
// Apply value independently instead of using ApplyDeprecated() because it can't be configured via ComponentConfig.
if o.Deprecated != nil {
c.PodMaxUnschedulableQDuration = o.Deprecated.PodMaxUnschedulableQDuration
}
return nil
}

View File

@ -328,6 +328,7 @@ func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions
scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
scheduler.WithPodMaxUnschedulableQDuration(cc.PodMaxUnschedulableQDuration),
scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) {

View File

@ -73,6 +73,8 @@ type Configurator struct {
podMaxBackoffSeconds int64
podMaxUnschedulableQDuration time.Duration
profiles []schedulerapi.KubeSchedulerProfile
registry frameworkruntime.Registry
nodeInfoSnapshot *internalcache.Snapshot
@ -168,6 +170,7 @@ func (c *Configurator) create() (*Scheduler, error) {
internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
internalqueue.WithPodNominator(nominator),
internalqueue.WithClusterEventMap(c.clusterEventMap),
internalqueue.WithPodMaxUnschedulableQDuration(c.podMaxUnschedulableQDuration),
)
// Setup cache debugger.

View File

@ -48,9 +48,12 @@ import (
)
const (
// If a pod stays in unschedulableQ longer than unschedulableQTimeInterval,
// the pod will be moved from unschedulableQ to backoffQ or activeQ.
unschedulableQTimeInterval = 60 * time.Second
// DefaultPodMaxUnschedulableQDuration is the default value for the maximum
// time a pod can stay in unschedulableQ. If a pod stays in unschedulableQ
// for longer than this value, the pod will be moved from unschedulableQ to
// backoffQ or activeQ. If this value is empty, the default value (60s)
// will be used.
DefaultPodMaxUnschedulableQDuration time.Duration = 60 * time.Second
queueClosed = "scheduling queue is closed"
)
@ -136,6 +139,8 @@ type PriorityQueue struct {
podInitialBackoffDuration time.Duration
// pod maximum backoff duration.
podMaxBackoffDuration time.Duration
// the maximum time a pod can stay in the unschedulableQ.
podMaxUnschedulableQDuration time.Duration
lock sync.RWMutex
cond sync.Cond
@ -167,11 +172,12 @@ type PriorityQueue struct {
}
type priorityQueueOptions struct {
clock util.Clock
podInitialBackoffDuration time.Duration
podMaxBackoffDuration time.Duration
podNominator framework.PodNominator
clusterEventMap map[framework.ClusterEvent]sets.String
clock util.Clock
podInitialBackoffDuration time.Duration
podMaxBackoffDuration time.Duration
podMaxUnschedulableQDuration time.Duration
podNominator framework.PodNominator
clusterEventMap map[framework.ClusterEvent]sets.String
}
// Option configures a PriorityQueue
@ -212,10 +218,18 @@ func WithClusterEventMap(m map[framework.ClusterEvent]sets.String) Option {
}
}
// WithPodMaxUnschedulableQDuration sets podMaxUnschedulableQDuration for PriorityQueue.
func WithPodMaxUnschedulableQDuration(duration time.Duration) Option {
return func(o *priorityQueueOptions) {
o.podMaxUnschedulableQDuration = duration
}
}
var defaultPriorityQueueOptions = priorityQueueOptions{
clock: util.RealClock{},
podInitialBackoffDuration: DefaultPodInitialBackoffDuration,
podMaxBackoffDuration: DefaultPodMaxBackoffDuration,
clock: util.RealClock{},
podInitialBackoffDuration: DefaultPodInitialBackoffDuration,
podMaxBackoffDuration: DefaultPodMaxBackoffDuration,
podMaxUnschedulableQDuration: DefaultPodMaxUnschedulableQDuration,
}
// Making sure that PriorityQueue implements SchedulingQueue.
@ -253,15 +267,16 @@ func NewPriorityQueue(
}
pq := &PriorityQueue{
PodNominator: options.podNominator,
clock: options.clock,
stop: make(chan struct{}),
podInitialBackoffDuration: options.podInitialBackoffDuration,
podMaxBackoffDuration: options.podMaxBackoffDuration,
activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
moveRequestCycle: -1,
clusterEventMap: options.clusterEventMap,
PodNominator: options.podNominator,
clock: options.clock,
stop: make(chan struct{}),
podInitialBackoffDuration: options.podInitialBackoffDuration,
podMaxBackoffDuration: options.podMaxBackoffDuration,
podMaxUnschedulableQDuration: options.podMaxUnschedulableQDuration,
activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
moveRequestCycle: -1,
clusterEventMap: options.clusterEventMap,
}
pq.cond.L = &pq.lock
pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
@ -437,7 +452,7 @@ func (p *PriorityQueue) flushBackoffQCompleted() {
}
}
// flushUnschedulableQLeftover moves pods which stay in unschedulableQ longer than unschedulableQTimeInterval
// flushUnschedulableQLeftover moves pods which stay in unschedulableQ longer than podMaxUnschedulableQDuration
// to backoffQ or activeQ.
func (p *PriorityQueue) flushUnschedulableQLeftover() {
p.lock.Lock()
@ -447,7 +462,7 @@ func (p *PriorityQueue) flushUnschedulableQLeftover() {
currentTime := p.clock.Now()
for _, pInfo := range p.unschedulableQ.podInfoMap {
lastScheduleTime := pInfo.Timestamp
if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
if currentTime.Sub(lastScheduleTime) > p.podMaxUnschedulableQDuration {
podsToMove = append(podsToMove, pInfo)
}
}

View File

@ -1374,7 +1374,7 @@ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) {
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&highPod, "fakePlugin"), q.SchedulingCycle())
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&midPod, "fakePlugin"), q.SchedulingCycle())
c.Step(unschedulableQTimeInterval + time.Second)
c.Step(DefaultPodMaxUnschedulableQDuration + time.Second)
q.flushUnschedulableQLeftover()
if p, err := q.Pop(); err != nil || p.Pod != &highPod {
@ -1385,6 +1385,109 @@ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) {
}
}
func TestPriorityQueue_initPodMaxUnschedulableQDuration(t *testing.T) {
pod1 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod-1",
Namespace: "ns1",
UID: types.UID("tp-1"),
},
Status: v1.PodStatus{
NominatedNodeName: "node1",
},
}
pod2 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod-2",
Namespace: "ns2",
UID: types.UID("tp-2"),
},
Status: v1.PodStatus{
NominatedNodeName: "node2",
},
}
var timestamp = time.Now()
pInfo1 := &framework.QueuedPodInfo{
PodInfo: framework.NewPodInfo(pod1),
Timestamp: timestamp.Add(-time.Second),
}
pInfo2 := &framework.QueuedPodInfo{
PodInfo: framework.NewPodInfo(pod2),
Timestamp: timestamp.Add(-2 * time.Second),
}
tests := []struct {
name string
podMaxUnschedulableQDuration time.Duration
operations []operation
operands []*framework.QueuedPodInfo
expected []*framework.QueuedPodInfo
}{
{
name: "New priority queue by the default value of podMaxUnschedulableQDuration",
operations: []operation{
addPodUnschedulableQ,
addPodUnschedulableQ,
flushUnschedulerQ,
},
operands: []*framework.QueuedPodInfo{pInfo1, pInfo2, nil},
expected: []*framework.QueuedPodInfo{pInfo2, pInfo1},
},
{
name: "New priority queue by user-defined value of podMaxUnschedulableQDuration",
podMaxUnschedulableQDuration: 30 * time.Second,
operations: []operation{
addPodUnschedulableQ,
addPodUnschedulableQ,
flushUnschedulerQ,
},
operands: []*framework.QueuedPodInfo{pInfo1, pInfo2, nil},
expected: []*framework.QueuedPodInfo{pInfo2, pInfo1},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var queue *PriorityQueue
if test.podMaxUnschedulableQDuration > 0 {
queue = NewTestQueue(ctx, newDefaultQueueSort(),
WithClock(testingclock.NewFakeClock(timestamp)),
WithPodMaxUnschedulableQDuration(test.podMaxUnschedulableQDuration))
} else {
queue = NewTestQueue(ctx, newDefaultQueueSort(),
WithClock(testingclock.NewFakeClock(timestamp)))
}
var podInfoList []*framework.QueuedPodInfo
for i, op := range test.operations {
op(queue, test.operands[i])
}
expectedLen := len(test.expected)
if queue.activeQ.Len() != expectedLen {
t.Fatalf("Expected %v items to be in activeQ, but got: %v", expectedLen, queue.activeQ.Len())
}
for i := 0; i < expectedLen; i++ {
if pInfo, err := queue.activeQ.Pop(); err != nil {
t.Errorf("Error while popping the head of the queue: %v", err)
} else {
podInfoList = append(podInfoList, pInfo.(*framework.QueuedPodInfo))
}
}
if diff := cmp.Diff(test.expected, podInfoList); diff != "" {
t.Errorf("Unexpected QueuedPodInfo list (-want, +got):\n%s", diff)
}
})
}
}
type operation func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo)
var (
@ -1426,6 +1529,10 @@ var (
moveClockForward = func(queue *PriorityQueue, _ *framework.QueuedPodInfo) {
queue.clock.(*testingclock.FakeClock).Step(2 * time.Second)
}
flushUnschedulerQ = func(queue *PriorityQueue, _ *framework.QueuedPodInfo) {
queue.clock.(*testingclock.FakeClock).Step(queue.podMaxUnschedulableQDuration)
queue.flushUnschedulableQLeftover()
}
)
// TestPodTimestamp tests the operations related to QueuedPodInfo.
@ -1727,9 +1834,9 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
t.Fatalf("Failed to pop a pod %v", err)
}
queue.AddUnschedulableIfNotPresent(pInfo, 1)
// Override clock to exceed the unschedulableQTimeInterval so that unschedulable pods
// Override clock to exceed the DefaultPodMaxUnschedulableQDuration so that unschedulable pods
// will be moved to activeQ
c.SetTime(timestamp.Add(unschedulableQTimeInterval + 1))
c.SetTime(timestamp.Add(DefaultPodMaxUnschedulableQDuration + 1))
queue.flushUnschedulableQLeftover()
pInfo, err = queue.Pop()
if err != nil {
@ -1747,9 +1854,9 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
t.Fatalf("Failed to pop a pod %v", err)
}
queue.AddUnschedulableIfNotPresent(pInfo, 1)
// Override clock to exceed the unschedulableQTimeInterval so that unschedulable pods
// Override clock to exceed the DefaultPodMaxUnschedulableQDuration so that unschedulable pods
// will be moved to activeQ
c.SetTime(timestamp.Add(unschedulableQTimeInterval + 1))
c.SetTime(timestamp.Add(DefaultPodMaxUnschedulableQDuration + 1))
queue.flushUnschedulableQLeftover()
newPod := pod.DeepCopy()
newPod.Generation = 1

View File

@ -96,11 +96,12 @@ type Scheduler struct {
}
type schedulerOptions struct {
componentConfigVersion string
kubeConfig *restclient.Config
percentageOfNodesToScore int32
podInitialBackoffSeconds int64
podMaxBackoffSeconds int64
componentConfigVersion string
kubeConfig *restclient.Config
percentageOfNodesToScore int32
podInitialBackoffSeconds int64
podMaxBackoffSeconds int64
podMaxUnschedulableQDuration time.Duration
// Contains out-of-tree plugins to be merged with the in-tree registry.
frameworkOutOfTreeRegistry frameworkruntime.Registry
profiles []schedulerapi.KubeSchedulerProfile
@ -175,6 +176,13 @@ func WithPodMaxBackoffSeconds(podMaxBackoffSeconds int64) Option {
}
}
// WithPodMaxUnschedulableQDuration sets PodMaxUnschedulableQDuration for PriorityQueue.
func WithPodMaxUnschedulableQDuration(duration time.Duration) Option {
return func(o *schedulerOptions) {
o.podMaxUnschedulableQDuration = duration
}
}
// WithExtenders sets extenders for the Scheduler
func WithExtenders(e ...schedulerapi.Extender) Option {
return func(o *schedulerOptions) {
@ -193,10 +201,11 @@ func WithBuildFrameworkCapturer(fc FrameworkCapturer) Option {
}
var defaultSchedulerOptions = schedulerOptions{
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
parallelism: int32(parallelize.DefaultParallelism),
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
podMaxUnschedulableQDuration: internalqueue.DefaultPodMaxUnschedulableQDuration,
parallelism: int32(parallelize.DefaultParallelism),
// Ideally we would statically set the default profile here, but we can't because
// creating the default profile may require testing feature gates, which may get
// set dynamically in tests. Therefore, we delay creating it until New is actually
@ -242,23 +251,24 @@ func New(client clientset.Interface,
clusterEventMap := make(map[framework.ClusterEvent]sets.String)
configurator := &Configurator{
componentConfigVersion: options.componentConfigVersion,
client: client,
kubeConfig: options.kubeConfig,
recorderFactory: recorderFactory,
informerFactory: informerFactory,
schedulerCache: schedulerCache,
StopEverything: stopEverything,
percentageOfNodesToScore: options.percentageOfNodesToScore,
podInitialBackoffSeconds: options.podInitialBackoffSeconds,
podMaxBackoffSeconds: options.podMaxBackoffSeconds,
profiles: append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...),
registry: registry,
nodeInfoSnapshot: snapshot,
extenders: options.extenders,
frameworkCapturer: options.frameworkCapturer,
parallellism: options.parallelism,
clusterEventMap: clusterEventMap,
componentConfigVersion: options.componentConfigVersion,
client: client,
kubeConfig: options.kubeConfig,
recorderFactory: recorderFactory,
informerFactory: informerFactory,
schedulerCache: schedulerCache,
StopEverything: stopEverything,
percentageOfNodesToScore: options.percentageOfNodesToScore,
podInitialBackoffSeconds: options.podInitialBackoffSeconds,
podMaxBackoffSeconds: options.podMaxBackoffSeconds,
podMaxUnschedulableQDuration: options.podMaxUnschedulableQDuration,
profiles: append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...),
registry: registry,
nodeInfoSnapshot: snapshot,
extenders: options.extenders,
frameworkCapturer: options.frameworkCapturer,
parallellism: options.parallelism,
clusterEventMap: clusterEventMap,
}
metrics.Register()