From 1e170637c3ce6c4ccd378275d9e52192f4be12b7 Mon Sep 17 00:00:00 2001 From: Mike Spreitzer Date: Sat, 18 Jan 2020 01:46:11 -0500 Subject: [PATCH] Refactored QueueSet configuration into two phases So that errors can be detected before resolving concurrency shares into concurrency counts. --- .../util/flowcontrol/fairqueuing/interface.go | 77 ++++--- .../fairqueuing/queueset/queueset.go | 193 +++++++++++------- .../fairqueuing/queueset/queueset_test.go | 42 ++-- .../fairqueuing/testing/no-restraint.go | 16 +- 4 files changed, 204 insertions(+), 124 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go index 12378153562..c48acea7543 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go @@ -21,9 +21,22 @@ import ( "time" ) -// QueueSetFactory is used to create QueueSet objects. +// QueueSetFactory is used to create QueueSet objects. Creation, like +// config update, is done in two phases: the first phase consumes the +// QueuingConfig and the second consumes the DispatchingConfig. They +// are separated so that errors from the first phase can be found +// before committing to a concurrency allotment for the second. type QueueSetFactory interface { - NewQueueSet(config QueueSetConfig) (QueueSet, error) + // QualifyQueuingConfig does the first phase of creating a QueueSet + QualifyQueuingConfig(QueuingConfig) (QueueSetCompleter, error) +} + +// QueueSetCompleter finishes the two-step process of creating or +// reconfiguring a QueueSet +type QueueSetCompleter interface { + // GetQueueSet returns a QueueSet configured by the given + // dispatching configuration. + GetQueueSet(DispatchingConfig) QueueSet } // QueueSet is the abstraction for the queuing and dispatching @@ -34,19 +47,27 @@ type QueueSetFactory interface { // . Some day we may have connections between priority levels, but // today is not that day. type QueueSet interface { - // SetConfiguration updates the configuration - SetConfiguration(QueueSetConfig) error + // QualifyQueuingConfig starts the two-step process of updating + // the configuration. No change is made until GetQueueSet is + // called. If `C := X.QualifyQueuingConfig(q)` then + // `C.GetQueueSet(d)` returns the same value `X`. If the + // QueuingConfig's DesiredNumQueues field is zero then the other + // queuing-specific config parameters are not changed, so that the + // queues continue draining as before. + QualifyQueuingConfig(QueuingConfig) (QueueSetCompleter, error) - // Quiesce controls whether the QueueSet is operating normally or is quiescing. - // A quiescing QueueSet drains as normal but does not admit any - // new requests. Passing a non-nil handler means the system should - // be quiescing, a nil handler means the system should operate - // normally. A call to Wait while the system is quiescing - // will be rebuffed by returning tryAnother=true. If all the - // queues have no requests waiting nor executing while the system - // is quiescing then the handler will eventually be called with no - // locks held (even if the system becomes non-quiescing between the - // triggering state and the required call). + // Quiesce controls whether the QueueSet is operating normally or + // is quiescing. A quiescing QueueSet drains as normal but does + // not admit any new requests. Passing a non-nil handler means the + // system should be quiescing, a nil handler means the system + // should operate normally. A call to Wait while the system is + // quiescing will be rebuffed by returning tryAnother=true. If all + // the queues have no requests waiting nor executing while the + // system is quiescing then the handler will eventually be called + // with no locks held (even if the system becomes non-quiescing + // between the triggering state and the required call). In Go + // Memory Model terms, the triggering state happens before the + // call to the EmptyHandler. Quiesce(EmptyHandler) // Wait uses the given hashValue as the source of entropy as it @@ -56,34 +77,44 @@ type QueueSet interface { // tryAnother==true at return then the QueueSet has become // undesirable and the client should try to find a different // QueueSet to use; execute and afterExecution are irrelevant in - // this case. Otherwise, if execute then the client should start - // executing the request and, once the request finishes execution - // or is canceled, call afterExecution(). Otherwise the client - // should not execute the request and afterExecution is - // irrelevant. + // this case. In the terms of the Go Memory Model, there was a + // call to Quiesce with a non-nil handler that happened before + // this return from Wait. Otherwise, if execute then the client + // should start executing the request and, once the request + // finishes execution or is canceled, call afterExecution(). + // Otherwise the client should not execute the request and + // afterExecution is irrelevant. Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func()) } -// QueueSetConfig defines the configuration of a QueueSet. -type QueueSetConfig struct { +// QueuingConfig defines the configuration of the queuing aspect of a QueueSet. +type QueuingConfig struct { // Name is used to identify a queue set, allowing for descriptive information about its intended use Name string - // ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time - ConcurrencyLimit int + // DesiredNumQueues is the number of queues that the API says // should exist now. This may be zero, in which case // QueueLengthLimit, HandSize, and RequestWaitLimit are ignored. DesiredNumQueues int + // QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time QueueLengthLimit int + // HandSize is a parameter of shuffle sharding. Upon arrival of a request, a queue is chosen by randomly // dealing a "hand" of this many queues and then picking one of minimum length. HandSize int + // RequestWaitLimit is the maximum amount of time that a request may wait in a queue. // If, by the end of that time, the request has not been dispatched then it is rejected. RequestWaitLimit time.Duration } +// DispatchingConfig defines the configuration of the dispatching aspect of a QueueSet. +type DispatchingConfig struct { + // ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time + ConcurrencyLimit int +} + // EmptyHandler is used to notify the callee when all the queues // of a QueueSet have been drained. type EmptyHandler interface { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 7b789e7ca14..9ec03d8ab55 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -22,10 +22,10 @@ import ( "sync" "time" - "k8s.io/apimachinery/pkg/util/runtime" - "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apiserver/pkg/util/flowcontrol/counter" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise" @@ -43,12 +43,13 @@ type queueSetFactory struct { clock clock.PassiveClock } -// NewQueueSetFactory creates a new QueueSetFactory object -func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter) fq.QueueSetFactory { - return &queueSetFactory{ - counter: counter, - clock: c, - } +// `*queueSetCompleter` implements QueueSetCompleter. Exactly one of +// the fields `factory` and `theSet` is non-nil. +type queueSetCompleter struct { + factory *queueSetFactory + theSet *queueSet + qCfg fq.QueuingConfig + dealer *shufflesharding.Dealer } // queueSet implements the Fair Queuing for Server Requests technique @@ -65,12 +66,19 @@ type queueSet struct { lock sync.Mutex - // config holds the current configuration. Its DesiredNumQueues - // may be less than the current number of queues. If its - // DesiredNumQueues is zero then its other queuing parameters - // retain the settings they had when DesiredNumQueues was last - // non-zero (if ever). - config fq.QueueSetConfig + // qCfg holds the current queuing configuration. Its + // DesiredNumQueues may be less than the current number of queues. + // If its DesiredNumQueues is zero then its other queuing + // parameters retain the settings they had when DesiredNumQueues + // was last non-zero (if ever). + qCfg fq.QueuingConfig + + // the current dispatching configuration. + dCfg fq.DispatchingConfig + + // If `config.DesiredNumQueues` is non-zero then dealer is not nil + // and is good for `config`. + dealer *shufflesharding.Dealer // queues may be longer than the desired number, while the excess // queues are still draining. @@ -96,24 +104,55 @@ type queueSet struct { totRequestsExecuting int emptyHandler fq.EmptyHandler - dealer *shufflesharding.Dealer } -// NewQueueSet creates a new QueueSet object. -// There is a new QueueSet created for each priority level. -func (qsf queueSetFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) { - fq := &queueSet{ - clock: qsf.clock, - counter: qsf.counter, - estimatedServiceTime: 60, - config: config, - lastRealTime: qsf.clock.Now(), +// NewQueueSetFactory creates a new QueueSetFactory object +func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter) fq.QueueSetFactory { + return &queueSetFactory{ + counter: counter, + clock: c, } - err := fq.SetConfiguration(config) +} + +func (qsf *queueSetFactory) QualifyQueuingConfig(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) { + dealer, err := checkConfig(qCfg) if err != nil { return nil, err } - return fq, nil + return &queueSetCompleter{ + factory: qsf, + qCfg: qCfg, + dealer: dealer}, nil +} + +// checkConfig returns a non-nil Dealer if the config is valid and +// calls for one, and returns a non-nil error if the given config is +// invalid. +func checkConfig(qCfg fq.QueuingConfig) (*shufflesharding.Dealer, error) { + if qCfg.DesiredNumQueues == 0 { + return nil, nil + } + dealer, err := shufflesharding.NewDealer(qCfg.DesiredNumQueues, qCfg.HandSize) + if err != nil { + err = errors.Wrap(err, "the QueueSetConfig implies an invalid shuffle sharding config (DesiredNumQueues is deckSize)") + } + return dealer, err +} + +func (qsc *queueSetCompleter) GetQueueSet(dCfg fq.DispatchingConfig) fq.QueueSet { + qs := qsc.theSet + if qs == nil { + qs = &queueSet{ + clock: qsc.factory.clock, + counter: qsc.factory.counter, + estimatedServiceTime: 60, + qCfg: qsc.qCfg, + virtualTime: 0, + lastRealTime: qsc.factory.clock.Now(), + } + } + qs.setConfiguration(qsc.qCfg, qsc.dealer, dCfg) + return qs } // createQueues is a helper method for initializing an array of n queues @@ -125,40 +164,45 @@ func createQueues(n, baseIndex int) []*queue { return fqqueues } -// SetConfiguration is used to set the configuration for a queueSet -// update handling for when fields are updated is handled here as well - +func (qs *queueSet) QualifyQueuingConfig(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) { + dealer, err := checkConfig(qCfg) + if err != nil { + return nil, err + } + return &queueSetCompleter{ + theSet: qs, + qCfg: qCfg, + dealer: dealer}, nil +} + +// SetConfiguration is used to set the configuration for a queueSet. +// Update handling for when fields are updated is handled here as well - // eg: if DesiredNum is increased, SetConfiguration reconciles by // adding more queues. -func (qs *queueSet) SetConfiguration(config fq.QueueSetConfig) error { +func (qs *queueSet) setConfiguration(qCfg fq.QueuingConfig, dealer *shufflesharding.Dealer, dCfg fq.DispatchingConfig) { qs.lockAndSyncTime() defer qs.lock.Unlock() - var dealer *shufflesharding.Dealer - if config.DesiredNumQueues > 0 { - var err error - dealer, err = shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize) - if err != nil { - return errors.Wrap(err, "shuffle sharding dealer creation failed") - } + if qCfg.DesiredNumQueues > 0 { // Adding queues is the only thing that requires immediate action // Removing queues is handled by omitting indexes >DesiredNum from // chooseQueueIndexLocked numQueues := len(qs.queues) - if config.DesiredNumQueues > numQueues { + if qCfg.DesiredNumQueues > numQueues { qs.queues = append(qs.queues, - createQueues(config.DesiredNumQueues-numQueues, len(qs.queues))...) + createQueues(qCfg.DesiredNumQueues-numQueues, len(qs.queues))...) } } else { - config.QueueLengthLimit = qs.config.QueueLengthLimit - config.HandSize = qs.config.HandSize - config.RequestWaitLimit = qs.config.RequestWaitLimit + qCfg.QueueLengthLimit = qs.qCfg.QueueLengthLimit + qCfg.HandSize = qs.qCfg.HandSize + qCfg.RequestWaitLimit = qs.qCfg.RequestWaitLimit } - qs.config = config + qs.qCfg = qCfg + qs.dCfg = dCfg qs.dealer = dealer qs.dispatchAsMuchAsPossibleLocked() - return nil } // Quiesce controls whether the QueueSet is operating normally or is quiescing. @@ -216,16 +260,16 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i // A call to Wait while the system is quiescing will be rebuffed by // returning `tryAnother=true`. if qs.emptyHandler != nil { - klog.V(5).Infof("QS(%s): rebuffing request %#+v %#+v with TryAnother", qs.config.Name, descr1, descr2) + klog.V(5).Infof("QS(%s): rebuffing request %#+v %#+v with TryAnother", qs.qCfg.Name, descr1, descr2) return decisionTryAnother } // ======================================================================== // Step 0: // Apply only concurrency limit, if zero queues desired - if qs.config.DesiredNumQueues < 1 { - if qs.totRequestsExecuting >= qs.config.ConcurrencyLimit { - klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v because %d are executing and the limit is %d", qs.config.Name, descr1, descr2, qs.totRequestsExecuting, qs.config.ConcurrencyLimit) + if qs.qCfg.DesiredNumQueues < 1 { + if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit { + klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v because %d are executing and the limit is %d", qs.qCfg.Name, descr1, descr2, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) return decisionReject } req = qs.dispatchSansQueue(descr1, descr2) @@ -243,8 +287,8 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i // req == nil means that the request was rejected - no remaining // concurrency shares and at max queue length already if req == nil { - klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v due to queue full", qs.config.Name, descr1, descr2) - metrics.AddReject(qs.config.Name, "queue-full") + klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v due to queue full", qs.qCfg.Name, descr1, descr2) + metrics.AddReject(qs.qCfg.Name, "queue-full") return decisionReject } @@ -274,7 +318,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i qs.goroutineDoneOrBlocked() select { case <-doneCh: - klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.config.Name, descr1, descr2) + klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.qCfg.Name, descr1, descr2) req.decision.Set(decisionCancel) } qs.goroutineDoneOrBlocked() @@ -291,18 +335,18 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i case requestDecision: decision = dec default: - klog.Errorf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.config.Name, decisionAny, decisionAny, descr1, descr2) + klog.Errorf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.qCfg.Name, decisionAny, decisionAny, descr1, descr2) decision = decisionExecute } switch decision { case decisionReject: - klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.config.Name, descr1, descr2) - metrics.AddReject(qs.config.Name, "time-out") + klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, descr1, descr2) + metrics.AddReject(qs.qCfg.Name, "time-out") case decisionCancel: qs.syncTimeLocked() // TODO(aaron-prindle) add metrics to these two cases if req.isWaiting { - klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.config.Name, descr1, descr2) + klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, descr1, descr2) // remove the request from the queue as it has timed out for i := range req.queue.requests { if req == req.queue.requests[i] { @@ -317,7 +361,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i // then a call to the EmptyHandler should be forked. qs.maybeForkEmptyHandlerLocked() } else { - klog.V(5).Infof("QS(%s): request %#+v %#+v canceled shortly after dispatch", qs.config.Name, descr1, descr2) + klog.V(5).Infof("QS(%s): request %#+v %#+v canceled shortly after dispatch", qs.qCfg.Name, descr1, descr2) } } return decision @@ -370,7 +414,7 @@ func (qs *queueSet) getVirtualTimeRatio() float64 { if activeQueues == 0 { return 0 } - return math.Min(float64(reqs), float64(qs.config.ConcurrencyLimit)) / float64(activeQueues) + return math.Min(float64(reqs), float64(qs.dCfg.ConcurrencyLimit)) / float64(activeQueues) } // timeoutOldRequestsAndRejectOrEnqueueLocked encapsulates the logic required @@ -404,7 +448,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64, if ok := qs.rejectOrEnqueueLocked(req); !ok { return nil } - metrics.ObserveQueueLength(qs.config.Name, len(queue.requests)) + metrics.ObserveQueueLength(qs.qCfg.Name, len(queue.requests)) return req } @@ -415,13 +459,16 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte bestQueueLen := int(math.MaxInt32) // the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`. qs.dealer.Deal(hashValue, func(queueIdx int) { + if queueIdx < 0 || queueIdx >= len(qs.queues) { + return + } thisLen := len(qs.queues[queueIdx].requests) - klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.config.Name, descr1, descr2, queueIdx, thisLen) + klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisLen) if thisLen < bestQueueLen { bestQueueIdx, bestQueueLen = queueIdx, thisLen } }) - klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.config.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].requestsExecuting) + klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].requestsExecuting) return bestQueueIdx } @@ -436,7 +483,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) { // as newer requests also will not have timed out // now - requestWaitLimit = waitLimit - waitLimit := now.Add(-qs.config.RequestWaitLimit) + waitLimit := now.Add(-qs.qCfg.RequestWaitLimit) for i, req := range reqs { if waitLimit.After(req.arrivalTime) { req.decision.SetLocked(decisionReject) @@ -463,8 +510,8 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool { queue := request.queue curQueueLength := len(queue.requests) // rejects the newly arrived request if resource criteria not met - if qs.totRequestsExecuting >= qs.config.ConcurrencyLimit && - curQueueLength >= qs.config.QueueLengthLimit { + if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit && + curQueueLength >= qs.qCfg.QueueLengthLimit { return false } @@ -479,12 +526,12 @@ func (qs *queueSet) enqueueLocked(request *request) { // the queue’s virtual start time is set to the virtual time. queue.virtualStart = qs.virtualTime if klog.V(6) { - klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2) + klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2) } } queue.Enqueue(request) qs.totRequestsWaiting++ - metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.totRequestsWaiting) + metrics.UpdateFlowControlRequestsInQueue(qs.qCfg.Name, qs.totRequestsWaiting) } // dispatchAsMuchAsPossibleLocked runs a loop, as long as there @@ -494,7 +541,7 @@ func (qs *queueSet) enqueueLocked(request *request) { // queue, increment the count of the number executing, and send true // to the request's channel. func (qs *queueSet) dispatchAsMuchAsPossibleLocked() { - for qs.totRequestsWaiting != 0 && qs.totRequestsExecuting < qs.config.ConcurrencyLimit { + for qs.totRequestsWaiting != 0 && qs.totRequestsExecuting < qs.dCfg.ConcurrencyLimit { ok := qs.dispatchLocked() if !ok { break @@ -512,9 +559,9 @@ func (qs *queueSet) dispatchSansQueue(descr1, descr2 interface{}) *request { } qs.totRequestsExecuting++ if klog.V(5) { - klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %#+v %#+v, qs will have %d executing", qs.config.Name, now.Format(nsTimeFmt), qs.virtualTime, descr1, descr2, qs.totRequestsExecuting) + klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, descr1, descr2, qs.totRequestsExecuting) } - metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting) + metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting) return req } @@ -537,11 +584,11 @@ func (qs *queueSet) dispatchLocked() bool { qs.totRequestsExecuting++ queue.requestsExecuting++ if klog.V(6) { - klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting) + klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting) } // When a request is dequeued for service -> qs.virtualStart += G queue.virtualStart += qs.estimatedServiceTime - metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting) + metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting) request.decision.SetLocked(decisionExecute) return ok } @@ -590,11 +637,11 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) { // callback updates important state in the queueSet func (qs *queueSet) finishRequestLocked(r *request) { qs.totRequestsExecuting-- - metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting) + metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting) if r.queue == nil { if klog.V(6) { - klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting) + klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting) } return } @@ -609,12 +656,12 @@ func (qs *queueSet) finishRequestLocked(r *request) { r.queue.requestsExecuting-- if klog.V(6) { - klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, r.queue.virtualStart, S, len(r.queue.requests), r.queue.requestsExecuting) + klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, r.queue.virtualStart, S, len(r.queue.requests), r.queue.requestsExecuting) } // If there are more queues than desired and this one has no // requests then remove it - if len(qs.queues) > qs.config.DesiredNumQueues && + if len(qs.queues) > qs.qCfg.DesiredNumQueues && len(r.queue.requests) == 0 && r.queue.requestsExecuting == 0 { qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.queue.index) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 024f7805a55..d198da00bf0 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -141,12 +141,11 @@ func init() { func TestNoRestraint(t *testing.T) { now := time.Now() clk, counter := clock.NewFakeEventClock(now, 0, nil) - nrf := test.NewNoRestraintFactory() - config := fq.QueueSetConfig{} - nr, err := nrf.NewQueueSet(config) + nrc, err := test.NewNoRestraintFactory().QualifyQueuingConfig(fq.QueuingConfig{}) if err != nil { - t.Fatalf("QueueSet creation failed with %v", err) + t.Fatal(err) } + nr := nrc.GetQueueSet(fq.DispatchingConfig{}) exerciseQueueSetUniformScenario(t, "NoRestraint", nr, []uniformClient{ {1001001001, 5, 10, time.Second, time.Second}, {2002002002, 2, 10, time.Second, time.Second / 2}, @@ -158,18 +157,18 @@ func TestUniformFlows(t *testing.T) { clk, counter := clock.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) - config := fq.QueueSetConfig{ + qCfg := fq.QueuingConfig{ Name: "TestUniformFlows", - ConcurrencyLimit: 4, DesiredNumQueues: 8, QueueLengthLimit: 6, HandSize: 3, RequestWaitLimit: 10 * time.Minute, } - qs, err := qsf.NewQueueSet(config) + qsc, err := qsf.QualifyQueuingConfig(qCfg) if err != nil { - t.Fatalf("QueueSet creation failed with %v", err) + t.Fatal(err) } + qs := qsc.GetQueueSet(fq.DispatchingConfig{ConcurrencyLimit: 4}) exerciseQueueSetUniformScenario(t, "UniformFlows", qs, []uniformClient{ {1001001001, 5, 10, time.Second, time.Second}, @@ -182,18 +181,18 @@ func TestDifferentFlows(t *testing.T) { clk, counter := clock.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) - config := fq.QueueSetConfig{ + qCfg := fq.QueuingConfig{ Name: "TestDifferentFlows", - ConcurrencyLimit: 4, DesiredNumQueues: 8, QueueLengthLimit: 6, HandSize: 3, RequestWaitLimit: 10 * time.Minute, } - qs, err := qsf.NewQueueSet(config) + qsc, err := qsf.QualifyQueuingConfig(qCfg) if err != nil { - t.Fatalf("QueueSet creation failed with %v", err) + t.Fatal(err) } + qs := qsc.GetQueueSet(fq.DispatchingConfig{ConcurrencyLimit: 4}) exerciseQueueSetUniformScenario(t, "DifferentFlows", qs, []uniformClient{ {1001001001, 6, 10, time.Second, time.Second}, @@ -206,18 +205,15 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) { clk, counter := clock.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) - config := fq.QueueSetConfig{ + qCfg := fq.QueuingConfig{ Name: "TestDifferentFlowsWithoutQueuing", - ConcurrencyLimit: 4, DesiredNumQueues: 0, - QueueLengthLimit: 6, - HandSize: 3, - RequestWaitLimit: 10 * time.Minute, } - qs, err := qsf.NewQueueSet(config) + qsc, err := qsf.QualifyQueuingConfig(qCfg) if err != nil { - t.Fatalf("QueueSet creation failed with %v", err) + t.Fatal(err) } + qs := qsc.GetQueueSet(fq.DispatchingConfig{ConcurrencyLimit: 4}) exerciseQueueSetUniformScenario(t, "DifferentFlowsWithoutQueuing", qs, []uniformClient{ {1001001001, 6, 10, time.Second, 57 * time.Millisecond}, @@ -230,18 +226,18 @@ func TestTimeout(t *testing.T) { clk, counter := clock.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) - config := fq.QueueSetConfig{ + qCfg := fq.QueuingConfig{ Name: "TestTimeout", - ConcurrencyLimit: 1, DesiredNumQueues: 128, QueueLengthLimit: 128, HandSize: 1, RequestWaitLimit: 0, } - qs, err := qsf.NewQueueSet(config) + qsc, err := qsf.QualifyQueuingConfig(qCfg) if err != nil { - t.Fatalf("QueueSet creation failed with %v", err) + t.Fatal(err) } + qs := qsc.GetQueueSet(fq.DispatchingConfig{ConcurrencyLimit: 1}) exerciseQueueSetUniformScenario(t, "Timeout", qs, []uniformClient{ {1001001001, 5, 100, time.Second, time.Second}, diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go index 5ac48be94d8..9b7efa0237a 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go @@ -31,14 +31,20 @@ func NewNoRestraintFactory() fq.QueueSetFactory { type noRestraintFactory struct{} -func (noRestraintFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) { - return noRestraint{}, nil -} +type noRestraintCompeter struct{} type noRestraint struct{} -func (noRestraint) SetConfiguration(config fq.QueueSetConfig) error { - return nil +func (noRestraintFactory) QualifyQueuingConfig(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) { + return noRestraintCompeter{}, nil +} + +func (noRestraintCompeter) GetQueueSet(dCfg fq.DispatchingConfig) fq.QueueSet { + return noRestraint{} +} + +func (noRestraint) QualifyQueuingConfig(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) { + return noRestraintCompeter{}, nil } func (noRestraint) Quiesce(fq.EmptyHandler) {