Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-25 12:43:23 +00:00)

Merge pull request #87362 from MikeSpreitzer/limited-cancel2

Simplified and corrected logic around context cancelation in refactored QueueSet

Commit 25b4c170f8
@@ -21,9 +21,22 @@ import (
 	"time"
 )
 
-// QueueSetFactory is used to create QueueSet objects.
+// QueueSetFactory is used to create QueueSet objects. Creation, like
+// config update, is done in two phases: the first phase consumes the
+// QueuingConfig and the second consumes the DispatchingConfig. They
+// are separated so that errors from the first phase can be found
+// before committing to a concurrency allotment for the second.
 type QueueSetFactory interface {
-	NewQueueSet(config QueueSetConfig) (QueueSet, error)
+	// BeginConstruction does the first phase of creating a QueueSet
+	BeginConstruction(QueuingConfig) (QueueSetCompleter, error)
+}
+
+// QueueSetCompleter finishes the two-step process of creating or
+// reconfiguring a QueueSet
+type QueueSetCompleter interface {
+	// Complete returns a QueueSet configured by the given
+	// dispatching configuration.
+	Complete(DispatchingConfig) QueueSet
 }
 
 // QueueSet is the abstraction for the queuing and dispatching
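The following is a minimal sketch of how a caller drives this two-phase construction, written as if it lived in the fairqueuing package; the configuration values are invented for illustration (the tests later in this diff use the same pattern).

```go
// buildQueueSet illustrates the two-phase flow defined above. The
// queuing values are invented; qsf is any QueueSetFactory implementation.
func buildQueueSet(qsf QueueSetFactory) (QueueSet, error) {
	completer, err := qsf.BeginConstruction(QueuingConfig{
		Name:             "example",
		DesiredNumQueues: 8,
		QueueLengthLimit: 6,
		HandSize:         3,
		RequestWaitLimit: 10 * time.Minute,
	})
	if err != nil {
		// Phase one failed; no concurrency has been committed yet.
		return nil, err
	}
	// Phase two cannot fail and commits the concurrency allotment.
	return completer.Complete(DispatchingConfig{ConcurrencyLimit: 4}), nil
}
```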
@@ -34,19 +47,27 @@ type QueueSetFactory interface {
 // . Some day we may have connections between priority levels, but
 // today is not that day.
 type QueueSet interface {
-	// SetConfiguration updates the configuration
-	SetConfiguration(QueueSetConfig) error
+	// BeginConfigChange starts the two-step process of updating
+	// the configuration. No change is made until Complete is
+	// called. If `C := X.BeginConstruction(q)` then
+	// `C.Complete(d)` returns the same value `X`. If the
+	// QueuingConfig's DesiredNumQueues field is zero then the other
+	// queuing-specific config parameters are not changed, so that the
+	// queues continue draining as before.
+	BeginConfigChange(QueuingConfig) (QueueSetCompleter, error)
 
-	// Quiesce controls whether the QueueSet is operating normally or is quiescing.
-	// A quiescing QueueSet drains as normal but does not admit any
-	// new requests. Passing a non-nil handler means the system should
-	// be quiescing, a nil handler means the system should operate
-	// normally. A call to Wait while the system is quiescing
-	// will be rebuffed by returning tryAnother=true. If all the
-	// queues have no requests waiting nor executing while the system
-	// is quiescing then the handler will eventually be called with no
-	// locks held (even if the system becomes non-quiescing between the
-	// triggering state and the required call).
+	// Quiesce controls whether the QueueSet is operating normally or
+	// is quiescing. A quiescing QueueSet drains as normal but does
+	// not admit any new requests. Passing a non-nil handler means the
+	// system should be quiescing, a nil handler means the system
+	// should operate normally. A call to Wait while the system is
+	// quiescing will be rebuffed by returning tryAnother=true. If all
+	// the queues have no requests waiting nor executing while the
+	// system is quiescing then the handler will eventually be called
+	// with no locks held (even if the system becomes non-quiescing
+	// between the triggering state and the required call). In Go
+	// Memory Model terms, the triggering state happens before the
+	// call to the EmptyHandler.
 	Quiesce(EmptyHandler)
 
 	// Wait uses the given hashValue as the source of entropy as it
@@ -56,34 +77,47 @@ type QueueSet interface {
 	// tryAnother==true at return then the QueueSet has become
 	// undesirable and the client should try to find a different
 	// QueueSet to use; execute and afterExecution are irrelevant in
-	// this case. Otherwise, if execute then the client should start
-	// executing the request and, once the request finishes execution
-	// or is canceled, call afterExecution(). Otherwise the client
-	// should not execute the request and afterExecution is
-	// irrelevant.
+	// this case. In the terms of the Go Memory Model, there was a
+	// call to Quiesce with a non-nil handler that happened before
+	// this return from Wait. Otherwise, if execute then the client
+	// should start executing the request and, once the request
+	// finishes execution or is canceled, call afterExecution().
+	// Otherwise the client should not execute the request and
+	// afterExecution is irrelevant. Canceling the context while the
+	// request is waiting in its queue will cut short that wait and
+	// cause a return with tryAnother and execute both false; later
+	// cancellations are the caller's problem.
 	Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func())
 }
 
-// QueueSetConfig defines the configuration of a QueueSet.
-type QueueSetConfig struct {
+// QueuingConfig defines the configuration of the queuing aspect of a QueueSet.
+type QueuingConfig struct {
 	// Name is used to identify a queue set, allowing for descriptive information about its intended use
 	Name string
-	// ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time
-	ConcurrencyLimit int
 	// DesiredNumQueues is the number of queues that the API says
 	// should exist now. This may be zero, in which case
 	// QueueLengthLimit, HandSize, and RequestWaitLimit are ignored.
 	DesiredNumQueues int
 
 	// QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time
 	QueueLengthLimit int
 
 	// HandSize is a parameter of shuffle sharding. Upon arrival of a request, a queue is chosen by randomly
 	// dealing a "hand" of this many queues and then picking one of minimum length.
 	HandSize int
 
 	// RequestWaitLimit is the maximum amount of time that a request may wait in a queue.
 	// If, by the end of that time, the request has not been dispatched then it is rejected.
 	RequestWaitLimit time.Duration
 }
 
+// DispatchingConfig defines the configuration of the dispatching aspect of a QueueSet.
+type DispatchingConfig struct {
+	// ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time
+	ConcurrencyLimit int
+}
+
 // EmptyHandler is used to notify the callee when all the queues
 // of a QueueSet have been drained.
 type EmptyHandler interface {
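To make the Wait contract above concrete, here is a sketch of a caller; hashValue, the descriptor strings, and doTheWork are placeholders introduced only for the example.

```go
// serve sketches a Wait caller honoring the contract above.
func serve(ctx context.Context, qs QueueSet, hashValue uint64) {
	tryAnother, execute, afterExecution := qs.Wait(ctx, hashValue, "example-flow", "example-request")
	if tryAnother {
		return // the QueueSet is quiescing; the caller should pick another one
	}
	if !execute {
		return // rejected, timed out in the queue, or ctx canceled while queued
	}
	defer afterExecution() // report completion exactly once
	doTheWork(ctx)         // placeholder for the actual request handling
}
```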
@@ -16,27 +16,114 @@ limitations under the License.
 
 package promise
 
-// Mutable is a variable that is initially not set and can be set one
-// or more times (unlike a traditional "promise", which can be written
-// only once).
-type Mutable interface {
+// This file defines interfaces for promises and futures and related
+// things. These are about coordination among multiple goroutines and
+// so are safe for concurrent calls --- although moderated in some
+// cases by a requirement that the caller hold a certain lock.
+
+// Readable represents a variable that is initially not set and later
+// becomes set. Some instances may be set to multiple values in
+// series. A Readable for a variable that can only get one value is
+// commonly known as a "future".
+type Readable interface {
+	// Get reads the current value of this variable. If this variable
+	// is not set yet then this call blocks until this variable gets a
+	// value.
+	Get() interface{}
+
+	// IsSet returns immediately with an indication of whether this
+	// variable has been set.
+	IsSet() bool
+}
+
+// LockingReadable is a Readable whose implementation is protected by
+// a lock
+type LockingReadable interface {
+	Readable
+
+	// GetLocked is like Get but the caller must already hold the
+	// lock. GetLocked may release, and later re-acquire, the lock
+	// any number of times. Get may acquire, and later release, the
+	// lock any number of times.
+	GetLocked() interface{}
+
+	// IsSetLocked is like IsSet but the caller must already hold the
+	// lock. IsSetLocked may release, and later re-acquire, the lock
+	// any number of times. IsSet may acquire, and later release, the
+	// lock any number of times.
+	IsSetLocked() bool
+}
+
+// WriteOnceOnly represents a variable that is initially not set and
+// can be set once.
+type WriteOnceOnly interface {
+	// Set normally writes a value into this variable, unblocks every
+	// goroutine waiting for this variable to have a value, and
+	// returns true. In the unhappy case that this variable is
+	// already set, this method returns false without modifying the
+	// variable's value.
+	Set(interface{}) bool
+}
+
+// WriteOnce represents a variable that is initially not set and can
+// be set once and is readable. This is the common meaning for
+// "promise".
+type WriteOnce interface {
+	Readable
+	WriteOnceOnly
+}
+
+// LockingWriteOnceOnly is a WriteOnceOnly whose implementation is
+// protected by a lock.
+type LockingWriteOnceOnly interface {
+	WriteOnceOnly
+
+	// SetLocked is like Set but the caller must already hold the
+	// lock. SetLocked may release, and later re-acquire, the lock
+	// any number of times. Set may acquire, and later release, the
+	// lock any number of times
+	SetLocked(interface{}) bool
+}
+
+// LockingWriteOnce is a WriteOnce whose implementation is protected
+// by a lock.
+type LockingWriteOnce interface {
+	LockingReadable
+	LockingWriteOnceOnly
+}
+
+// WriteMultipleOnly represents a variable that is initially not set
+// and can be set one or more times (unlike a traditional "promise",
+// which can be written only once).
+type WriteMultipleOnly interface {
 	// Set writes a value into this variable and unblocks every
 	// goroutine waiting for this variable to have a value
 	Set(interface{})
-
-	// Get reads the value of this variable. If this variable is
-	// not set yet then this call blocks until this variable gets a value.
-	Get() interface{}
 }
 
-// LockingMutable is a Mutable whose implementation is protected by a lock
-type LockingMutable interface {
-	Mutable
+// WriteMultiple represents a variable that is initially not set and
+// can be set one or more times (unlike a traditional "promise", which
+// can be written only once) and is readable.
+type WriteMultiple interface {
+	Readable
+	WriteMultipleOnly
+}
 
-	// SetLocked is like Set but the caller must already hold the lock
+// LockingWriteMultipleOnly is a WriteMultipleOnly whose
+// implementation is protected by a lock.
+type LockingWriteMultipleOnly interface {
+	WriteMultipleOnly
+
+	// SetLocked is like Set but the caller must already hold the
+	// lock. SetLocked may release, and later re-acquire, the lock
+	// any number of times. Set may acquire, and later release, the
+	// lock any number of times
 	SetLocked(interface{})
-
-	// GetLocked is like Get but the caller must already hold the lock
-	GetLocked() interface{}
+}
+
+// LockingWriteMultiple is a WriteMultiple whose implementation is
+// protected by a lock.
+type LockingWriteMultiple interface {
+	LockingReadable
+	LockingWriteMultipleOnly
+}
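A small sketch of the intended WriteOnce contract described above; it assumes the promise package from this diff is imported as `promise "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"` and that wr is any implementation of promise.WriteOnce.

```go
// setOnce illustrates the write-once semantics: the first Set wins,
// later Sets are rejected, and Get never blocks once IsSet is true.
func setOnce(wr promise.WriteOnce, v interface{}) {
	if wr.Set(v) {
		return // first writer wins; every Get from now on observes v
	}
	// A later Set loses: the variable keeps its original value.
	_ = wr.Get() // does not block here, because IsSet() is already true
}
```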
@@ -23,57 +23,102 @@ import (
 	"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
 )
 
-// lockingPromise implements LockingMutable based on a condition
-// variable. This implementation tracks active goroutines: the given
-// counter is decremented for a goroutine waiting for this varible to
-// be set and incremented when such a goroutine is unblocked.
-type lockingPromise struct {
+// promisoid is the data and behavior common to all the promise-like
+// abstractions implemented here. This implementation is based on a
+// condition variable. This implementation tracks active goroutines:
+// the given counter is decremented for a goroutine waiting for this
+// varible to be set and incremented when such a goroutine is
+// unblocked.
+type promisoid struct {
 	lock          sync.Locker
 	cond          sync.Cond
 	activeCounter counter.GoRoutineCounter // counter of active goroutines
-	waitingCount  int                      // number of goroutines idle due to this mutable being unset
+	waitingCount  int                      // number of goroutines idle due to this being unset
 	isSet         bool
 	value         interface{}
 }
 
-var _ promise.LockingMutable = &lockingPromise{}
+func (pr *promisoid) Get() interface{} {
+	pr.lock.Lock()
+	defer pr.lock.Unlock()
+	return pr.GetLocked()
+}
 
-// NewLockingPromise makes a new promise.LockingMutable
-func NewLockingPromise(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingMutable {
-	return &lockingPromise{
+func (pr *promisoid) GetLocked() interface{} {
+	if !pr.isSet {
+		pr.waitingCount++
+		pr.activeCounter.Add(-1)
+		pr.cond.Wait()
+	}
+	return pr.value
+}
+
+func (pr *promisoid) IsSet() bool {
+	pr.lock.Lock()
+	defer pr.lock.Unlock()
+	return pr.IsSetLocked()
+}
+
+func (pr *promisoid) IsSetLocked() bool {
+	return pr.isSet
+}
+
+func (pr *promisoid) SetLocked(value interface{}) {
+	pr.isSet = true
+	pr.value = value
+	if pr.waitingCount > 0 {
+		pr.activeCounter.Add(pr.waitingCount)
+		pr.waitingCount = 0
+		pr.cond.Broadcast()
+	}
+}
+
+type writeOnce struct {
+	promisoid
+}
+
+var _ promise.LockingWriteOnce = &writeOnce{}
+
+// NewWriteOnce makes a new promise.LockingWriteOnce
+func NewWriteOnce(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingWriteOnce {
+	return &writeOnce{promisoid{
 		lock:          lock,
 		cond:          *sync.NewCond(lock),
 		activeCounter: activeCounter,
+	}}
+}
+
+func (wr *writeOnce) Set(value interface{}) bool {
+	wr.lock.Lock()
+	defer wr.lock.Unlock()
+	return wr.SetLocked(value)
+}
+
+func (wr *writeOnce) SetLocked(value interface{}) bool {
+	if wr.isSet {
+		return false
 	}
+	wr.promisoid.SetLocked(value)
+	return true
 }
 
-func (lp *lockingPromise) Set(value interface{}) {
-	lp.lock.Lock()
-	defer lp.lock.Unlock()
-	lp.SetLocked(value)
+type writeMultiple struct {
+	promisoid
 }
 
-func (lp *lockingPromise) Get() interface{} {
-	lp.lock.Lock()
-	defer lp.lock.Unlock()
-	return lp.GetLocked()
+var _ promise.LockingWriteMultiple = &writeMultiple{}
+
+// NewWriteMultiple makes a new promise.LockingWriteMultiple
+func NewWriteMultiple(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingWriteMultiple {
+	return &writeMultiple{promisoid{
+		lock:          lock,
+		cond:          *sync.NewCond(lock),
+		activeCounter: activeCounter,
+	}}
 }
 
-func (lp *lockingPromise) SetLocked(value interface{}) {
-	lp.isSet = true
-	lp.value = value
-	if lp.waitingCount > 0 {
-		lp.activeCounter.Add(lp.waitingCount)
-		lp.waitingCount = 0
-		lp.cond.Broadcast()
-	}
-}
-
-func (lp *lockingPromise) GetLocked() interface{} {
-	if !lp.isSet {
-		lp.waitingCount++
-		lp.activeCounter.Add(-1)
-		lp.cond.Wait()
-	}
-	return lp.value
+func (wr *writeMultiple) Set(value interface{}) {
+	wr.lock.Lock()
+	defer wr.lock.Unlock()
+	wr.SetLocked(value)
 }
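A usage sketch of the constructors above. The no-op counter is an assumption standing in for the queueset's real counter.GoRoutineCounter; the only behavior assumed of it is the Add(int) method that the calls in this diff use.

```go
// Illustrative only; not part of the package under change.
type noopCounter struct{}

func (noopCounter) Add(delta int) {} // assumed GoRoutineCounter shape

func demoWriteOnce() {
	var lock sync.Mutex
	wr := lockingpromise.NewWriteOnce(&lock, noopCounter{})

	go func() {
		// Blocks until the first Set; while idle the goroutine is
		// subtracted from the active counter and added back on wake-up.
		fmt.Println(wr.Get())
	}()

	wr.Set("first")        // returns true and wakes the waiter
	ok := wr.Set("second") // returns false; the value stays "first"
	_ = ok
}
```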
@@ -25,16 +25,16 @@ import (
 	"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
 )
 
-func TestLockingPromise(t *testing.T) {
+func TestLockingWriteMultiple(t *testing.T) {
 	now := time.Now()
 	clock, counter := clock.NewFakeEventClock(now, 0, nil)
 	var lock sync.Mutex
-	lp := NewLockingPromise(&lock, counter)
+	wr := NewWriteMultiple(&lock, counter)
 	var gots int32
 	var got atomic.Value
 	counter.Add(1)
 	go func() {
-		got.Store(lp.Get())
+		got.Store(wr.Get())
 		atomic.AddInt32(&gots, 1)
 		counter.Add(-1)
 	}()
@@ -43,8 +43,11 @@ func TestLockingPromise(t *testing.T) {
 	if atomic.LoadInt32(&gots) != 0 {
 		t.Error("Get returned before Set")
 	}
-	var aval = &now
-	lp.Set(aval)
+	if wr.IsSet() {
+		t.Error("IsSet before Set")
+	}
+	aval := &now
+	wr.Set(aval)
 	clock.Run(nil)
 	time.Sleep(time.Second)
 	if atomic.LoadInt32(&gots) != 1 {
@@ -53,9 +56,12 @@ func TestLockingPromise(t *testing.T) {
 	if got.Load() != aval {
 		t.Error("Get did not return what was Set")
 	}
+	if !wr.IsSet() {
+		t.Error("IsSet()==false after Set")
+	}
 	counter.Add(1)
 	go func() {
-		got.Store(lp.Get())
+		got.Store(wr.Get())
 		atomic.AddInt32(&gots, 1)
 		counter.Add(-1)
 	}()
@@ -67,4 +73,80 @@ func TestLockingPromise(t *testing.T) {
 	if got.Load() != aval {
 		t.Error("Second Get did not return what was Set")
 	}
+	if !wr.IsSet() {
+		t.Error("IsSet()==false after second Set")
+	}
+	later := time.Now()
+	bval := &later
+	wr.Set(bval)
+	if !wr.IsSet() {
+		t.Error("IsSet() returned false after second set")
+	} else if wr.Get() != bval {
+		t.Error("Get() after second Set returned wrong value")
+	}
+}
+
+func TestLockingWriteOnce(t *testing.T) {
+	now := time.Now()
+	clock, counter := clock.NewFakeEventClock(now, 0, nil)
+	var lock sync.Mutex
+	wr := NewWriteOnce(&lock, counter)
+	var gots int32
+	var got atomic.Value
+	counter.Add(1)
+	go func() {
+		got.Store(wr.Get())
+		atomic.AddInt32(&gots, 1)
+		counter.Add(-1)
+	}()
+	clock.Run(nil)
+	time.Sleep(time.Second)
+	if atomic.LoadInt32(&gots) != 0 {
+		t.Error("Get returned before Set")
+	}
+	if wr.IsSet() {
+		t.Error("IsSet before Set")
+	}
+	aval := &now
+	if !wr.Set(aval) {
+		t.Error("Set() returned false")
+	}
+	clock.Run(nil)
+	time.Sleep(time.Second)
+	if atomic.LoadInt32(&gots) != 1 {
+		t.Error("Get did not return after Set")
+	}
+	if got.Load() != aval {
+		t.Error("Get did not return what was Set")
+	}
+	if !wr.IsSet() {
+		t.Error("IsSet()==false after Set")
+	}
+	counter.Add(1)
+	go func() {
+		got.Store(wr.Get())
+		atomic.AddInt32(&gots, 1)
+		counter.Add(-1)
+	}()
+	clock.Run(nil)
+	time.Sleep(time.Second)
+	if atomic.LoadInt32(&gots) != 2 {
+		t.Error("Second Get did not return immediately")
+	}
+	if got.Load() != aval {
+		t.Error("Second Get did not return what was Set")
+	}
+	if !wr.IsSet() {
+		t.Error("IsSet()==false after second Set")
+	}
+	later := time.Now()
+	bval := &later
+	if wr.Set(bval) {
+		t.Error("second Set() returned true")
+	}
+	if !wr.IsSet() {
+		t.Error("IsSet() returned false after second set")
+	} else if wr.Get() != aval {
+		t.Error("Get() after second Set returned wrong value")
+	}
 }
@@ -22,10 +22,10 @@ import (
 	"sync"
 	"time"
 
-	"k8s.io/apimachinery/pkg/util/runtime"
-
 	"github.com/pkg/errors"
 
 	"k8s.io/apimachinery/pkg/util/clock"
+	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apiserver/pkg/util/flowcontrol/counter"
 	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
 	"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise"
@@ -43,12 +43,13 @@ type queueSetFactory struct {
 	clock   clock.PassiveClock
 }
 
-// NewQueueSetFactory creates a new QueueSetFactory object
-func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter) fq.QueueSetFactory {
-	return &queueSetFactory{
-		counter: counter,
-		clock:   c,
-	}
+// `*queueSetCompleter` implements QueueSetCompleter. Exactly one of
+// the fields `factory` and `theSet` is non-nil.
+type queueSetCompleter struct {
+	factory *queueSetFactory
+	theSet  *queueSet
+	qCfg    fq.QueuingConfig
+	dealer  *shufflesharding.Dealer
 }
 
 // queueSet implements the Fair Queuing for Server Requests technique
@@ -65,12 +66,19 @@ type queueSet struct {
 
 	lock sync.Mutex
 
-	// config holds the current configuration. Its DesiredNumQueues
-	// may be less than the current number of queues. If its
-	// DesiredNumQueues is zero then its other queuing parameters
-	// retain the settings they had when DesiredNumQueues was last
-	// non-zero (if ever).
-	config fq.QueueSetConfig
+	// qCfg holds the current queuing configuration. Its
+	// DesiredNumQueues may be less than the current number of queues.
+	// If its DesiredNumQueues is zero then its other queuing
+	// parameters retain the settings they had when DesiredNumQueues
+	// was last non-zero (if ever).
+	qCfg fq.QueuingConfig
+
+	// the current dispatching configuration.
+	dCfg fq.DispatchingConfig
+
+	// If `config.DesiredNumQueues` is non-zero then dealer is not nil
+	// and is good for `config`.
+	dealer *shufflesharding.Dealer
 
 	// queues may be longer than the desired number, while the excess
 	// queues are still draining.
@@ -96,24 +104,55 @@ type queueSet struct {
 	totRequestsExecuting int
 
 	emptyHandler fq.EmptyHandler
-	dealer *shufflesharding.Dealer
 }
 
-// NewQueueSet creates a new QueueSet object.
-// There is a new QueueSet created for each priority level.
-func (qsf queueSetFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) {
-	fq := &queueSet{
-		clock: qsf.clock,
-		counter: qsf.counter,
-		estimatedServiceTime: 60,
-		config: config,
-		lastRealTime: qsf.clock.Now(),
+// NewQueueSetFactory creates a new QueueSetFactory object
+func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter) fq.QueueSetFactory {
+	return &queueSetFactory{
+		counter: counter,
+		clock:   c,
 	}
-	err := fq.SetConfiguration(config)
+}
+
+func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) {
+	dealer, err := checkConfig(qCfg)
 	if err != nil {
 		return nil, err
 	}
-	return fq, nil
+	return &queueSetCompleter{
+		factory: qsf,
+		qCfg:    qCfg,
+		dealer:  dealer}, nil
+}
+
+// checkConfig returns a non-nil Dealer if the config is valid and
+// calls for one, and returns a non-nil error if the given config is
+// invalid.
+func checkConfig(qCfg fq.QueuingConfig) (*shufflesharding.Dealer, error) {
+	if qCfg.DesiredNumQueues == 0 {
+		return nil, nil
+	}
+	dealer, err := shufflesharding.NewDealer(qCfg.DesiredNumQueues, qCfg.HandSize)
+	if err != nil {
+		err = errors.Wrap(err, "the QueueSetConfig implies an invalid shuffle sharding config (DesiredNumQueues is deckSize)")
+	}
+	return dealer, err
+}
+
+func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
+	qs := qsc.theSet
+	if qs == nil {
+		qs = &queueSet{
+			clock:                qsc.factory.clock,
+			counter:              qsc.factory.counter,
+			estimatedServiceTime: 60,
+			qCfg:                 qsc.qCfg,
+			virtualTime:          0,
+			lastRealTime:         qsc.factory.clock.Now(),
+		}
+	}
+	qs.setConfiguration(qsc.qCfg, qsc.dealer, dCfg)
+	return qs
 }
 
 // createQueues is a helper method for initializing an array of n queues
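A hedged sketch of how a configuration error surfaces in the first phase. It assumes, as the wrapped error message above suggests, that shufflesharding.NewDealer rejects a hand larger than the deck; the values are invented for illustration.

```go
// probeConfig reports whether a queuing config would be accepted; qsf is
// any fq.QueueSetFactory, e.g. one built by NewQueueSetFactory above.
func probeConfig(qsf fq.QueueSetFactory) error {
	_, err := qsf.BeginConstruction(fq.QueuingConfig{
		Name:             "example",
		DesiredNumQueues: 8,
		HandSize:         9, // assumed invalid: hand larger than the deck
	})
	// A non-nil err arrives here, before any DispatchingConfig (and hence
	// any concurrency allotment) has been consumed.
	return err
}
```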
@@ -125,40 +164,45 @@ func createQueues(n, baseIndex int) []*queue {
 	return fqqueues
 }
 
-// SetConfiguration is used to set the configuration for a queueSet
-// update handling for when fields are updated is handled here as well -
+func (qs *queueSet) BeginConfigChange(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) {
+	dealer, err := checkConfig(qCfg)
+	if err != nil {
+		return nil, err
+	}
+	return &queueSetCompleter{
+		theSet: qs,
+		qCfg:   qCfg,
+		dealer: dealer}, nil
+}
+
+// SetConfiguration is used to set the configuration for a queueSet.
+// Update handling for when fields are updated is handled here as well -
 // eg: if DesiredNum is increased, SetConfiguration reconciles by
 // adding more queues.
-func (qs *queueSet) SetConfiguration(config fq.QueueSetConfig) error {
+func (qs *queueSet) setConfiguration(qCfg fq.QueuingConfig, dealer *shufflesharding.Dealer, dCfg fq.DispatchingConfig) {
 	qs.lockAndSyncTime()
 	defer qs.lock.Unlock()
-	var dealer *shufflesharding.Dealer
 
-	if config.DesiredNumQueues > 0 {
-		var err error
-		dealer, err = shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize)
-		if err != nil {
-			return errors.Wrap(err, "shuffle sharding dealer creation failed")
-		}
+	if qCfg.DesiredNumQueues > 0 {
 		// Adding queues is the only thing that requires immediate action
 		// Removing queues is handled by omitting indexes >DesiredNum from
 		// chooseQueueIndexLocked
 		numQueues := len(qs.queues)
-		if config.DesiredNumQueues > numQueues {
+		if qCfg.DesiredNumQueues > numQueues {
 			qs.queues = append(qs.queues,
-				createQueues(config.DesiredNumQueues-numQueues, len(qs.queues))...)
+				createQueues(qCfg.DesiredNumQueues-numQueues, len(qs.queues))...)
 		}
 	} else {
-		config.QueueLengthLimit = qs.config.QueueLengthLimit
-		config.HandSize = qs.config.HandSize
-		config.RequestWaitLimit = qs.config.RequestWaitLimit
+		qCfg.QueueLengthLimit = qs.qCfg.QueueLengthLimit
+		qCfg.HandSize = qs.qCfg.HandSize
+		qCfg.RequestWaitLimit = qs.qCfg.RequestWaitLimit
 	}
 
-	qs.config = config
+	qs.qCfg = qCfg
+	qs.dCfg = dCfg
 	qs.dealer = dealer
 
 	qs.dispatchAsMuchAsPossibleLocked()
-	return nil
 }
 
 // Quiesce controls whether the QueueSet is operating normally or is quiescing.
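A sketch of reconfiguring a live queueSet with the methods above; newQCfg and newConcurrency are hypothetical inputs supplied by the caller.

```go
// applyNewConfig shows the two-phase update: a bad queuing config is
// reported before any concurrency change is committed.
func applyNewConfig(qs fq.QueueSet, newQCfg fq.QueuingConfig, newConcurrency int) error {
	completer, err := qs.BeginConfigChange(newQCfg)
	if err != nil {
		return err // nothing has changed; the old configuration stays in force
	}
	// Complete commits both halves; per the interface contract it returns
	// the same QueueSet value it was begun on.
	completer.Complete(fq.DispatchingConfig{ConcurrencyLimit: newConcurrency})
	return nil
}
```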
@@ -211,24 +255,30 @@ const (
 func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func()) {
 	var req *request
 	decision := func() requestDecision {
+		// Decide what to do and update metrics accordingly. Metrics
+		// about total requests queued and executing are updated on
+		// each way out of this function rather than in lower level
+		// functions because there can be multiple lower level changes
+		// in one invocation here.
 		qs.lockAndSyncTime()
 		defer qs.lock.Unlock()
 		// A call to Wait while the system is quiescing will be rebuffed by
 		// returning `tryAnother=true`.
 		if qs.emptyHandler != nil {
-			klog.V(5).Infof("QS(%s): rebuffing request %#+v %#+v with TryAnother", qs.config.Name, descr1, descr2)
+			klog.V(5).Infof("QS(%s): rebuffing request %#+v %#+v with TryAnother", qs.qCfg.Name, descr1, descr2)
 			return decisionTryAnother
 		}
 
 		// ========================================================================
 		// Step 0:
 		// Apply only concurrency limit, if zero queues desired
-		if qs.config.DesiredNumQueues < 1 {
-			if qs.totRequestsExecuting >= qs.config.ConcurrencyLimit {
-				klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v because %d are executing and the limit is %d", qs.config.Name, descr1, descr2, qs.totRequestsExecuting, qs.config.ConcurrencyLimit)
+		if qs.qCfg.DesiredNumQueues < 1 {
+			if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit {
+				klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v because %d are executing and the limit is %d", qs.qCfg.Name, descr1, descr2, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
 				return decisionReject
 			}
 			req = qs.dispatchSansQueue(descr1, descr2)
+			metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting)
 			return decisionExecute
 		}
 
@@ -240,11 +290,12 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i
 		// we are at max queue length
 		// 4) If not rejected, create a request and enqueue
 		req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue, descr1, descr2)
+		defer metrics.UpdateFlowControlRequestsInQueue(qs.qCfg.Name, qs.totRequestsWaiting)
 		// req == nil means that the request was rejected - no remaining
 		// concurrency shares and at max queue length already
 		if req == nil {
-			klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v due to queue full", qs.config.Name, descr1, descr2)
-			metrics.AddReject(qs.config.Name, "queue-full")
+			klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v due to queue full", qs.qCfg.Name, descr1, descr2)
+			metrics.AddReject(qs.qCfg.Name, "queue-full")
 			return decisionReject
 		}
 
@@ -258,6 +309,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i
 		// fair queuing technique to pick a queue and dispatch a
 		// request from that queue.
 		qs.dispatchAsMuchAsPossibleLocked()
+		defer metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting)
 
 		// ========================================================================
 		// Step 3:
@@ -274,8 +326,8 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i
 			qs.goroutineDoneOrBlocked()
 			select {
 			case <-doneCh:
-				klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.config.Name, descr1, descr2)
-				req.decision.Set(decisionCancel)
+				klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.qCfg.Name, descr1, descr2)
+				qs.cancelWait(req)
 			}
 			qs.goroutineDoneOrBlocked()
 		}()
@@ -286,40 +338,24 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i
 		// The final step in Wait is to wait on a decision from
 		// somewhere and then act on it.
 		decisionAny := req.decision.GetLocked()
-		var decision requestDecision
-		switch dec := decisionAny.(type) {
-		case requestDecision:
-			decision = dec
-		default:
-			klog.Errorf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.config.Name, decisionAny, decisionAny, descr1, descr2)
-			decision = decisionExecute
+		qs.syncTimeLocked()
+		decision, isDecision := decisionAny.(requestDecision)
+		if !isDecision {
+			klog.Errorf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.qCfg.Name, decisionAny, decisionAny, descr1, descr2)
+			decision = decisionExecute // yeah, this is a no-op
 		}
 		switch decision {
 		case decisionReject:
-			klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.config.Name, descr1, descr2)
-			metrics.AddReject(qs.config.Name, "time-out")
+			klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, descr1, descr2)
+			metrics.AddReject(qs.qCfg.Name, "time-out")
 		case decisionCancel:
-			qs.syncTimeLocked()
-			// TODO(aaron-prindle) add metrics to these two cases
-			if req.isWaiting {
-				klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.config.Name, descr1, descr2)
-				// remove the request from the queue as it has timed out
-				for i := range req.queue.requests {
-					if req == req.queue.requests[i] {
-						// remove the request
-						req.queue.requests = append(req.queue.requests[:i],
-							req.queue.requests[i+1:]...)
-						break
-					}
-				}
-				// At this point, if the qs is quiescing,
-				// has zero requests executing, and has zero requests enqueued
-				// then a call to the EmptyHandler should be forked.
-				qs.maybeForkEmptyHandlerLocked()
-			} else {
-				klog.V(5).Infof("QS(%s): request %#+v %#+v canceled shortly after dispatch", qs.config.Name, descr1, descr2)
-			}
+			// TODO(aaron-prindle) add metrics for this case
+			klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, descr1, descr2)
 		}
+		// At this point, if the qs is quiescing,
+		// has zero requests executing, and has zero requests enqueued
+		// then a call to the EmptyHandler should be forked.
+		qs.maybeForkEmptyHandlerLocked()
 		return decision
 	}()
 	switch decision {
@@ -370,7 +406,7 @@ func (qs *queueSet) getVirtualTimeRatio() float64 {
 	if activeQueues == 0 {
 		return 0
 	}
-	return math.Min(float64(reqs), float64(qs.config.ConcurrencyLimit)) / float64(activeQueues)
+	return math.Min(float64(reqs), float64(qs.dCfg.ConcurrencyLimit)) / float64(activeQueues)
 }
 
 // timeoutOldRequestsAndRejectOrEnqueueLocked encapsulates the logic required
@@ -395,7 +431,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64,
 
 	// Create a request and enqueue
 	req := &request{
-		decision:    lockingpromise.NewLockingPromise(&qs.lock, qs.counter),
+		decision:    lockingpromise.NewWriteOnce(&qs.lock, qs.counter),
 		arrivalTime: qs.clock.Now(),
 		queue:       queue,
 		descr1:      descr1,
@@ -404,7 +440,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64,
 	if ok := qs.rejectOrEnqueueLocked(req); !ok {
 		return nil
 	}
-	metrics.ObserveQueueLength(qs.config.Name, len(queue.requests))
+	metrics.ObserveQueueLength(qs.qCfg.Name, len(queue.requests))
 	return req
 }
 
@@ -415,13 +451,16 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte
 	bestQueueLen := int(math.MaxInt32)
 	// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`.
 	qs.dealer.Deal(hashValue, func(queueIdx int) {
+		if queueIdx < 0 || queueIdx >= len(qs.queues) {
+			return
+		}
 		thisLen := len(qs.queues[queueIdx].requests)
-		klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.config.Name, descr1, descr2, queueIdx, thisLen)
+		klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisLen)
 		if thisLen < bestQueueLen {
 			bestQueueIdx, bestQueueLen = queueIdx, thisLen
 		}
 	})
-	klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.config.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].requestsExecuting)
+	klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].requestsExecuting)
 	return bestQueueIdx
 }
 
@@ -436,7 +475,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) {
 	// as newer requests also will not have timed out
 
 	// now - requestWaitLimit = waitLimit
-	waitLimit := now.Add(-qs.config.RequestWaitLimit)
+	waitLimit := now.Add(-qs.qCfg.RequestWaitLimit)
 	for i, req := range reqs {
 		if waitLimit.After(req.arrivalTime) {
 			req.decision.SetLocked(decisionReject)
@@ -463,8 +502,8 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
 	queue := request.queue
 	curQueueLength := len(queue.requests)
 	// rejects the newly arrived request if resource criteria not met
-	if qs.totRequestsExecuting >= qs.config.ConcurrencyLimit &&
-		curQueueLength >= qs.config.QueueLengthLimit {
+	if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit &&
+		curQueueLength >= qs.qCfg.QueueLengthLimit {
 		return false
 	}
 
@@ -479,12 +518,11 @@ func (qs *queueSet) enqueueLocked(request *request) {
 		// the queue’s virtual start time is set to the virtual time.
 		queue.virtualStart = qs.virtualTime
 		if klog.V(6) {
-			klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2)
+			klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2)
 		}
 	}
 	queue.Enqueue(request)
 	qs.totRequestsWaiting++
-	metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.totRequestsWaiting)
 }
 
 // dispatchAsMuchAsPossibleLocked runs a loop, as long as there
@@ -494,7 +532,7 @@ func (qs *queueSet) enqueueLocked(request *request) {
 // queue, increment the count of the number executing, and send true
 // to the request's channel.
 func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
-	for qs.totRequestsWaiting != 0 && qs.totRequestsExecuting < qs.config.ConcurrencyLimit {
+	for qs.totRequestsWaiting != 0 && qs.totRequestsExecuting < qs.dCfg.ConcurrencyLimit {
 		ok := qs.dispatchLocked()
 		if !ok {
 			break
@@ -512,9 +550,8 @@ func (qs *queueSet) dispatchSansQueue(descr1, descr2 interface{}) *request {
 	}
 	qs.totRequestsExecuting++
 	if klog.V(5) {
-		klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %#+v %#+v, qs will have %d executing", qs.config.Name, now.Format(nsTimeFmt), qs.virtualTime, descr1, descr2, qs.totRequestsExecuting)
+		klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, descr1, descr2, qs.totRequestsExecuting)
 	}
-	metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting)
 	return req
 }
 
@@ -532,20 +569,46 @@ func (qs *queueSet) dispatchLocked() bool {
 		return false
 	}
 	request.startTime = qs.clock.Now()
-	// request dequeued, service has started
+	// At this moment the request leaves its queue and starts
+	// executing. We do not recognize any interim state between
+	// "queued" and "executing". While that means "executing"
+	// includes a little overhead from this package, this is not a
+	// problem because other overhead is also included.
 	qs.totRequestsWaiting--
 	qs.totRequestsExecuting++
 	queue.requestsExecuting++
 	if klog.V(6) {
-		klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting)
+		klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting)
 	}
 	// When a request is dequeued for service -> qs.virtualStart += G
 	queue.virtualStart += qs.estimatedServiceTime
-	metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting)
 	request.decision.SetLocked(decisionExecute)
 	return ok
 }
 
+// cancelWait ensures the request is not waiting
+func (qs *queueSet) cancelWait(req *request) {
+	qs.lock.Lock()
+	defer qs.lock.Unlock()
+	if req.decision.IsSetLocked() {
+		// The request has already been removed from the queue
+		// and so we consider its wait to be over.
+		return
+	}
+	req.decision.SetLocked(decisionCancel)
+	queue := req.queue
+	// remove the request from the queue as it has timed out
+	for i := range queue.requests {
+		if req == queue.requests[i] {
+			// remove the request
+			queue.requests = append(queue.requests[:i], queue.requests[i+1:]...)
+			qs.totRequestsWaiting--
+			break
+		}
+	}
+	return
+}
+
 // selectQueueLocked examines the queues in round robin order and
 // returns the first one of those for which the virtual finish time of
 // the oldest waiting request is minimal.
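The race that cancelWait guards against is the dispatcher setting the decision at the same moment the context watcher observes cancellation; with a write-once decision, whichever side sets first wins and the other becomes a no-op. The following self-contained sketch shows only that arbitration pattern, not the package's actual types.

```go
// decisionCell is an illustrative stand-in for the write-once decision
// promise used above: two racing writers, exactly one winner.
type decisionCell struct {
	mu    sync.Mutex
	set   bool
	value string
}

// setOnce returns true only for the first caller; later calls leave the
// cell untouched, mirroring cancelWait's IsSetLocked check before SetLocked.
func (c *decisionCell) setOnce(v string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.set {
		return false // the other side already decided; do nothing
	}
	c.set, c.value = true, v
	return true
}

// Usage: the dispatcher calls setOnce("execute") while the context watcher
// calls setOnce("cancel"); only one succeeds, so a request is never both
// dispatched and removed from its queue.
```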
@@ -583,6 +646,8 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) {
 
 	qs.finishRequestLocked(req)
 	qs.dispatchAsMuchAsPossibleLocked()
+	metrics.UpdateFlowControlRequestsInQueue(qs.qCfg.Name, qs.totRequestsWaiting)
+	metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting)
 }
 
 // finishRequestLocked is a callback that should be used when a
@@ -590,11 +655,10 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) {
 // callback updates important state in the queueSet
 func (qs *queueSet) finishRequestLocked(r *request) {
 	qs.totRequestsExecuting--
-	metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting)
 
 	if r.queue == nil {
 		if klog.V(6) {
-			klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting)
+			klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting)
 		}
 		return
 	}
@@ -609,12 +673,12 @@ func (qs *queueSet) finishRequestLocked(r *request) {
 	r.queue.requestsExecuting--
 
 	if klog.V(6) {
-		klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, r.queue.virtualStart, S, len(r.queue.requests), r.queue.requestsExecuting)
+		klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, r.queue.virtualStart, S, len(r.queue.requests), r.queue.requestsExecuting)
 	}
 
 	// If there are more queues than desired and this one has no
 	// requests then remove it
-	if len(qs.queues) > qs.config.DesiredNumQueues &&
+	if len(qs.queues) > qs.qCfg.DesiredNumQueues &&
 		len(r.queue.requests) == 0 &&
 		r.queue.requestsExecuting == 0 {
 		qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.queue.index)
|
@ -141,12 +141,11 @@ func init() {
|
|||||||
func TestNoRestraint(t *testing.T) {
|
func TestNoRestraint(t *testing.T) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
clk, counter := clock.NewFakeEventClock(now, 0, nil)
|
clk, counter := clock.NewFakeEventClock(now, 0, nil)
|
||||||
nrf := test.NewNoRestraintFactory()
|
nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{})
|
||||||
config := fq.QueueSetConfig{}
|
|
||||||
nr, err := nrf.NewQueueSet(config)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("QueueSet creation failed with %v", err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
nr := nrc.Complete(fq.DispatchingConfig{})
|
||||||
exerciseQueueSetUniformScenario(t, "NoRestraint", nr, []uniformClient{
|
exerciseQueueSetUniformScenario(t, "NoRestraint", nr, []uniformClient{
|
||||||
{1001001001, 5, 10, time.Second, time.Second},
|
{1001001001, 5, 10, time.Second, time.Second},
|
||||||
{2002002002, 2, 10, time.Second, time.Second / 2},
|
{2002002002, 2, 10, time.Second, time.Second / 2},
|
||||||
@@ -158,18 +157,18 @@ func TestUniformFlows(t *testing.T) {
 
 	clk, counter := clock.NewFakeEventClock(now, 0, nil)
 	qsf := NewQueueSetFactory(clk, counter)
-	config := fq.QueueSetConfig{
+	qCfg := fq.QueuingConfig{
 		Name:             "TestUniformFlows",
-		ConcurrencyLimit: 4,
 		DesiredNumQueues: 8,
 		QueueLengthLimit: 6,
 		HandSize:         3,
 		RequestWaitLimit: 10 * time.Minute,
 	}
-	qs, err := qsf.NewQueueSet(config)
+	qsc, err := qsf.BeginConstruction(qCfg)
 	if err != nil {
-		t.Fatalf("QueueSet creation failed with %v", err)
+		t.Fatal(err)
 	}
+	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})
 
 	exerciseQueueSetUniformScenario(t, "UniformFlows", qs, []uniformClient{
 		{1001001001, 5, 10, time.Second, time.Second},
@@ -182,18 +181,18 @@ func TestDifferentFlows(t *testing.T) {
 
 	clk, counter := clock.NewFakeEventClock(now, 0, nil)
 	qsf := NewQueueSetFactory(clk, counter)
-	config := fq.QueueSetConfig{
+	qCfg := fq.QueuingConfig{
 		Name:             "TestDifferentFlows",
-		ConcurrencyLimit: 4,
 		DesiredNumQueues: 8,
 		QueueLengthLimit: 6,
 		HandSize:         3,
 		RequestWaitLimit: 10 * time.Minute,
 	}
-	qs, err := qsf.NewQueueSet(config)
+	qsc, err := qsf.BeginConstruction(qCfg)
 	if err != nil {
-		t.Fatalf("QueueSet creation failed with %v", err)
+		t.Fatal(err)
 	}
+	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})
 
 	exerciseQueueSetUniformScenario(t, "DifferentFlows", qs, []uniformClient{
 		{1001001001, 6, 10, time.Second, time.Second},
@@ -206,18 +205,15 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
 
 	clk, counter := clock.NewFakeEventClock(now, 0, nil)
 	qsf := NewQueueSetFactory(clk, counter)
-	config := fq.QueueSetConfig{
+	qCfg := fq.QueuingConfig{
 		Name:             "TestDifferentFlowsWithoutQueuing",
-		ConcurrencyLimit: 4,
 		DesiredNumQueues: 0,
-		QueueLengthLimit: 6,
-		HandSize:         3,
-		RequestWaitLimit: 10 * time.Minute,
 	}
-	qs, err := qsf.NewQueueSet(config)
+	qsc, err := qsf.BeginConstruction(qCfg)
 	if err != nil {
-		t.Fatalf("QueueSet creation failed with %v", err)
+		t.Fatal(err)
 	}
+	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})
 
 	exerciseQueueSetUniformScenario(t, "DifferentFlowsWithoutQueuing", qs, []uniformClient{
 		{1001001001, 6, 10, time.Second, 57 * time.Millisecond},
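With DesiredNumQueues set to 0, as in the test above, there is no fair queuing at all and admission comes down to the ConcurrencyLimit supplied to Complete. A hedged illustration of that degenerate mode as a plain semaphore; this is not the queueset implementation, only the admit-or-reject behavior it reduces to, and all names below are invented for the sketch.

// concurrencyOnly models the no-queuing case: a request either takes one of
// `limit` slots immediately or is turned away; nothing ever waits in a queue.
type concurrencyOnly struct{ slots chan struct{} }

func newConcurrencyOnly(limit int) *concurrencyOnly {
	return &concurrencyOnly{slots: make(chan struct{}, limit)}
}

func (c *concurrencyOnly) tryExecute(work func()) bool {
	select {
	case c.slots <- struct{}{}: // a concurrency slot is free
		defer func() { <-c.slots }()
		work()
		return true
	default: // over the limit, and there are no queues to wait in
		return false
	}
}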
@@ -230,20 +226,65 @@ func TestTimeout(t *testing.T) {
 
 	clk, counter := clock.NewFakeEventClock(now, 0, nil)
 	qsf := NewQueueSetFactory(clk, counter)
-	config := fq.QueueSetConfig{
+	qCfg := fq.QueuingConfig{
 		Name:             "TestTimeout",
-		ConcurrencyLimit: 1,
 		DesiredNumQueues: 128,
 		QueueLengthLimit: 128,
 		HandSize:         1,
 		RequestWaitLimit: 0,
 	}
-	qs, err := qsf.NewQueueSet(config)
+	qsc, err := qsf.BeginConstruction(qCfg)
 	if err != nil {
-		t.Fatalf("QueueSet creation failed with %v", err)
+		t.Fatal(err)
 	}
+	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
 
 	exerciseQueueSetUniformScenario(t, "Timeout", qs, []uniformClient{
 		{1001001001, 5, 100, time.Second, time.Second},
 	}, time.Second*10, true, false, clk, counter)
 }
 
+func TestContextCancel(t *testing.T) {
+	now := time.Now()
+	clk, counter := clock.NewFakeEventClock(now, 0, nil)
+	qsf := NewQueueSetFactory(clk, counter)
+	qCfg := fq.QueuingConfig{
+		Name:             "TestTimeout",
+		DesiredNumQueues: 11,
+		QueueLengthLimit: 11,
+		HandSize:         1,
+		RequestWaitLimit: 15 * time.Second,
+	}
+	qsc, err := qsf.BeginConstruction(qCfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
+	counter.Add(1) // account for the goroutine running this test
+	ctx1 := context.Background()
+	another1, exec1, cleanup1 := qs.Wait(ctx1, 1, "test", "one")
+	if another1 || !exec1 {
+		t.Errorf("Unexpected: another1=%v, exec1=%v", another1, exec1)
+		return
+	}
+	defer cleanup1()
+	ctx2, cancel2 := context.WithCancel(context.Background())
+	tBefore := time.Now()
+	go func() {
+		time.Sleep(time.Second)
+		cancel2()
+	}()
+	another2, exec2, cleanup2 := qs.Wait(ctx2, 2, "test", "two")
+	tAfter := time.Now()
+	if another2 || exec2 {
+		t.Errorf("Unexpected: another2=%v, exec2=%v", another2, exec2)
+		if exec2 {
+			defer cleanup2()
+		}
+	} else {
+		dt := tAfter.Sub(tBefore)
+		if dt < time.Second || dt > 2*time.Second {
+			t.Errorf("Unexpected: dt=%d", dt)
+		}
+	}
+}
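The new TestContextCancel pins down the Wait contract this PR simplifies: a caller hands over its request context, and cancelation while the request is still queued resolves the wait with execute=false instead of leaving a dangling waiter. A caller-side sketch that mirrors the calling convention visible in the test, assuming the same fq and context imports as the test file; the handler shape, flowHash derivation, and descriptor strings are illustrative, not part of this PR.

// serveWithQueueSet is an illustrative caller: it queues the request under its
// own context, so cancelation or timeout while queued simply yields execute=false.
func serveWithQueueSet(ctx context.Context, qs fq.QueueSet, flowHash uint64, doWork func()) bool {
	tryAnother, execute, afterExecution := qs.Wait(ctx, flowHash, "example-descr1", "example-descr2")
	if tryAnother || !execute {
		// The QueueSet is quiescing, or the request was rejected, timed out,
		// or had its context canceled while it waited.
		return false
	}
	defer afterExecution() // release the concurrency slot when the work is done
	doWork()
	return true
}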
@@ -30,15 +30,16 @@ type request struct {
 	// startTime is the real time when the request began executing
 	startTime time.Time
 
-	// decision gets set to the decision about what to do with this request
-	decision promise.LockingMutable
+	// decision gets set to a `requestDecision` indicating what to do
+	// with this request.  It gets set exactly once, when the request
+	// is removed from its queue.  The value will be decisionReject,
+	// decisionCancel, or decisionExecute; decisionTryAnother never
+	// appears here.
+	decision promise.LockingWriteOnce
 
 	// arrivalTime is the real time when the request entered this system
 	arrivalTime time.Time
 
-	// isWaiting indicates whether the request is presently waiting in a queue
-	isWaiting bool
-
 	// descr1 and descr2 are not used in any logic but they appear in
 	// log messages
 	descr1, descr2 interface{}
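The decision field moves from promise.LockingMutable to promise.LockingWriteOnce to match the new comment: the decision is set exactly once, when the request leaves its queue. As a generic illustration of why a write-once promise fits that contract; this sketch is not the promise package's API, only the standard sync package is assumed, and writeOnce, Set, and Get are invented names.

// writeOnce is a toy write-once cell: the first Set wins and wakes every Get;
// later Sets are ignored, which is the exactly-once behavior the comment wants.
type writeOnce struct {
	mu    sync.Mutex
	done  chan struct{}
	value interface{}
	isSet bool
}

func newWriteOnce() *writeOnce {
	return &writeOnce{done: make(chan struct{})}
}

// Set records the value only on the first call and reports whether it won.
func (w *writeOnce) Set(v interface{}) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.isSet {
		return false
	}
	w.value, w.isSet = v, true
	close(w.done)
	return true
}

// Get blocks until some Set has happened, then returns the recorded value.
func (w *writeOnce) Get() interface{} {
	<-w.done
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.value
}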
@@ -60,7 +61,6 @@ type queue struct {
 
 // Enqueue enqueues a request into the queue
 func (q *queue) Enqueue(request *request) {
-	request.isWaiting = true
 	q.requests = append(q.requests, request)
 }
 
@@ -71,8 +71,6 @@ func (q *queue) Dequeue() (*request, bool) {
 	}
 	request := q.requests[0]
 	q.requests = q.requests[1:]
-
-	request.isWaiting = false
 	return request, true
 }
 
@@ -31,14 +31,20 @@ func NewNoRestraintFactory() fq.QueueSetFactory {
 
 type noRestraintFactory struct{}
 
-func (noRestraintFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) {
-	return noRestraint{}, nil
-}
+type noRestraintCompeter struct{}
 
 type noRestraint struct{}
 
-func (noRestraint) SetConfiguration(config fq.QueueSetConfig) error {
-	return nil
+func (noRestraintFactory) BeginConstruction(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) {
+	return noRestraintCompeter{}, nil
+}
+
+func (noRestraintCompeter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
+	return noRestraint{}
+}
+
+func (noRestraint) BeginConfigChange(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) {
+	return noRestraintCompeter{}, nil
 }
 
 func (noRestraint) Quiesce(fq.EmptyHandler) {