diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go index 12378153562..9d6e3b64738 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go @@ -21,9 +21,22 @@ import ( "time" ) -// QueueSetFactory is used to create QueueSet objects. +// QueueSetFactory is used to create QueueSet objects. Creation, like +// config update, is done in two phases: the first phase consumes the +// QueuingConfig and the second consumes the DispatchingConfig. They +// are separated so that errors from the first phase can be found +// before committing to a concurrency allotment for the second. type QueueSetFactory interface { - NewQueueSet(config QueueSetConfig) (QueueSet, error) + // BeginConstruction does the first phase of creating a QueueSet + BeginConstruction(QueuingConfig) (QueueSetCompleter, error) +} + +// QueueSetCompleter finishes the two-step process of creating or +// reconfiguring a QueueSet +type QueueSetCompleter interface { + // Complete returns a QueueSet configured by the given + // dispatching configuration. + Complete(DispatchingConfig) QueueSet } // QueueSet is the abstraction for the queuing and dispatching @@ -34,19 +47,27 @@ type QueueSetFactory interface { // . Some day we may have connections between priority levels, but // today is not that day. type QueueSet interface { - // SetConfiguration updates the configuration - SetConfiguration(QueueSetConfig) error + // BeginConfigChange starts the two-step process of updating + // the configuration. No change is made until Complete is + // called. If `C := X.BeginConstruction(q)` then + // `C.Complete(d)` returns the same value `X`. If the + // QueuingConfig's DesiredNumQueues field is zero then the other + // queuing-specific config parameters are not changed, so that the + // queues continue draining as before. + BeginConfigChange(QueuingConfig) (QueueSetCompleter, error) - // Quiesce controls whether the QueueSet is operating normally or is quiescing. - // A quiescing QueueSet drains as normal but does not admit any - // new requests. Passing a non-nil handler means the system should - // be quiescing, a nil handler means the system should operate - // normally. A call to Wait while the system is quiescing - // will be rebuffed by returning tryAnother=true. If all the - // queues have no requests waiting nor executing while the system - // is quiescing then the handler will eventually be called with no - // locks held (even if the system becomes non-quiescing between the - // triggering state and the required call). + // Quiesce controls whether the QueueSet is operating normally or + // is quiescing. A quiescing QueueSet drains as normal but does + // not admit any new requests. Passing a non-nil handler means the + // system should be quiescing, a nil handler means the system + // should operate normally. A call to Wait while the system is + // quiescing will be rebuffed by returning tryAnother=true. If all + // the queues have no requests waiting nor executing while the + // system is quiescing then the handler will eventually be called + // with no locks held (even if the system becomes non-quiescing + // between the triggering state and the required call). In Go + // Memory Model terms, the triggering state happens before the + // call to the EmptyHandler. Quiesce(EmptyHandler) // Wait uses the given hashValue as the source of entropy as it @@ -56,34 +77,47 @@ type QueueSet interface { // tryAnother==true at return then the QueueSet has become // undesirable and the client should try to find a different // QueueSet to use; execute and afterExecution are irrelevant in - // this case. Otherwise, if execute then the client should start - // executing the request and, once the request finishes execution - // or is canceled, call afterExecution(). Otherwise the client - // should not execute the request and afterExecution is - // irrelevant. + // this case. In the terms of the Go Memory Model, there was a + // call to Quiesce with a non-nil handler that happened before + // this return from Wait. Otherwise, if execute then the client + // should start executing the request and, once the request + // finishes execution or is canceled, call afterExecution(). + // Otherwise the client should not execute the request and + // afterExecution is irrelevant. Canceling the context while the + // request is waiting in its queue will cut short that wait and + // cause a return with tryAnother and execute both false; later + // cancellations are the caller's problem. Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func()) } -// QueueSetConfig defines the configuration of a QueueSet. -type QueueSetConfig struct { +// QueuingConfig defines the configuration of the queuing aspect of a QueueSet. +type QueuingConfig struct { // Name is used to identify a queue set, allowing for descriptive information about its intended use Name string - // ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time - ConcurrencyLimit int + // DesiredNumQueues is the number of queues that the API says // should exist now. This may be zero, in which case // QueueLengthLimit, HandSize, and RequestWaitLimit are ignored. DesiredNumQueues int + // QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time QueueLengthLimit int + // HandSize is a parameter of shuffle sharding. Upon arrival of a request, a queue is chosen by randomly // dealing a "hand" of this many queues and then picking one of minimum length. HandSize int + // RequestWaitLimit is the maximum amount of time that a request may wait in a queue. // If, by the end of that time, the request has not been dispatched then it is rejected. RequestWaitLimit time.Duration } +// DispatchingConfig defines the configuration of the dispatching aspect of a QueueSet. +type DispatchingConfig struct { + // ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time + ConcurrencyLimit int +} + // EmptyHandler is used to notify the callee when all the queues // of a QueueSet have been drained. type EmptyHandler interface { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go index 258469043b4..1977f7522ac 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go @@ -16,27 +16,114 @@ limitations under the License. package promise -// Mutable is a variable that is initially not set and can be set one -// or more times (unlike a traditional "promise", which can be written -// only once). -type Mutable interface { +// This file defines interfaces for promises and futures and related +// things. These are about coordination among multiple goroutines and +// so are safe for concurrent calls --- although moderated in some +// cases by a requirement that the caller hold a certain lock. +// Readable represents a variable that is initially not set and later +// becomes set. Some instances may be set to multiple values in +// series. A Readable for a variable that can only get one value is +// commonly known as a "future". +type Readable interface { + // Get reads the current value of this variable. If this variable + // is not set yet then this call blocks until this variable gets a + // value. + Get() interface{} + + // IsSet returns immediately with an indication of whether this + // variable has been set. + IsSet() bool +} + +// LockingReadable is a Readable whose implementation is protected by +// a lock +type LockingReadable interface { + Readable + + // GetLocked is like Get but the caller must already hold the + // lock. GetLocked may release, and later re-acquire, the lock + // any number of times. Get may acquire, and later release, the + // lock any number of times. + GetLocked() interface{} + + // IsSetLocked is like IsSet but the caller must already hold the + // lock. IsSetLocked may release, and later re-acquire, the lock + // any number of times. IsSet may acquire, and later release, the + // lock any number of times. + IsSetLocked() bool +} + +// WriteOnceOnly represents a variable that is initially not set and +// can be set once. +type WriteOnceOnly interface { + // Set normally writes a value into this variable, unblocks every + // goroutine waiting for this variable to have a value, and + // returns true. In the unhappy case that this variable is + // already set, this method returns false without modifying the + // variable's value. + Set(interface{}) bool +} + +// WriteOnce represents a variable that is initially not set and can +// be set once and is readable. This is the common meaning for +// "promise". +type WriteOnce interface { + Readable + WriteOnceOnly +} + +// LockingWriteOnceOnly is a WriteOnceOnly whose implementation is +// protected by a lock. +type LockingWriteOnceOnly interface { + WriteOnceOnly + + // SetLocked is like Set but the caller must already hold the + // lock. SetLocked may release, and later re-acquire, the lock + // any number of times. Set may acquire, and later release, the + // lock any number of times + SetLocked(interface{}) bool +} + +// LockingWriteOnce is a WriteOnce whose implementation is protected +// by a lock. +type LockingWriteOnce interface { + LockingReadable + LockingWriteOnceOnly +} + +// WriteMultipleOnly represents a variable that is initially not set +// and can be set one or more times (unlike a traditional "promise", +// which can be written only once). +type WriteMultipleOnly interface { // Set writes a value into this variable and unblocks every // goroutine waiting for this variable to have a value Set(interface{}) - - // Get reads the value of this variable. If this variable is - // not set yet then this call blocks until this variable gets a value. - Get() interface{} } -// LockingMutable is a Mutable whose implementation is protected by a lock -type LockingMutable interface { - Mutable +// WriteMultiple represents a variable that is initially not set and +// can be set one or more times (unlike a traditional "promise", which +// can be written only once) and is readable. +type WriteMultiple interface { + Readable + WriteMultipleOnly +} - // SetLocked is like Set but the caller must already hold the lock +// LockingWriteMultipleOnly is a WriteMultipleOnly whose +// implementation is protected by a lock. +type LockingWriteMultipleOnly interface { + WriteMultipleOnly + + // SetLocked is like Set but the caller must already hold the + // lock. SetLocked may release, and later re-acquire, the lock + // any number of times. Set may acquire, and later release, the + // lock any number of times SetLocked(interface{}) - - // GetLocked is like Get but the caller must already hold the lock - GetLocked() interface{} +} + +// LockingWriteMultiple is a WriteMultiple whose implementation is +// protected by a lock. +type LockingWriteMultiple interface { + LockingReadable + LockingWriteMultipleOnly } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go index 5013abf3fd4..db5598f8989 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go @@ -23,57 +23,102 @@ import ( "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise" ) -// lockingPromise implements LockingMutable based on a condition -// variable. This implementation tracks active goroutines: the given -// counter is decremented for a goroutine waiting for this varible to -// be set and incremented when such a goroutine is unblocked. -type lockingPromise struct { +// promisoid is the data and behavior common to all the promise-like +// abstractions implemented here. This implementation is based on a +// condition variable. This implementation tracks active goroutines: +// the given counter is decremented for a goroutine waiting for this +// varible to be set and incremented when such a goroutine is +// unblocked. +type promisoid struct { lock sync.Locker cond sync.Cond activeCounter counter.GoRoutineCounter // counter of active goroutines - waitingCount int // number of goroutines idle due to this mutable being unset + waitingCount int // number of goroutines idle due to this being unset isSet bool value interface{} } -var _ promise.LockingMutable = &lockingPromise{} +func (pr *promisoid) Get() interface{} { + pr.lock.Lock() + defer pr.lock.Unlock() + return pr.GetLocked() +} -// NewLockingPromise makes a new promise.LockingMutable -func NewLockingPromise(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingMutable { - return &lockingPromise{ +func (pr *promisoid) GetLocked() interface{} { + if !pr.isSet { + pr.waitingCount++ + pr.activeCounter.Add(-1) + pr.cond.Wait() + } + return pr.value +} + +func (pr *promisoid) IsSet() bool { + pr.lock.Lock() + defer pr.lock.Unlock() + return pr.IsSetLocked() +} + +func (pr *promisoid) IsSetLocked() bool { + return pr.isSet +} + +func (pr *promisoid) SetLocked(value interface{}) { + pr.isSet = true + pr.value = value + if pr.waitingCount > 0 { + pr.activeCounter.Add(pr.waitingCount) + pr.waitingCount = 0 + pr.cond.Broadcast() + } +} + +type writeOnce struct { + promisoid +} + +var _ promise.LockingWriteOnce = &writeOnce{} + +// NewWriteOnce makes a new promise.LockingWriteOnce +func NewWriteOnce(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingWriteOnce { + return &writeOnce{promisoid{ lock: lock, cond: *sync.NewCond(lock), activeCounter: activeCounter, + }} +} + +func (wr *writeOnce) Set(value interface{}) bool { + wr.lock.Lock() + defer wr.lock.Unlock() + return wr.SetLocked(value) +} + +func (wr *writeOnce) SetLocked(value interface{}) bool { + if wr.isSet { + return false } + wr.promisoid.SetLocked(value) + return true } -func (lp *lockingPromise) Set(value interface{}) { - lp.lock.Lock() - defer lp.lock.Unlock() - lp.SetLocked(value) +type writeMultiple struct { + promisoid } -func (lp *lockingPromise) Get() interface{} { - lp.lock.Lock() - defer lp.lock.Unlock() - return lp.GetLocked() +var _ promise.LockingWriteMultiple = &writeMultiple{} + +// NewWriteMultiple makes a new promise.LockingWriteMultiple +func NewWriteMultiple(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingWriteMultiple { + return &writeMultiple{promisoid{ + lock: lock, + cond: *sync.NewCond(lock), + activeCounter: activeCounter, + }} } -func (lp *lockingPromise) SetLocked(value interface{}) { - lp.isSet = true - lp.value = value - if lp.waitingCount > 0 { - lp.activeCounter.Add(lp.waitingCount) - lp.waitingCount = 0 - lp.cond.Broadcast() - } -} - -func (lp *lockingPromise) GetLocked() interface{} { - if !lp.isSet { - lp.waitingCount++ - lp.activeCounter.Add(-1) - lp.cond.Wait() - } - return lp.value +func (wr *writeMultiple) Set(value interface{}) { + wr.lock.Lock() + defer wr.lock.Unlock() + wr.SetLocked(value) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go index 766f8ffed9c..abaedda1230 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go @@ -25,16 +25,16 @@ import ( "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock" ) -func TestLockingPromise(t *testing.T) { +func TestLockingWriteMultiple(t *testing.T) { now := time.Now() clock, counter := clock.NewFakeEventClock(now, 0, nil) var lock sync.Mutex - lp := NewLockingPromise(&lock, counter) + wr := NewWriteMultiple(&lock, counter) var gots int32 var got atomic.Value counter.Add(1) go func() { - got.Store(lp.Get()) + got.Store(wr.Get()) atomic.AddInt32(&gots, 1) counter.Add(-1) }() @@ -43,8 +43,11 @@ func TestLockingPromise(t *testing.T) { if atomic.LoadInt32(&gots) != 0 { t.Error("Get returned before Set") } - var aval = &now - lp.Set(aval) + if wr.IsSet() { + t.Error("IsSet before Set") + } + aval := &now + wr.Set(aval) clock.Run(nil) time.Sleep(time.Second) if atomic.LoadInt32(&gots) != 1 { @@ -53,9 +56,12 @@ func TestLockingPromise(t *testing.T) { if got.Load() != aval { t.Error("Get did not return what was Set") } + if !wr.IsSet() { + t.Error("IsSet()==false after Set") + } counter.Add(1) go func() { - got.Store(lp.Get()) + got.Store(wr.Get()) atomic.AddInt32(&gots, 1) counter.Add(-1) }() @@ -67,4 +73,80 @@ func TestLockingPromise(t *testing.T) { if got.Load() != aval { t.Error("Second Get did not return what was Set") } + if !wr.IsSet() { + t.Error("IsSet()==false after second Set") + } + later := time.Now() + bval := &later + wr.Set(bval) + if !wr.IsSet() { + t.Error("IsSet() returned false after second set") + } else if wr.Get() != bval { + t.Error("Get() after second Set returned wrong value") + } +} + +func TestLockingWriteOnce(t *testing.T) { + now := time.Now() + clock, counter := clock.NewFakeEventClock(now, 0, nil) + var lock sync.Mutex + wr := NewWriteOnce(&lock, counter) + var gots int32 + var got atomic.Value + counter.Add(1) + go func() { + got.Store(wr.Get()) + atomic.AddInt32(&gots, 1) + counter.Add(-1) + }() + clock.Run(nil) + time.Sleep(time.Second) + if atomic.LoadInt32(&gots) != 0 { + t.Error("Get returned before Set") + } + if wr.IsSet() { + t.Error("IsSet before Set") + } + aval := &now + if !wr.Set(aval) { + t.Error("Set() returned false") + } + clock.Run(nil) + time.Sleep(time.Second) + if atomic.LoadInt32(&gots) != 1 { + t.Error("Get did not return after Set") + } + if got.Load() != aval { + t.Error("Get did not return what was Set") + } + if !wr.IsSet() { + t.Error("IsSet()==false after Set") + } + counter.Add(1) + go func() { + got.Store(wr.Get()) + atomic.AddInt32(&gots, 1) + counter.Add(-1) + }() + clock.Run(nil) + time.Sleep(time.Second) + if atomic.LoadInt32(&gots) != 2 { + t.Error("Second Get did not return immediately") + } + if got.Load() != aval { + t.Error("Second Get did not return what was Set") + } + if !wr.IsSet() { + t.Error("IsSet()==false after second Set") + } + later := time.Now() + bval := &later + if wr.Set(bval) { + t.Error("second Set() returned true") + } + if !wr.IsSet() { + t.Error("IsSet() returned false after second set") + } else if wr.Get() != aval { + t.Error("Get() after second Set returned wrong value") + } } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 7b789e7ca14..4a3945b6614 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -22,10 +22,10 @@ import ( "sync" "time" - "k8s.io/apimachinery/pkg/util/runtime" - "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apiserver/pkg/util/flowcontrol/counter" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise" @@ -43,12 +43,13 @@ type queueSetFactory struct { clock clock.PassiveClock } -// NewQueueSetFactory creates a new QueueSetFactory object -func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter) fq.QueueSetFactory { - return &queueSetFactory{ - counter: counter, - clock: c, - } +// `*queueSetCompleter` implements QueueSetCompleter. Exactly one of +// the fields `factory` and `theSet` is non-nil. +type queueSetCompleter struct { + factory *queueSetFactory + theSet *queueSet + qCfg fq.QueuingConfig + dealer *shufflesharding.Dealer } // queueSet implements the Fair Queuing for Server Requests technique @@ -65,12 +66,19 @@ type queueSet struct { lock sync.Mutex - // config holds the current configuration. Its DesiredNumQueues - // may be less than the current number of queues. If its - // DesiredNumQueues is zero then its other queuing parameters - // retain the settings they had when DesiredNumQueues was last - // non-zero (if ever). - config fq.QueueSetConfig + // qCfg holds the current queuing configuration. Its + // DesiredNumQueues may be less than the current number of queues. + // If its DesiredNumQueues is zero then its other queuing + // parameters retain the settings they had when DesiredNumQueues + // was last non-zero (if ever). + qCfg fq.QueuingConfig + + // the current dispatching configuration. + dCfg fq.DispatchingConfig + + // If `config.DesiredNumQueues` is non-zero then dealer is not nil + // and is good for `config`. + dealer *shufflesharding.Dealer // queues may be longer than the desired number, while the excess // queues are still draining. @@ -96,24 +104,55 @@ type queueSet struct { totRequestsExecuting int emptyHandler fq.EmptyHandler - dealer *shufflesharding.Dealer } -// NewQueueSet creates a new QueueSet object. -// There is a new QueueSet created for each priority level. -func (qsf queueSetFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) { - fq := &queueSet{ - clock: qsf.clock, - counter: qsf.counter, - estimatedServiceTime: 60, - config: config, - lastRealTime: qsf.clock.Now(), +// NewQueueSetFactory creates a new QueueSetFactory object +func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter) fq.QueueSetFactory { + return &queueSetFactory{ + counter: counter, + clock: c, } - err := fq.SetConfiguration(config) +} + +func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) { + dealer, err := checkConfig(qCfg) if err != nil { return nil, err } - return fq, nil + return &queueSetCompleter{ + factory: qsf, + qCfg: qCfg, + dealer: dealer}, nil +} + +// checkConfig returns a non-nil Dealer if the config is valid and +// calls for one, and returns a non-nil error if the given config is +// invalid. +func checkConfig(qCfg fq.QueuingConfig) (*shufflesharding.Dealer, error) { + if qCfg.DesiredNumQueues == 0 { + return nil, nil + } + dealer, err := shufflesharding.NewDealer(qCfg.DesiredNumQueues, qCfg.HandSize) + if err != nil { + err = errors.Wrap(err, "the QueueSetConfig implies an invalid shuffle sharding config (DesiredNumQueues is deckSize)") + } + return dealer, err +} + +func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet { + qs := qsc.theSet + if qs == nil { + qs = &queueSet{ + clock: qsc.factory.clock, + counter: qsc.factory.counter, + estimatedServiceTime: 60, + qCfg: qsc.qCfg, + virtualTime: 0, + lastRealTime: qsc.factory.clock.Now(), + } + } + qs.setConfiguration(qsc.qCfg, qsc.dealer, dCfg) + return qs } // createQueues is a helper method for initializing an array of n queues @@ -125,40 +164,45 @@ func createQueues(n, baseIndex int) []*queue { return fqqueues } -// SetConfiguration is used to set the configuration for a queueSet -// update handling for when fields are updated is handled here as well - +func (qs *queueSet) BeginConfigChange(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) { + dealer, err := checkConfig(qCfg) + if err != nil { + return nil, err + } + return &queueSetCompleter{ + theSet: qs, + qCfg: qCfg, + dealer: dealer}, nil +} + +// SetConfiguration is used to set the configuration for a queueSet. +// Update handling for when fields are updated is handled here as well - // eg: if DesiredNum is increased, SetConfiguration reconciles by // adding more queues. -func (qs *queueSet) SetConfiguration(config fq.QueueSetConfig) error { +func (qs *queueSet) setConfiguration(qCfg fq.QueuingConfig, dealer *shufflesharding.Dealer, dCfg fq.DispatchingConfig) { qs.lockAndSyncTime() defer qs.lock.Unlock() - var dealer *shufflesharding.Dealer - if config.DesiredNumQueues > 0 { - var err error - dealer, err = shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize) - if err != nil { - return errors.Wrap(err, "shuffle sharding dealer creation failed") - } + if qCfg.DesiredNumQueues > 0 { // Adding queues is the only thing that requires immediate action // Removing queues is handled by omitting indexes >DesiredNum from // chooseQueueIndexLocked numQueues := len(qs.queues) - if config.DesiredNumQueues > numQueues { + if qCfg.DesiredNumQueues > numQueues { qs.queues = append(qs.queues, - createQueues(config.DesiredNumQueues-numQueues, len(qs.queues))...) + createQueues(qCfg.DesiredNumQueues-numQueues, len(qs.queues))...) } } else { - config.QueueLengthLimit = qs.config.QueueLengthLimit - config.HandSize = qs.config.HandSize - config.RequestWaitLimit = qs.config.RequestWaitLimit + qCfg.QueueLengthLimit = qs.qCfg.QueueLengthLimit + qCfg.HandSize = qs.qCfg.HandSize + qCfg.RequestWaitLimit = qs.qCfg.RequestWaitLimit } - qs.config = config + qs.qCfg = qCfg + qs.dCfg = dCfg qs.dealer = dealer qs.dispatchAsMuchAsPossibleLocked() - return nil } // Quiesce controls whether the QueueSet is operating normally or is quiescing. @@ -211,24 +255,30 @@ const ( func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func()) { var req *request decision := func() requestDecision { + // Decide what to do and update metrics accordingly. Metrics + // about total requests queued and executing are updated on + // each way out of this function rather than in lower level + // functions because there can be multiple lower level changes + // in one invocation here. qs.lockAndSyncTime() defer qs.lock.Unlock() // A call to Wait while the system is quiescing will be rebuffed by // returning `tryAnother=true`. if qs.emptyHandler != nil { - klog.V(5).Infof("QS(%s): rebuffing request %#+v %#+v with TryAnother", qs.config.Name, descr1, descr2) + klog.V(5).Infof("QS(%s): rebuffing request %#+v %#+v with TryAnother", qs.qCfg.Name, descr1, descr2) return decisionTryAnother } // ======================================================================== // Step 0: // Apply only concurrency limit, if zero queues desired - if qs.config.DesiredNumQueues < 1 { - if qs.totRequestsExecuting >= qs.config.ConcurrencyLimit { - klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v because %d are executing and the limit is %d", qs.config.Name, descr1, descr2, qs.totRequestsExecuting, qs.config.ConcurrencyLimit) + if qs.qCfg.DesiredNumQueues < 1 { + if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit { + klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v because %d are executing and the limit is %d", qs.qCfg.Name, descr1, descr2, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) return decisionReject } req = qs.dispatchSansQueue(descr1, descr2) + metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting) return decisionExecute } @@ -240,11 +290,12 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i // we are at max queue length // 4) If not rejected, create a request and enqueue req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue, descr1, descr2) + defer metrics.UpdateFlowControlRequestsInQueue(qs.qCfg.Name, qs.totRequestsWaiting) // req == nil means that the request was rejected - no remaining // concurrency shares and at max queue length already if req == nil { - klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v due to queue full", qs.config.Name, descr1, descr2) - metrics.AddReject(qs.config.Name, "queue-full") + klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v due to queue full", qs.qCfg.Name, descr1, descr2) + metrics.AddReject(qs.qCfg.Name, "queue-full") return decisionReject } @@ -258,6 +309,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i // fair queuing technique to pick a queue and dispatch a // request from that queue. qs.dispatchAsMuchAsPossibleLocked() + defer metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting) // ======================================================================== // Step 3: @@ -274,8 +326,8 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i qs.goroutineDoneOrBlocked() select { case <-doneCh: - klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.config.Name, descr1, descr2) - req.decision.Set(decisionCancel) + klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.qCfg.Name, descr1, descr2) + qs.cancelWait(req) } qs.goroutineDoneOrBlocked() }() @@ -286,40 +338,24 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i // The final step in Wait is to wait on a decision from // somewhere and then act on it. decisionAny := req.decision.GetLocked() - var decision requestDecision - switch dec := decisionAny.(type) { - case requestDecision: - decision = dec - default: - klog.Errorf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.config.Name, decisionAny, decisionAny, descr1, descr2) - decision = decisionExecute + qs.syncTimeLocked() + decision, isDecision := decisionAny.(requestDecision) + if !isDecision { + klog.Errorf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.qCfg.Name, decisionAny, decisionAny, descr1, descr2) + decision = decisionExecute // yeah, this is a no-op } switch decision { case decisionReject: - klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.config.Name, descr1, descr2) - metrics.AddReject(qs.config.Name, "time-out") + klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, descr1, descr2) + metrics.AddReject(qs.qCfg.Name, "time-out") case decisionCancel: - qs.syncTimeLocked() - // TODO(aaron-prindle) add metrics to these two cases - if req.isWaiting { - klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.config.Name, descr1, descr2) - // remove the request from the queue as it has timed out - for i := range req.queue.requests { - if req == req.queue.requests[i] { - // remove the request - req.queue.requests = append(req.queue.requests[:i], - req.queue.requests[i+1:]...) - break - } - } - // At this point, if the qs is quiescing, - // has zero requests executing, and has zero requests enqueued - // then a call to the EmptyHandler should be forked. - qs.maybeForkEmptyHandlerLocked() - } else { - klog.V(5).Infof("QS(%s): request %#+v %#+v canceled shortly after dispatch", qs.config.Name, descr1, descr2) - } + // TODO(aaron-prindle) add metrics for this case + klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, descr1, descr2) } + // At this point, if the qs is quiescing, + // has zero requests executing, and has zero requests enqueued + // then a call to the EmptyHandler should be forked. + qs.maybeForkEmptyHandlerLocked() return decision }() switch decision { @@ -370,7 +406,7 @@ func (qs *queueSet) getVirtualTimeRatio() float64 { if activeQueues == 0 { return 0 } - return math.Min(float64(reqs), float64(qs.config.ConcurrencyLimit)) / float64(activeQueues) + return math.Min(float64(reqs), float64(qs.dCfg.ConcurrencyLimit)) / float64(activeQueues) } // timeoutOldRequestsAndRejectOrEnqueueLocked encapsulates the logic required @@ -395,7 +431,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64, // Create a request and enqueue req := &request{ - decision: lockingpromise.NewLockingPromise(&qs.lock, qs.counter), + decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter), arrivalTime: qs.clock.Now(), queue: queue, descr1: descr1, @@ -404,7 +440,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64, if ok := qs.rejectOrEnqueueLocked(req); !ok { return nil } - metrics.ObserveQueueLength(qs.config.Name, len(queue.requests)) + metrics.ObserveQueueLength(qs.qCfg.Name, len(queue.requests)) return req } @@ -415,13 +451,16 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte bestQueueLen := int(math.MaxInt32) // the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`. qs.dealer.Deal(hashValue, func(queueIdx int) { + if queueIdx < 0 || queueIdx >= len(qs.queues) { + return + } thisLen := len(qs.queues[queueIdx].requests) - klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.config.Name, descr1, descr2, queueIdx, thisLen) + klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisLen) if thisLen < bestQueueLen { bestQueueIdx, bestQueueLen = queueIdx, thisLen } }) - klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.config.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].requestsExecuting) + klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].requestsExecuting) return bestQueueIdx } @@ -436,7 +475,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) { // as newer requests also will not have timed out // now - requestWaitLimit = waitLimit - waitLimit := now.Add(-qs.config.RequestWaitLimit) + waitLimit := now.Add(-qs.qCfg.RequestWaitLimit) for i, req := range reqs { if waitLimit.After(req.arrivalTime) { req.decision.SetLocked(decisionReject) @@ -463,8 +502,8 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool { queue := request.queue curQueueLength := len(queue.requests) // rejects the newly arrived request if resource criteria not met - if qs.totRequestsExecuting >= qs.config.ConcurrencyLimit && - curQueueLength >= qs.config.QueueLengthLimit { + if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit && + curQueueLength >= qs.qCfg.QueueLengthLimit { return false } @@ -479,12 +518,11 @@ func (qs *queueSet) enqueueLocked(request *request) { // the queue’s virtual start time is set to the virtual time. queue.virtualStart = qs.virtualTime if klog.V(6) { - klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2) + klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2) } } queue.Enqueue(request) qs.totRequestsWaiting++ - metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.totRequestsWaiting) } // dispatchAsMuchAsPossibleLocked runs a loop, as long as there @@ -494,7 +532,7 @@ func (qs *queueSet) enqueueLocked(request *request) { // queue, increment the count of the number executing, and send true // to the request's channel. func (qs *queueSet) dispatchAsMuchAsPossibleLocked() { - for qs.totRequestsWaiting != 0 && qs.totRequestsExecuting < qs.config.ConcurrencyLimit { + for qs.totRequestsWaiting != 0 && qs.totRequestsExecuting < qs.dCfg.ConcurrencyLimit { ok := qs.dispatchLocked() if !ok { break @@ -512,9 +550,8 @@ func (qs *queueSet) dispatchSansQueue(descr1, descr2 interface{}) *request { } qs.totRequestsExecuting++ if klog.V(5) { - klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %#+v %#+v, qs will have %d executing", qs.config.Name, now.Format(nsTimeFmt), qs.virtualTime, descr1, descr2, qs.totRequestsExecuting) + klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, descr1, descr2, qs.totRequestsExecuting) } - metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting) return req } @@ -532,20 +569,46 @@ func (qs *queueSet) dispatchLocked() bool { return false } request.startTime = qs.clock.Now() - // request dequeued, service has started + // At this moment the request leaves its queue and starts + // executing. We do not recognize any interim state between + // "queued" and "executing". While that means "executing" + // includes a little overhead from this package, this is not a + // problem because other overhead is also included. qs.totRequestsWaiting-- qs.totRequestsExecuting++ queue.requestsExecuting++ if klog.V(6) { - klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting) + klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting) } // When a request is dequeued for service -> qs.virtualStart += G queue.virtualStart += qs.estimatedServiceTime - metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting) request.decision.SetLocked(decisionExecute) return ok } +// cancelWait ensures the request is not waiting +func (qs *queueSet) cancelWait(req *request) { + qs.lock.Lock() + defer qs.lock.Unlock() + if req.decision.IsSetLocked() { + // The request has already been removed from the queue + // and so we consider its wait to be over. + return + } + req.decision.SetLocked(decisionCancel) + queue := req.queue + // remove the request from the queue as it has timed out + for i := range queue.requests { + if req == queue.requests[i] { + // remove the request + queue.requests = append(queue.requests[:i], queue.requests[i+1:]...) + qs.totRequestsWaiting-- + break + } + } + return +} + // selectQueueLocked examines the queues in round robin order and // returns the first one of those for which the virtual finish time of // the oldest waiting request is minimal. @@ -583,6 +646,8 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) { qs.finishRequestLocked(req) qs.dispatchAsMuchAsPossibleLocked() + metrics.UpdateFlowControlRequestsInQueue(qs.qCfg.Name, qs.totRequestsWaiting) + metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting) } // finishRequestLocked is a callback that should be used when a @@ -590,11 +655,10 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) { // callback updates important state in the queueSet func (qs *queueSet) finishRequestLocked(r *request) { qs.totRequestsExecuting-- - metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting) if r.queue == nil { if klog.V(6) { - klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting) + klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting) } return } @@ -609,12 +673,12 @@ func (qs *queueSet) finishRequestLocked(r *request) { r.queue.requestsExecuting-- if klog.V(6) { - klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, r.queue.virtualStart, S, len(r.queue.requests), r.queue.requestsExecuting) + klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, r.queue.virtualStart, S, len(r.queue.requests), r.queue.requestsExecuting) } // If there are more queues than desired and this one has no // requests then remove it - if len(qs.queues) > qs.config.DesiredNumQueues && + if len(qs.queues) > qs.qCfg.DesiredNumQueues && len(r.queue.requests) == 0 && r.queue.requestsExecuting == 0 { qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.queue.index) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 024f7805a55..d6a98c457fa 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -141,12 +141,11 @@ func init() { func TestNoRestraint(t *testing.T) { now := time.Now() clk, counter := clock.NewFakeEventClock(now, 0, nil) - nrf := test.NewNoRestraintFactory() - config := fq.QueueSetConfig{} - nr, err := nrf.NewQueueSet(config) + nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}) if err != nil { - t.Fatalf("QueueSet creation failed with %v", err) + t.Fatal(err) } + nr := nrc.Complete(fq.DispatchingConfig{}) exerciseQueueSetUniformScenario(t, "NoRestraint", nr, []uniformClient{ {1001001001, 5, 10, time.Second, time.Second}, {2002002002, 2, 10, time.Second, time.Second / 2}, @@ -158,18 +157,18 @@ func TestUniformFlows(t *testing.T) { clk, counter := clock.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) - config := fq.QueueSetConfig{ + qCfg := fq.QueuingConfig{ Name: "TestUniformFlows", - ConcurrencyLimit: 4, DesiredNumQueues: 8, QueueLengthLimit: 6, HandSize: 3, RequestWaitLimit: 10 * time.Minute, } - qs, err := qsf.NewQueueSet(config) + qsc, err := qsf.BeginConstruction(qCfg) if err != nil { - t.Fatalf("QueueSet creation failed with %v", err) + t.Fatal(err) } + qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4}) exerciseQueueSetUniformScenario(t, "UniformFlows", qs, []uniformClient{ {1001001001, 5, 10, time.Second, time.Second}, @@ -182,18 +181,18 @@ func TestDifferentFlows(t *testing.T) { clk, counter := clock.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) - config := fq.QueueSetConfig{ + qCfg := fq.QueuingConfig{ Name: "TestDifferentFlows", - ConcurrencyLimit: 4, DesiredNumQueues: 8, QueueLengthLimit: 6, HandSize: 3, RequestWaitLimit: 10 * time.Minute, } - qs, err := qsf.NewQueueSet(config) + qsc, err := qsf.BeginConstruction(qCfg) if err != nil { - t.Fatalf("QueueSet creation failed with %v", err) + t.Fatal(err) } + qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4}) exerciseQueueSetUniformScenario(t, "DifferentFlows", qs, []uniformClient{ {1001001001, 6, 10, time.Second, time.Second}, @@ -206,18 +205,15 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) { clk, counter := clock.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) - config := fq.QueueSetConfig{ + qCfg := fq.QueuingConfig{ Name: "TestDifferentFlowsWithoutQueuing", - ConcurrencyLimit: 4, DesiredNumQueues: 0, - QueueLengthLimit: 6, - HandSize: 3, - RequestWaitLimit: 10 * time.Minute, } - qs, err := qsf.NewQueueSet(config) + qsc, err := qsf.BeginConstruction(qCfg) if err != nil { - t.Fatalf("QueueSet creation failed with %v", err) + t.Fatal(err) } + qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4}) exerciseQueueSetUniformScenario(t, "DifferentFlowsWithoutQueuing", qs, []uniformClient{ {1001001001, 6, 10, time.Second, 57 * time.Millisecond}, @@ -230,20 +226,65 @@ func TestTimeout(t *testing.T) { clk, counter := clock.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) - config := fq.QueueSetConfig{ + qCfg := fq.QueuingConfig{ Name: "TestTimeout", - ConcurrencyLimit: 1, DesiredNumQueues: 128, QueueLengthLimit: 128, HandSize: 1, RequestWaitLimit: 0, } - qs, err := qsf.NewQueueSet(config) + qsc, err := qsf.BeginConstruction(qCfg) if err != nil { - t.Fatalf("QueueSet creation failed with %v", err) + t.Fatal(err) } + qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1}) exerciseQueueSetUniformScenario(t, "Timeout", qs, []uniformClient{ {1001001001, 5, 100, time.Second, time.Second}, }, time.Second*10, true, false, clk, counter) } + +func TestContextCancel(t *testing.T) { + now := time.Now() + clk, counter := clock.NewFakeEventClock(now, 0, nil) + qsf := NewQueueSetFactory(clk, counter) + qCfg := fq.QueuingConfig{ + Name: "TestTimeout", + DesiredNumQueues: 11, + QueueLengthLimit: 11, + HandSize: 1, + RequestWaitLimit: 15 * time.Second, + } + qsc, err := qsf.BeginConstruction(qCfg) + if err != nil { + t.Fatal(err) + } + qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1}) + counter.Add(1) // account for the goroutine running this test + ctx1 := context.Background() + another1, exec1, cleanup1 := qs.Wait(ctx1, 1, "test", "one") + if another1 || !exec1 { + t.Errorf("Unexpected: another1=%v, exec1=%v", another1, exec1) + return + } + defer cleanup1() + ctx2, cancel2 := context.WithCancel(context.Background()) + tBefore := time.Now() + go func() { + time.Sleep(time.Second) + cancel2() + }() + another2, exec2, cleanup2 := qs.Wait(ctx2, 2, "test", "two") + tAfter := time.Now() + if another2 || exec2 { + t.Errorf("Unexpected: another2=%v, exec2=%v", another2, exec2) + if exec2 { + defer cleanup2() + } + } else { + dt := tAfter.Sub(tBefore) + if dt < time.Second || dt > 2*time.Second { + t.Errorf("Unexpected: dt=%d", dt) + } + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index 5d0ce2b39e3..d45f36b9b7c 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -30,15 +30,16 @@ type request struct { // startTime is the real time when the request began executing startTime time.Time - // decision gets set to the decision about what to do with this request - decision promise.LockingMutable + // decision gets set to a `requestDecision` indicating what to do + // with this request. It gets set exactly once, when the request + // is removed from its queue. The value will be decisionReject, + // decisionCancel, or decisionExecute; decisionTryAnother never + // appears here. + decision promise.LockingWriteOnce // arrivalTime is the real time when the request entered this system arrivalTime time.Time - // isWaiting indicates whether the request is presently waiting in a queue - isWaiting bool - // descr1 and descr2 are not used in any logic but they appear in // log messages descr1, descr2 interface{} @@ -60,7 +61,6 @@ type queue struct { // Enqueue enqueues a request into the queue func (q *queue) Enqueue(request *request) { - request.isWaiting = true q.requests = append(q.requests, request) } @@ -71,8 +71,6 @@ func (q *queue) Dequeue() (*request, bool) { } request := q.requests[0] q.requests = q.requests[1:] - - request.isWaiting = false return request, true } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go index 5ac48be94d8..ce0462ae77f 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go @@ -31,14 +31,20 @@ func NewNoRestraintFactory() fq.QueueSetFactory { type noRestraintFactory struct{} -func (noRestraintFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) { - return noRestraint{}, nil -} +type noRestraintCompeter struct{} type noRestraint struct{} -func (noRestraint) SetConfiguration(config fq.QueueSetConfig) error { - return nil +func (noRestraintFactory) BeginConstruction(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) { + return noRestraintCompeter{}, nil +} + +func (noRestraintCompeter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet { + return noRestraint{} +} + +func (noRestraint) BeginConfigChange(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) { + return noRestraintCompeter{}, nil } func (noRestraint) Quiesce(fq.EmptyHandler) {