Refactored QueueSet configuration into two phases

So that errors can be detected before resolving concurrency shares
into concurrency counts.
This commit is contained in:
Mike Spreitzer 2020-01-18 01:46:11 -05:00
parent 3d8317ae91
commit 1e170637c3
4 changed files with 204 additions and 124 deletions

View File

@ -21,9 +21,22 @@ import (
"time" "time"
) )
// QueueSetFactory is used to create QueueSet objects. // QueueSetFactory is used to create QueueSet objects. Creation, like
// config update, is done in two phases: the first phase consumes the
// QueuingConfig and the second consumes the DispatchingConfig. They
// are separated so that errors from the first phase can be found
// before committing to a concurrency allotment for the second.
type QueueSetFactory interface { type QueueSetFactory interface {
NewQueueSet(config QueueSetConfig) (QueueSet, error) // QualifyQueuingConfig does the first phase of creating a QueueSet
QualifyQueuingConfig(QueuingConfig) (QueueSetCompleter, error)
}
// QueueSetCompleter finishes the two-step process of creating or
// reconfiguring a QueueSet
type QueueSetCompleter interface {
// GetQueueSet returns a QueueSet configured by the given
// dispatching configuration.
GetQueueSet(DispatchingConfig) QueueSet
} }
// QueueSet is the abstraction for the queuing and dispatching // QueueSet is the abstraction for the queuing and dispatching
@ -34,19 +47,27 @@ type QueueSetFactory interface {
// . Some day we may have connections between priority levels, but // . Some day we may have connections between priority levels, but
// today is not that day. // today is not that day.
type QueueSet interface { type QueueSet interface {
// SetConfiguration updates the configuration // QualifyQueuingConfig starts the two-step process of updating
SetConfiguration(QueueSetConfig) error // the configuration. No change is made until GetQueueSet is
// called. If `C := X.QualifyQueuingConfig(q)` then
// `C.GetQueueSet(d)` returns the same value `X`. If the
// QueuingConfig's DesiredNumQueues field is zero then the other
// queuing-specific config parameters are not changed, so that the
// queues continue draining as before.
QualifyQueuingConfig(QueuingConfig) (QueueSetCompleter, error)
// Quiesce controls whether the QueueSet is operating normally or is quiescing. // Quiesce controls whether the QueueSet is operating normally or
// A quiescing QueueSet drains as normal but does not admit any // is quiescing. A quiescing QueueSet drains as normal but does
// new requests. Passing a non-nil handler means the system should // not admit any new requests. Passing a non-nil handler means the
// be quiescing, a nil handler means the system should operate // system should be quiescing, a nil handler means the system
// normally. A call to Wait while the system is quiescing // should operate normally. A call to Wait while the system is
// will be rebuffed by returning tryAnother=true. If all the // quiescing will be rebuffed by returning tryAnother=true. If all
// queues have no requests waiting nor executing while the system // the queues have no requests waiting nor executing while the
// is quiescing then the handler will eventually be called with no // system is quiescing then the handler will eventually be called
// locks held (even if the system becomes non-quiescing between the // with no locks held (even if the system becomes non-quiescing
// triggering state and the required call). // between the triggering state and the required call). In Go
// Memory Model terms, the triggering state happens before the
// call to the EmptyHandler.
Quiesce(EmptyHandler) Quiesce(EmptyHandler)
// Wait uses the given hashValue as the source of entropy as it // Wait uses the given hashValue as the source of entropy as it
@ -56,34 +77,44 @@ type QueueSet interface {
// tryAnother==true at return then the QueueSet has become // tryAnother==true at return then the QueueSet has become
// undesirable and the client should try to find a different // undesirable and the client should try to find a different
// QueueSet to use; execute and afterExecution are irrelevant in // QueueSet to use; execute and afterExecution are irrelevant in
// this case. Otherwise, if execute then the client should start // this case. In the terms of the Go Memory Model, there was a
// executing the request and, once the request finishes execution // call to Quiesce with a non-nil handler that happened before
// or is canceled, call afterExecution(). Otherwise the client // this return from Wait. Otherwise, if execute then the client
// should not execute the request and afterExecution is // should start executing the request and, once the request
// irrelevant. // finishes execution or is canceled, call afterExecution().
// Otherwise the client should not execute the request and
// afterExecution is irrelevant.
Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func()) Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func())
} }
// QueueSetConfig defines the configuration of a QueueSet. // QueuingConfig defines the configuration of the queuing aspect of a QueueSet.
type QueueSetConfig struct { type QueuingConfig struct {
// Name is used to identify a queue set, allowing for descriptive information about its intended use // Name is used to identify a queue set, allowing for descriptive information about its intended use
Name string Name string
// ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time
ConcurrencyLimit int
// DesiredNumQueues is the number of queues that the API says // DesiredNumQueues is the number of queues that the API says
// should exist now. This may be zero, in which case // should exist now. This may be zero, in which case
// QueueLengthLimit, HandSize, and RequestWaitLimit are ignored. // QueueLengthLimit, HandSize, and RequestWaitLimit are ignored.
DesiredNumQueues int DesiredNumQueues int
// QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time // QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time
QueueLengthLimit int QueueLengthLimit int
// HandSize is a parameter of shuffle sharding. Upon arrival of a request, a queue is chosen by randomly // HandSize is a parameter of shuffle sharding. Upon arrival of a request, a queue is chosen by randomly
// dealing a "hand" of this many queues and then picking one of minimum length. // dealing a "hand" of this many queues and then picking one of minimum length.
HandSize int HandSize int
// RequestWaitLimit is the maximum amount of time that a request may wait in a queue. // RequestWaitLimit is the maximum amount of time that a request may wait in a queue.
// If, by the end of that time, the request has not been dispatched then it is rejected. // If, by the end of that time, the request has not been dispatched then it is rejected.
RequestWaitLimit time.Duration RequestWaitLimit time.Duration
} }
// DispatchingConfig defines the configuration of the dispatching aspect of a QueueSet.
type DispatchingConfig struct {
// ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time
ConcurrencyLimit int
}
// EmptyHandler is used to notify the callee when all the queues // EmptyHandler is used to notify the callee when all the queues
// of a QueueSet have been drained. // of a QueueSet have been drained.
type EmptyHandler interface { type EmptyHandler interface {

View File

@ -22,10 +22,10 @@ import (
"sync" "sync"
"time" "time"
"k8s.io/apimachinery/pkg/util/runtime"
"github.com/pkg/errors" "github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/util/flowcontrol/counter" "k8s.io/apiserver/pkg/util/flowcontrol/counter"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise" "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise"
@ -43,12 +43,13 @@ type queueSetFactory struct {
clock clock.PassiveClock clock clock.PassiveClock
} }
// NewQueueSetFactory creates a new QueueSetFactory object // `*queueSetCompleter` implements QueueSetCompleter. Exactly one of
func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter) fq.QueueSetFactory { // the fields `factory` and `theSet` is non-nil.
return &queueSetFactory{ type queueSetCompleter struct {
counter: counter, factory *queueSetFactory
clock: c, theSet *queueSet
} qCfg fq.QueuingConfig
dealer *shufflesharding.Dealer
} }
// queueSet implements the Fair Queuing for Server Requests technique // queueSet implements the Fair Queuing for Server Requests technique
@ -65,12 +66,19 @@ type queueSet struct {
lock sync.Mutex lock sync.Mutex
// config holds the current configuration. Its DesiredNumQueues // qCfg holds the current queuing configuration. Its
// may be less than the current number of queues. If its // DesiredNumQueues may be less than the current number of queues.
// DesiredNumQueues is zero then its other queuing parameters // If its DesiredNumQueues is zero then its other queuing
// retain the settings they had when DesiredNumQueues was last // parameters retain the settings they had when DesiredNumQueues
// non-zero (if ever). // was last non-zero (if ever).
config fq.QueueSetConfig qCfg fq.QueuingConfig
// the current dispatching configuration.
dCfg fq.DispatchingConfig
// If `config.DesiredNumQueues` is non-zero then dealer is not nil
// and is good for `config`.
dealer *shufflesharding.Dealer
// queues may be longer than the desired number, while the excess // queues may be longer than the desired number, while the excess
// queues are still draining. // queues are still draining.
@ -96,24 +104,55 @@ type queueSet struct {
totRequestsExecuting int totRequestsExecuting int
emptyHandler fq.EmptyHandler emptyHandler fq.EmptyHandler
dealer *shufflesharding.Dealer
} }
// NewQueueSet creates a new QueueSet object. // NewQueueSetFactory creates a new QueueSetFactory object
// There is a new QueueSet created for each priority level. func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter) fq.QueueSetFactory {
func (qsf queueSetFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) { return &queueSetFactory{
fq := &queueSet{ counter: counter,
clock: qsf.clock, clock: c,
counter: qsf.counter,
estimatedServiceTime: 60,
config: config,
lastRealTime: qsf.clock.Now(),
} }
err := fq.SetConfiguration(config) }
func (qsf *queueSetFactory) QualifyQueuingConfig(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) {
dealer, err := checkConfig(qCfg)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return fq, nil return &queueSetCompleter{
factory: qsf,
qCfg: qCfg,
dealer: dealer}, nil
}
// checkConfig returns a non-nil Dealer if the config is valid and
// calls for one, and returns a non-nil error if the given config is
// invalid.
func checkConfig(qCfg fq.QueuingConfig) (*shufflesharding.Dealer, error) {
if qCfg.DesiredNumQueues == 0 {
return nil, nil
}
dealer, err := shufflesharding.NewDealer(qCfg.DesiredNumQueues, qCfg.HandSize)
if err != nil {
err = errors.Wrap(err, "the QueueSetConfig implies an invalid shuffle sharding config (DesiredNumQueues is deckSize)")
}
return dealer, err
}
func (qsc *queueSetCompleter) GetQueueSet(dCfg fq.DispatchingConfig) fq.QueueSet {
qs := qsc.theSet
if qs == nil {
qs = &queueSet{
clock: qsc.factory.clock,
counter: qsc.factory.counter,
estimatedServiceTime: 60,
qCfg: qsc.qCfg,
virtualTime: 0,
lastRealTime: qsc.factory.clock.Now(),
}
}
qs.setConfiguration(qsc.qCfg, qsc.dealer, dCfg)
return qs
} }
// createQueues is a helper method for initializing an array of n queues // createQueues is a helper method for initializing an array of n queues
@ -125,40 +164,45 @@ func createQueues(n, baseIndex int) []*queue {
return fqqueues return fqqueues
} }
// SetConfiguration is used to set the configuration for a queueSet func (qs *queueSet) QualifyQueuingConfig(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) {
// update handling for when fields are updated is handled here as well - dealer, err := checkConfig(qCfg)
if err != nil {
return nil, err
}
return &queueSetCompleter{
theSet: qs,
qCfg: qCfg,
dealer: dealer}, nil
}
// SetConfiguration is used to set the configuration for a queueSet.
// Update handling for when fields are updated is handled here as well -
// eg: if DesiredNum is increased, SetConfiguration reconciles by // eg: if DesiredNum is increased, SetConfiguration reconciles by
// adding more queues. // adding more queues.
func (qs *queueSet) SetConfiguration(config fq.QueueSetConfig) error { func (qs *queueSet) setConfiguration(qCfg fq.QueuingConfig, dealer *shufflesharding.Dealer, dCfg fq.DispatchingConfig) {
qs.lockAndSyncTime() qs.lockAndSyncTime()
defer qs.lock.Unlock() defer qs.lock.Unlock()
var dealer *shufflesharding.Dealer
if config.DesiredNumQueues > 0 { if qCfg.DesiredNumQueues > 0 {
var err error
dealer, err = shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize)
if err != nil {
return errors.Wrap(err, "shuffle sharding dealer creation failed")
}
// Adding queues is the only thing that requires immediate action // Adding queues is the only thing that requires immediate action
// Removing queues is handled by omitting indexes >DesiredNum from // Removing queues is handled by omitting indexes >DesiredNum from
// chooseQueueIndexLocked // chooseQueueIndexLocked
numQueues := len(qs.queues) numQueues := len(qs.queues)
if config.DesiredNumQueues > numQueues { if qCfg.DesiredNumQueues > numQueues {
qs.queues = append(qs.queues, qs.queues = append(qs.queues,
createQueues(config.DesiredNumQueues-numQueues, len(qs.queues))...) createQueues(qCfg.DesiredNumQueues-numQueues, len(qs.queues))...)
} }
} else { } else {
config.QueueLengthLimit = qs.config.QueueLengthLimit qCfg.QueueLengthLimit = qs.qCfg.QueueLengthLimit
config.HandSize = qs.config.HandSize qCfg.HandSize = qs.qCfg.HandSize
config.RequestWaitLimit = qs.config.RequestWaitLimit qCfg.RequestWaitLimit = qs.qCfg.RequestWaitLimit
} }
qs.config = config qs.qCfg = qCfg
qs.dCfg = dCfg
qs.dealer = dealer qs.dealer = dealer
qs.dispatchAsMuchAsPossibleLocked() qs.dispatchAsMuchAsPossibleLocked()
return nil
} }
// Quiesce controls whether the QueueSet is operating normally or is quiescing. // Quiesce controls whether the QueueSet is operating normally or is quiescing.
@ -216,16 +260,16 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i
// A call to Wait while the system is quiescing will be rebuffed by // A call to Wait while the system is quiescing will be rebuffed by
// returning `tryAnother=true`. // returning `tryAnother=true`.
if qs.emptyHandler != nil { if qs.emptyHandler != nil {
klog.V(5).Infof("QS(%s): rebuffing request %#+v %#+v with TryAnother", qs.config.Name, descr1, descr2) klog.V(5).Infof("QS(%s): rebuffing request %#+v %#+v with TryAnother", qs.qCfg.Name, descr1, descr2)
return decisionTryAnother return decisionTryAnother
} }
// ======================================================================== // ========================================================================
// Step 0: // Step 0:
// Apply only concurrency limit, if zero queues desired // Apply only concurrency limit, if zero queues desired
if qs.config.DesiredNumQueues < 1 { if qs.qCfg.DesiredNumQueues < 1 {
if qs.totRequestsExecuting >= qs.config.ConcurrencyLimit { if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit {
klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v because %d are executing and the limit is %d", qs.config.Name, descr1, descr2, qs.totRequestsExecuting, qs.config.ConcurrencyLimit) klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v because %d are executing and the limit is %d", qs.qCfg.Name, descr1, descr2, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
return decisionReject return decisionReject
} }
req = qs.dispatchSansQueue(descr1, descr2) req = qs.dispatchSansQueue(descr1, descr2)
@ -243,8 +287,8 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i
// req == nil means that the request was rejected - no remaining // req == nil means that the request was rejected - no remaining
// concurrency shares and at max queue length already // concurrency shares and at max queue length already
if req == nil { if req == nil {
klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v due to queue full", qs.config.Name, descr1, descr2) klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v due to queue full", qs.qCfg.Name, descr1, descr2)
metrics.AddReject(qs.config.Name, "queue-full") metrics.AddReject(qs.qCfg.Name, "queue-full")
return decisionReject return decisionReject
} }
@ -274,7 +318,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i
qs.goroutineDoneOrBlocked() qs.goroutineDoneOrBlocked()
select { select {
case <-doneCh: case <-doneCh:
klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.config.Name, descr1, descr2) klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.qCfg.Name, descr1, descr2)
req.decision.Set(decisionCancel) req.decision.Set(decisionCancel)
} }
qs.goroutineDoneOrBlocked() qs.goroutineDoneOrBlocked()
@ -291,18 +335,18 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i
case requestDecision: case requestDecision:
decision = dec decision = dec
default: default:
klog.Errorf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.config.Name, decisionAny, decisionAny, descr1, descr2) klog.Errorf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.qCfg.Name, decisionAny, decisionAny, descr1, descr2)
decision = decisionExecute decision = decisionExecute
} }
switch decision { switch decision {
case decisionReject: case decisionReject:
klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.config.Name, descr1, descr2) klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, descr1, descr2)
metrics.AddReject(qs.config.Name, "time-out") metrics.AddReject(qs.qCfg.Name, "time-out")
case decisionCancel: case decisionCancel:
qs.syncTimeLocked() qs.syncTimeLocked()
// TODO(aaron-prindle) add metrics to these two cases // TODO(aaron-prindle) add metrics to these two cases
if req.isWaiting { if req.isWaiting {
klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.config.Name, descr1, descr2) klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, descr1, descr2)
// remove the request from the queue as it has timed out // remove the request from the queue as it has timed out
for i := range req.queue.requests { for i := range req.queue.requests {
if req == req.queue.requests[i] { if req == req.queue.requests[i] {
@ -317,7 +361,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i
// then a call to the EmptyHandler should be forked. // then a call to the EmptyHandler should be forked.
qs.maybeForkEmptyHandlerLocked() qs.maybeForkEmptyHandlerLocked()
} else { } else {
klog.V(5).Infof("QS(%s): request %#+v %#+v canceled shortly after dispatch", qs.config.Name, descr1, descr2) klog.V(5).Infof("QS(%s): request %#+v %#+v canceled shortly after dispatch", qs.qCfg.Name, descr1, descr2)
} }
} }
return decision return decision
@ -370,7 +414,7 @@ func (qs *queueSet) getVirtualTimeRatio() float64 {
if activeQueues == 0 { if activeQueues == 0 {
return 0 return 0
} }
return math.Min(float64(reqs), float64(qs.config.ConcurrencyLimit)) / float64(activeQueues) return math.Min(float64(reqs), float64(qs.dCfg.ConcurrencyLimit)) / float64(activeQueues)
} }
// timeoutOldRequestsAndRejectOrEnqueueLocked encapsulates the logic required // timeoutOldRequestsAndRejectOrEnqueueLocked encapsulates the logic required
@ -404,7 +448,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64,
if ok := qs.rejectOrEnqueueLocked(req); !ok { if ok := qs.rejectOrEnqueueLocked(req); !ok {
return nil return nil
} }
metrics.ObserveQueueLength(qs.config.Name, len(queue.requests)) metrics.ObserveQueueLength(qs.qCfg.Name, len(queue.requests))
return req return req
} }
@ -415,13 +459,16 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte
bestQueueLen := int(math.MaxInt32) bestQueueLen := int(math.MaxInt32)
// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`. // the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`.
qs.dealer.Deal(hashValue, func(queueIdx int) { qs.dealer.Deal(hashValue, func(queueIdx int) {
if queueIdx < 0 || queueIdx >= len(qs.queues) {
return
}
thisLen := len(qs.queues[queueIdx].requests) thisLen := len(qs.queues[queueIdx].requests)
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.config.Name, descr1, descr2, queueIdx, thisLen) klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisLen)
if thisLen < bestQueueLen { if thisLen < bestQueueLen {
bestQueueIdx, bestQueueLen = queueIdx, thisLen bestQueueIdx, bestQueueLen = queueIdx, thisLen
} }
}) })
klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.config.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].requestsExecuting) klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].requestsExecuting)
return bestQueueIdx return bestQueueIdx
} }
@ -436,7 +483,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) {
// as newer requests also will not have timed out // as newer requests also will not have timed out
// now - requestWaitLimit = waitLimit // now - requestWaitLimit = waitLimit
waitLimit := now.Add(-qs.config.RequestWaitLimit) waitLimit := now.Add(-qs.qCfg.RequestWaitLimit)
for i, req := range reqs { for i, req := range reqs {
if waitLimit.After(req.arrivalTime) { if waitLimit.After(req.arrivalTime) {
req.decision.SetLocked(decisionReject) req.decision.SetLocked(decisionReject)
@ -463,8 +510,8 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
queue := request.queue queue := request.queue
curQueueLength := len(queue.requests) curQueueLength := len(queue.requests)
// rejects the newly arrived request if resource criteria not met // rejects the newly arrived request if resource criteria not met
if qs.totRequestsExecuting >= qs.config.ConcurrencyLimit && if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit &&
curQueueLength >= qs.config.QueueLengthLimit { curQueueLength >= qs.qCfg.QueueLengthLimit {
return false return false
} }
@ -479,12 +526,12 @@ func (qs *queueSet) enqueueLocked(request *request) {
// the queues virtual start time is set to the virtual time. // the queues virtual start time is set to the virtual time.
queue.virtualStart = qs.virtualTime queue.virtualStart = qs.virtualTime
if klog.V(6) { if klog.V(6) {
klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2) klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2)
} }
} }
queue.Enqueue(request) queue.Enqueue(request)
qs.totRequestsWaiting++ qs.totRequestsWaiting++
metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.totRequestsWaiting) metrics.UpdateFlowControlRequestsInQueue(qs.qCfg.Name, qs.totRequestsWaiting)
} }
// dispatchAsMuchAsPossibleLocked runs a loop, as long as there // dispatchAsMuchAsPossibleLocked runs a loop, as long as there
@ -494,7 +541,7 @@ func (qs *queueSet) enqueueLocked(request *request) {
// queue, increment the count of the number executing, and send true // queue, increment the count of the number executing, and send true
// to the request's channel. // to the request's channel.
func (qs *queueSet) dispatchAsMuchAsPossibleLocked() { func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
for qs.totRequestsWaiting != 0 && qs.totRequestsExecuting < qs.config.ConcurrencyLimit { for qs.totRequestsWaiting != 0 && qs.totRequestsExecuting < qs.dCfg.ConcurrencyLimit {
ok := qs.dispatchLocked() ok := qs.dispatchLocked()
if !ok { if !ok {
break break
@ -512,9 +559,9 @@ func (qs *queueSet) dispatchSansQueue(descr1, descr2 interface{}) *request {
} }
qs.totRequestsExecuting++ qs.totRequestsExecuting++
if klog.V(5) { if klog.V(5) {
klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %#+v %#+v, qs will have %d executing", qs.config.Name, now.Format(nsTimeFmt), qs.virtualTime, descr1, descr2, qs.totRequestsExecuting) klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, descr1, descr2, qs.totRequestsExecuting)
} }
metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting) metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting)
return req return req
} }
@ -537,11 +584,11 @@ func (qs *queueSet) dispatchLocked() bool {
qs.totRequestsExecuting++ qs.totRequestsExecuting++
queue.requestsExecuting++ queue.requestsExecuting++
if klog.V(6) { if klog.V(6) {
klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting) klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting)
} }
// When a request is dequeued for service -> qs.virtualStart += G // When a request is dequeued for service -> qs.virtualStart += G
queue.virtualStart += qs.estimatedServiceTime queue.virtualStart += qs.estimatedServiceTime
metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting) metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting)
request.decision.SetLocked(decisionExecute) request.decision.SetLocked(decisionExecute)
return ok return ok
} }
@ -590,11 +637,11 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) {
// callback updates important state in the queueSet // callback updates important state in the queueSet
func (qs *queueSet) finishRequestLocked(r *request) { func (qs *queueSet) finishRequestLocked(r *request) {
qs.totRequestsExecuting-- qs.totRequestsExecuting--
metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting) metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting)
if r.queue == nil { if r.queue == nil {
if klog.V(6) { if klog.V(6) {
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting) klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting)
} }
return return
} }
@ -609,12 +656,12 @@ func (qs *queueSet) finishRequestLocked(r *request) {
r.queue.requestsExecuting-- r.queue.requestsExecuting--
if klog.V(6) { if klog.V(6) {
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, r.queue.virtualStart, S, len(r.queue.requests), r.queue.requestsExecuting) klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, r.queue.virtualStart, S, len(r.queue.requests), r.queue.requestsExecuting)
} }
// If there are more queues than desired and this one has no // If there are more queues than desired and this one has no
// requests then remove it // requests then remove it
if len(qs.queues) > qs.config.DesiredNumQueues && if len(qs.queues) > qs.qCfg.DesiredNumQueues &&
len(r.queue.requests) == 0 && len(r.queue.requests) == 0 &&
r.queue.requestsExecuting == 0 { r.queue.requestsExecuting == 0 {
qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.queue.index) qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.queue.index)

View File

@ -141,12 +141,11 @@ func init() {
func TestNoRestraint(t *testing.T) { func TestNoRestraint(t *testing.T) {
now := time.Now() now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil) clk, counter := clock.NewFakeEventClock(now, 0, nil)
nrf := test.NewNoRestraintFactory() nrc, err := test.NewNoRestraintFactory().QualifyQueuingConfig(fq.QueuingConfig{})
config := fq.QueueSetConfig{}
nr, err := nrf.NewQueueSet(config)
if err != nil { if err != nil {
t.Fatalf("QueueSet creation failed with %v", err) t.Fatal(err)
} }
nr := nrc.GetQueueSet(fq.DispatchingConfig{})
exerciseQueueSetUniformScenario(t, "NoRestraint", nr, []uniformClient{ exerciseQueueSetUniformScenario(t, "NoRestraint", nr, []uniformClient{
{1001001001, 5, 10, time.Second, time.Second}, {1001001001, 5, 10, time.Second, time.Second},
{2002002002, 2, 10, time.Second, time.Second / 2}, {2002002002, 2, 10, time.Second, time.Second / 2},
@ -158,18 +157,18 @@ func TestUniformFlows(t *testing.T) {
clk, counter := clock.NewFakeEventClock(now, 0, nil) clk, counter := clock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter) qsf := NewQueueSetFactory(clk, counter)
config := fq.QueueSetConfig{ qCfg := fq.QueuingConfig{
Name: "TestUniformFlows", Name: "TestUniformFlows",
ConcurrencyLimit: 4,
DesiredNumQueues: 8, DesiredNumQueues: 8,
QueueLengthLimit: 6, QueueLengthLimit: 6,
HandSize: 3, HandSize: 3,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
qs, err := qsf.NewQueueSet(config) qsc, err := qsf.QualifyQueuingConfig(qCfg)
if err != nil { if err != nil {
t.Fatalf("QueueSet creation failed with %v", err) t.Fatal(err)
} }
qs := qsc.GetQueueSet(fq.DispatchingConfig{ConcurrencyLimit: 4})
exerciseQueueSetUniformScenario(t, "UniformFlows", qs, []uniformClient{ exerciseQueueSetUniformScenario(t, "UniformFlows", qs, []uniformClient{
{1001001001, 5, 10, time.Second, time.Second}, {1001001001, 5, 10, time.Second, time.Second},
@ -182,18 +181,18 @@ func TestDifferentFlows(t *testing.T) {
clk, counter := clock.NewFakeEventClock(now, 0, nil) clk, counter := clock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter) qsf := NewQueueSetFactory(clk, counter)
config := fq.QueueSetConfig{ qCfg := fq.QueuingConfig{
Name: "TestDifferentFlows", Name: "TestDifferentFlows",
ConcurrencyLimit: 4,
DesiredNumQueues: 8, DesiredNumQueues: 8,
QueueLengthLimit: 6, QueueLengthLimit: 6,
HandSize: 3, HandSize: 3,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
qs, err := qsf.NewQueueSet(config) qsc, err := qsf.QualifyQueuingConfig(qCfg)
if err != nil { if err != nil {
t.Fatalf("QueueSet creation failed with %v", err) t.Fatal(err)
} }
qs := qsc.GetQueueSet(fq.DispatchingConfig{ConcurrencyLimit: 4})
exerciseQueueSetUniformScenario(t, "DifferentFlows", qs, []uniformClient{ exerciseQueueSetUniformScenario(t, "DifferentFlows", qs, []uniformClient{
{1001001001, 6, 10, time.Second, time.Second}, {1001001001, 6, 10, time.Second, time.Second},
@ -206,18 +205,15 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
clk, counter := clock.NewFakeEventClock(now, 0, nil) clk, counter := clock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter) qsf := NewQueueSetFactory(clk, counter)
config := fq.QueueSetConfig{ qCfg := fq.QueuingConfig{
Name: "TestDifferentFlowsWithoutQueuing", Name: "TestDifferentFlowsWithoutQueuing",
ConcurrencyLimit: 4,
DesiredNumQueues: 0, DesiredNumQueues: 0,
QueueLengthLimit: 6,
HandSize: 3,
RequestWaitLimit: 10 * time.Minute,
} }
qs, err := qsf.NewQueueSet(config) qsc, err := qsf.QualifyQueuingConfig(qCfg)
if err != nil { if err != nil {
t.Fatalf("QueueSet creation failed with %v", err) t.Fatal(err)
} }
qs := qsc.GetQueueSet(fq.DispatchingConfig{ConcurrencyLimit: 4})
exerciseQueueSetUniformScenario(t, "DifferentFlowsWithoutQueuing", qs, []uniformClient{ exerciseQueueSetUniformScenario(t, "DifferentFlowsWithoutQueuing", qs, []uniformClient{
{1001001001, 6, 10, time.Second, 57 * time.Millisecond}, {1001001001, 6, 10, time.Second, 57 * time.Millisecond},
@ -230,18 +226,18 @@ func TestTimeout(t *testing.T) {
clk, counter := clock.NewFakeEventClock(now, 0, nil) clk, counter := clock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter) qsf := NewQueueSetFactory(clk, counter)
config := fq.QueueSetConfig{ qCfg := fq.QueuingConfig{
Name: "TestTimeout", Name: "TestTimeout",
ConcurrencyLimit: 1,
DesiredNumQueues: 128, DesiredNumQueues: 128,
QueueLengthLimit: 128, QueueLengthLimit: 128,
HandSize: 1, HandSize: 1,
RequestWaitLimit: 0, RequestWaitLimit: 0,
} }
qs, err := qsf.NewQueueSet(config) qsc, err := qsf.QualifyQueuingConfig(qCfg)
if err != nil { if err != nil {
t.Fatalf("QueueSet creation failed with %v", err) t.Fatal(err)
} }
qs := qsc.GetQueueSet(fq.DispatchingConfig{ConcurrencyLimit: 1})
exerciseQueueSetUniformScenario(t, "Timeout", qs, []uniformClient{ exerciseQueueSetUniformScenario(t, "Timeout", qs, []uniformClient{
{1001001001, 5, 100, time.Second, time.Second}, {1001001001, 5, 100, time.Second, time.Second},

View File

@ -31,14 +31,20 @@ func NewNoRestraintFactory() fq.QueueSetFactory {
type noRestraintFactory struct{} type noRestraintFactory struct{}
func (noRestraintFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) { type noRestraintCompeter struct{}
return noRestraint{}, nil
}
type noRestraint struct{} type noRestraint struct{}
func (noRestraint) SetConfiguration(config fq.QueueSetConfig) error { func (noRestraintFactory) QualifyQueuingConfig(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) {
return nil return noRestraintCompeter{}, nil
}
func (noRestraintCompeter) GetQueueSet(dCfg fq.DispatchingConfig) fq.QueueSet {
return noRestraint{}
}
func (noRestraint) QualifyQueuingConfig(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) {
return noRestraintCompeter{}, nil
} }
func (noRestraint) Quiesce(fq.EmptyHandler) { func (noRestraint) Quiesce(fq.EmptyHandler) {