Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-23 19:56:01 +00:00)
Simplified logic around context cancel, removing bugs
Previously, a `decisionCancel` could overwrite a `decisionReject` or `decisionExecute`, causing confusion. Now a request gets exactly one decision. Also added a write-once abstraction to the promise package and refactored accordingly.
This commit is contained in:
parent 1e170637c3
commit 1c092bf635
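To make the fix concrete, here is a minimal, hypothetical sketch (not code from this commit) of the bug class being removed: with a rewritable decision slot a late cancel silently overwrites the real outcome, while first-write-wins semantics ignore it. The types below are simplified stand-ins for the real requestDecision promise.

package main

import "fmt"

type decision string

const (
	decisionExecute decision = "execute"
	decisionCancel  decision = "cancel"
)

// rewritable: last write wins (the old, confusing behavior).
type rewritable struct{ d decision }

func (r *rewritable) set(d decision) { r.d = d }

// writeOnce: first write wins (the behavior this commit introduces).
type writeOnce struct {
	d     decision
	isSet bool
}

func (w *writeOnce) set(d decision) bool {
	if w.isSet {
		return false
	}
	w.d, w.isSet = d, true
	return true
}

func main() {
	var old rewritable
	old.set(decisionExecute)
	old.set(decisionCancel) // a late cancel silently overwrites the real outcome
	fmt.Println(old.d)      // "cancel", even though the request executed

	var fixed writeOnce
	fixed.set(decisionExecute)
	fmt.Println(fixed.set(decisionCancel)) // false: the late cancel is ignored
	fmt.Println(fixed.d)                   // "execute"
}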
@@ -83,7 +83,10 @@ type QueueSet interface {
 	// should start executing the request and, once the request
 	// finishes execution or is canceled, call afterExecution().
 	// Otherwise the client should not execute the request and
-	// afterExecution is irrelevant.
+	// afterExecution is irrelevant. Canceling the context while the
+	// request is waiting in its queue will cut short that wait and
+	// cause a return with tryAnother and execute both false; later
+	// cancellations are the caller's problem.
 	Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func())
 }

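A hedged sketch of how a caller might honor the Wait contract documented above; serveOne and handle are invented names for illustration and assume the QueueSet interface is in scope, they are not part of this commit.

// Hypothetical caller of QueueSet.Wait (fragment; needs "context" imported).
func serveOne(ctx context.Context, qs QueueSet, hashValue uint64, handle func()) {
	tryAnother, execute, afterExecution := qs.Wait(ctx, hashValue, "flow", "request")
	switch {
	case tryAnother:
		// This queue set is quiescing; the caller should pick another one.
	case execute:
		handle()         // run the request
		afterExecution() // report that execution finished or was canceled
	default:
		// Rejected, or the context was canceled while queued:
		// do not execute, and afterExecution is irrelevant.
	}
}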
@@ -16,27 +16,87 @@ limitations under the License.
 
 package promise
 
-// Mutable is a variable that is initially not set and can be set one
-// or more times (unlike a traditional "promise", which can be written
-// only once).
-type Mutable interface {
+// This file defines interfaces for promises and futures and related
+// things.
+
-	// Set writes a value into this variable and unblocks every
-	// goroutine waiting for this variable to have a value
-	Set(interface{})
-
-	// Get reads the value of this variable. If this variable is
-	// not set yet then this call blocks until this variable gets a value.
+// Readable represents a variable that is initially not set and later
+// becomes set. Some instances may be set to multiple values in
+// series. A Readable for a variable that can only get one value is
+// commonly known as a "future".
+type Readable interface {
+	// Get reads the current value of this variable. If this variable
+	// is not set yet then this call blocks until this variable gets a
+	// value.
 	Get() interface{}
+
+	// IsSet returns immediately with an indication of whether this
+	// variable has been set.
+	IsSet() bool
 }
 
-// LockingMutable is a Mutable whose implementation is protected by a lock
-type LockingMutable interface {
-	Mutable
-
-	// SetLocked is like Set but the caller must already hold the lock
-	SetLocked(interface{})
+// LockingReadable is a Readable whose implementation is protected by
+// a lock
+type LockingReadable interface {
+	Readable
 
 	// GetLocked is like Get but the caller must already hold the lock
 	GetLocked() interface{}
+
+	// IsSetLocked is like IsSet but the caller must already hold the lock
+	IsSetLocked() bool
 }
+
+// WriteOnceOnly represents a variable that is initially not set and
+// can be set once.
+type WriteOnceOnly interface {
+	// Set normally writes a value into this variable, unblocks every
+	// goroutine waiting for this variable to have a value, and
+	// returns true. In the unhappy case that this variable is
+	// already set, this method returns false.
+	Set(interface{}) bool
+}
+
+// WriteOnce represents a variable that is initially not set and can
+// be set once and is readable. This is the common meaning for
+// "promise".
+type WriteOnce interface {
+	Readable
+	WriteOnceOnly
+}
+
+// LockingWriteOnce is a WriteOnce whose implementation is protected
+// by a lock.
+type LockingWriteOnce interface {
+	LockingReadable
+	WriteOnceOnly
+
+	// SetLocked is like Set but the caller must already hold the lock
+	SetLocked(interface{}) bool
+}
+
+// WriteMultipleOnly represents a variable that is initially not set
+// and can be set one or more times (unlike a traditional "promise",
+// which can be written only once).
+type WriteMultipleOnly interface {
+	// Set writes a value into this variable and unblocks every
+	// goroutine waiting for this variable to have a value
+	Set(interface{})
+}
+
+// WriteMultiple represents a variable that is initially not set and
+// can be set one or more times (unlike a traditional "promise", which
+// can be written only once) and is readable.
+type WriteMultiple interface {
+	Readable
+	WriteMultipleOnly
+}
+
+// LockingWriteMultiple is a WriteMultiple whose implementation is
+// protected by a lock.
+type LockingWriteMultiple interface {
+	LockingReadable
+	WriteMultipleOnly
+
+	// SetLocked is like Set but the caller must already hold the lock
+	SetLocked(interface{})
+}

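The interfaces above compose: WriteOnce is just Readable plus WriteOnceOnly. As a hypothetical illustration of the contract (not the implementation this commit adds, which uses a shared lock and condition variable), here is a toy channel-based WriteOnce; it assumes it sits in the promise package with "sync" imported.

// toyWriteOnce is an invented example type satisfying WriteOnce.
type toyWriteOnce struct {
	mu    sync.Mutex
	done  chan struct{}
	value interface{}
}

func newToyWriteOnce() *toyWriteOnce {
	return &toyWriteOnce{done: make(chan struct{})}
}

// Set stores the value and wakes all readers, but only the first call wins.
func (t *toyWriteOnce) Set(v interface{}) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	select {
	case <-t.done:
		return false // already set
	default:
	}
	t.value = v
	close(t.done) // unblocks every goroutine waiting in Get
	return true
}

// Get blocks until some Set has happened, then returns the stored value.
func (t *toyWriteOnce) Get() interface{} {
	<-t.done
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.value
}

// IsSet reports immediately whether Set has happened.
func (t *toyWriteOnce) IsSet() bool {
	select {
	case <-t.done:
		return true
	default:
		return false
	}
}

var _ WriteOnce = &toyWriteOnce{} // satisfies the interface defined above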
@@ -23,57 +23,108 @@ import (
 	"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
 )
 
-// lockingPromise implements LockingMutable based on a condition
-// variable. This implementation tracks active goroutines: the given
-// counter is decremented for a goroutine waiting for this variable to
-// be set and incremented when such a goroutine is unblocked.
-type lockingPromise struct {
+// promisoid is the data and reading behavior common to all the
+// promise-like abstractions implemented here. This implementation is
+// based on a condition variable. This implementation tracks active
+// goroutines: the given counter is decremented for a goroutine
+// waiting for this variable to be set and incremented when such a
+// goroutine is unblocked.
+type promisoid struct {
 	lock          sync.Locker
 	cond          sync.Cond
 	activeCounter counter.GoRoutineCounter // counter of active goroutines
-	waitingCount  int                      // number of goroutines idle due to this mutable being unset
+	waitingCount  int                      // number of goroutines idle due to this being unset
 	isSet         bool
 	value         interface{}
 }
 
-var _ promise.LockingMutable = &lockingPromise{}
+func (pr *promisoid) Get() interface{} {
+	pr.lock.Lock()
+	defer pr.lock.Unlock()
+	return pr.GetLocked()
+}
 
-// NewLockingPromise makes a new promise.LockingMutable
-func NewLockingPromise(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingMutable {
-	return &lockingPromise{
+func (pr *promisoid) GetLocked() interface{} {
+	if !pr.isSet {
+		pr.waitingCount++
+		pr.activeCounter.Add(-1)
+		pr.cond.Wait()
+	}
+	return pr.value
+}
+
+func (pr *promisoid) IsSet() bool {
+	pr.lock.Lock()
+	defer pr.lock.Unlock()
+	return pr.IsSetLocked()
+}
+
+func (pr *promisoid) IsSetLocked() bool {
+	return pr.isSet
+}
+
+type writeOnce struct {
+	promisoid
+}
+
+var _ promise.LockingWriteOnce = &writeOnce{}
+
+// NewWriteOnce makes a new promise.LockingWriteOnce
+func NewWriteOnce(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingWriteOnce {
+	return &writeOnce{promisoid{
 		lock:          lock,
 		cond:          *sync.NewCond(lock),
 		activeCounter: activeCounter,
-	}
+	}}
 }
 
+func (wr *writeOnce) Set(value interface{}) bool {
+	wr.lock.Lock()
+	defer wr.lock.Unlock()
+	return wr.SetLocked(value)
+}
+
+func (wr *writeOnce) SetLocked(value interface{}) bool {
+	if wr.isSet {
+		return false
+	}
+	wr.isSet = true
+	wr.value = value
+	if wr.waitingCount > 0 {
+		wr.activeCounter.Add(wr.waitingCount)
+		wr.waitingCount = 0
+		wr.cond.Broadcast()
+	}
+	return true
+}
+
+type writeMultiple struct {
+	promisoid
+}
+
+var _ promise.LockingWriteMultiple = &writeMultiple{}
+
+// NewWriteMultiple makes a new promise.LockingWriteMultiple
+func NewWriteMultiple(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingWriteMultiple {
+	return &writeMultiple{promisoid{
+		lock:          lock,
+		cond:          *sync.NewCond(lock),
+		activeCounter: activeCounter,
+	}}
+}
+
+func (wr *writeMultiple) Set(value interface{}) {
+	wr.lock.Lock()
+	defer wr.lock.Unlock()
+	wr.SetLocked(value)
+}
+
+func (wr *writeMultiple) SetLocked(value interface{}) {
+	wr.isSet = true
+	wr.value = value
+	if wr.waitingCount > 0 {
+		wr.activeCounter.Add(wr.waitingCount)
+		wr.waitingCount = 0
+		wr.cond.Broadcast()
+	}
+}
+
-func (lp *lockingPromise) Set(value interface{}) {
-	lp.lock.Lock()
-	defer lp.lock.Unlock()
-	lp.SetLocked(value)
-}
-
-func (lp *lockingPromise) Get() interface{} {
-	lp.lock.Lock()
-	defer lp.lock.Unlock()
-	return lp.GetLocked()
-}
-
-func (lp *lockingPromise) SetLocked(value interface{}) {
-	lp.isSet = true
-	lp.value = value
-	if lp.waitingCount > 0 {
-		lp.activeCounter.Add(lp.waitingCount)
-		lp.waitingCount = 0
-		lp.cond.Broadcast()
-	}
-}
-
-func (lp *lockingPromise) GetLocked() interface{} {
-	if !lp.isSet {
-		lp.waitingCount++
-		lp.activeCounter.Add(-1)
-		lp.cond.Wait()
-	}
-	return lp.value
-}

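A hedged usage sketch for the constructors above. The fragment assumes the lockingpromise and fmt packages are imported and that a counter.GoRoutineCounter value named grc already exists; none of these names come from this commit.

// Hypothetical fragment, not code from this commit.
var lock sync.Mutex
wr := lockingpromise.NewWriteOnce(&lock, grc)

grc.Add(1) // account for the goroutine that may block in Get
go func() {
	value := wr.Get() // blocks until the first Set; Get adjusts grc around the wait
	fmt.Println("decided:", value)
	grc.Add(-1)
}()

if !wr.Set("execute") {
	// cannot happen here: this is the first Set
}
if wr.Set("cancel") {
	// never reached: a LockingWriteOnce refuses a second Set
}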
@@ -25,16 +25,16 @@ import (
 	"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
 )
 
-func TestLockingPromise(t *testing.T) {
+func TestLockingWriteMultiple(t *testing.T) {
 	now := time.Now()
 	clock, counter := clock.NewFakeEventClock(now, 0, nil)
 	var lock sync.Mutex
-	lp := NewLockingPromise(&lock, counter)
+	wr := NewWriteMultiple(&lock, counter)
 	var gots int32
 	var got atomic.Value
 	counter.Add(1)
 	go func() {
-		got.Store(lp.Get())
+		got.Store(wr.Get())
 		atomic.AddInt32(&gots, 1)
 		counter.Add(-1)
 	}()
@@ -43,8 +43,11 @@ func TestLockingPromise(t *testing.T) {
 	if atomic.LoadInt32(&gots) != 0 {
 		t.Error("Get returned before Set")
 	}
-	var aval = &now
-	lp.Set(aval)
+	if wr.IsSet() {
+		t.Error("IsSet before Set")
+	}
+	aval := &now
+	wr.Set(aval)
 	clock.Run(nil)
 	time.Sleep(time.Second)
 	if atomic.LoadInt32(&gots) != 1 {
@@ -53,9 +56,12 @@ func TestLockingPromise(t *testing.T) {
 	if got.Load() != aval {
 		t.Error("Get did not return what was Set")
 	}
+	if !wr.IsSet() {
+		t.Error("IsSet()==false after Set")
+	}
 	counter.Add(1)
 	go func() {
-		got.Store(lp.Get())
+		got.Store(wr.Get())
 		atomic.AddInt32(&gots, 1)
 		counter.Add(-1)
 	}()
@@ -67,4 +73,80 @@ func TestLockingPromise(t *testing.T) {
 	if got.Load() != aval {
 		t.Error("Second Get did not return what was Set")
 	}
+	if !wr.IsSet() {
+		t.Error("IsSet()==false after second Set")
+	}
+	later := time.Now()
+	bval := &later
+	wr.Set(bval)
+	if !wr.IsSet() {
+		t.Error("IsSet() returned false after second set")
+	} else if wr.Get() != bval {
+		t.Error("Get() after second Set returned wrong value")
+	}
 }
+
+func TestLockingWriteOnce(t *testing.T) {
+	now := time.Now()
+	clock, counter := clock.NewFakeEventClock(now, 0, nil)
+	var lock sync.Mutex
+	wr := NewWriteOnce(&lock, counter)
+	var gots int32
+	var got atomic.Value
+	counter.Add(1)
+	go func() {
+		got.Store(wr.Get())
+		atomic.AddInt32(&gots, 1)
+		counter.Add(-1)
+	}()
+	clock.Run(nil)
+	time.Sleep(time.Second)
+	if atomic.LoadInt32(&gots) != 0 {
+		t.Error("Get returned before Set")
+	}
+	if wr.IsSet() {
+		t.Error("IsSet before Set")
+	}
+	aval := &now
+	if !wr.Set(aval) {
+		t.Error("Set() returned false")
+	}
+	clock.Run(nil)
+	time.Sleep(time.Second)
+	if atomic.LoadInt32(&gots) != 1 {
+		t.Error("Get did not return after Set")
+	}
+	if got.Load() != aval {
+		t.Error("Get did not return what was Set")
+	}
+	if !wr.IsSet() {
+		t.Error("IsSet()==false after Set")
+	}
+	counter.Add(1)
+	go func() {
+		got.Store(wr.Get())
+		atomic.AddInt32(&gots, 1)
+		counter.Add(-1)
+	}()
+	clock.Run(nil)
+	time.Sleep(time.Second)
+	if atomic.LoadInt32(&gots) != 2 {
+		t.Error("Second Get did not return immediately")
+	}
+	if got.Load() != aval {
+		t.Error("Second Get did not return what was Set")
+	}
+	if !wr.IsSet() {
+		t.Error("IsSet()==false after second Set")
+	}
+	later := time.Now()
+	bval := &later
+	if wr.Set(bval) {
+		t.Error("second Set() returned true")
+	}
+	if !wr.IsSet() {
+		t.Error("IsSet() returned false after second set")
+	} else if wr.Get() != aval {
+		t.Error("Get() after second Set returned wrong value")
+	}
+}

@@ -255,6 +255,11 @@ const (
 func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func()) {
 	var req *request
 	decision := func() requestDecision {
+		// Decide what to do and update metrics accordingly. Metrics
+		// about total requests queued and executing are updated on
+		// each way out of this function rather than in lower level
+		// functions because there can be multiple lower level changes
+		// in one invocation here.
 		qs.lockAndSyncTime()
 		defer qs.lock.Unlock()
 		// A call to Wait while the system is quiescing will be rebuffed by
@@ -273,6 +278,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i
 			return decisionReject
 		}
 		req = qs.dispatchSansQueue(descr1, descr2)
+		metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting)
 		return decisionExecute
 	}

@@ -284,6 +290,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i
 		//    we are at max queue length
 		// 4) If not rejected, create a request and enqueue
 		req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue, descr1, descr2)
+		defer metrics.UpdateFlowControlRequestsInQueue(qs.qCfg.Name, qs.totRequestsWaiting)
 		// req == nil means that the request was rejected - no remaining
 		// concurrency shares and at max queue length already
 		if req == nil {
@@ -302,6 +309,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i
 		//    fair queuing technique to pick a queue and dispatch a
 		//    request from that queue.
 		qs.dispatchAsMuchAsPossibleLocked()
+		defer metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting)
 
 		// ========================================================================
 		// Step 3:
@@ -319,7 +327,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i
 			select {
 			case <-doneCh:
 				klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.qCfg.Name, descr1, descr2)
-				req.decision.Set(decisionCancel)
+				qs.cancelWait(req)
 			}
 			qs.goroutineDoneOrBlocked()
 		}()
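The goroutine above is an instance of a common watcher pattern: wait for the context to be canceled, then try to record a cancel outcome that may race with normal completion. A minimal, hypothetical sketch of that pattern follows; the names and the Set-returning-bool promise are assumptions for illustration, not this file's exact API.

// Hypothetical sketch of a cancellation watcher; not code from this commit.
func watchCancel(ctx context.Context, decision interface{ Set(interface{}) bool }, eject func()) {
	go func() {
		<-ctx.Done()
		if decision.Set("cancel") { // first outcome wins; a late cancel is a no-op
			eject() // e.g. remove the request from its queue
		}
	}()
}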
@@ -330,40 +338,24 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i
 		// The final step in Wait is to wait on a decision from
 		// somewhere and then act on it.
 		decisionAny := req.decision.GetLocked()
-		var decision requestDecision
-		switch dec := decisionAny.(type) {
-		case requestDecision:
-			decision = dec
-		default:
+		qs.syncTimeLocked()
+		decision, isDecision := decisionAny.(requestDecision)
+		if !isDecision {
 			klog.Errorf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.qCfg.Name, decisionAny, decisionAny, descr1, descr2)
-			decision = decisionExecute
+			decision = decisionExecute // yeah, this is a no-op
 		}
 		switch decision {
 		case decisionReject:
 			klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, descr1, descr2)
 			metrics.AddReject(qs.qCfg.Name, "time-out")
 		case decisionCancel:
-			qs.syncTimeLocked()
-			// TODO(aaron-prindle) add metrics to these two cases
-			if req.isWaiting {
-				klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, descr1, descr2)
-				// remove the request from the queue as it has timed out
-				for i := range req.queue.requests {
-					if req == req.queue.requests[i] {
-						// remove the request
-						req.queue.requests = append(req.queue.requests[:i],
-							req.queue.requests[i+1:]...)
-						break
-					}
-				}
-				// At this point, if the qs is quiescing,
-				// has zero requests executing, and has zero requests enqueued
-				// then a call to the EmptyHandler should be forked.
-				qs.maybeForkEmptyHandlerLocked()
-			} else {
-				klog.V(5).Infof("QS(%s): request %#+v %#+v canceled shortly after dispatch", qs.qCfg.Name, descr1, descr2)
-			}
+			// TODO(aaron-prindle) add metrics for this case
+			klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, descr1, descr2)
 		}
+		// At this point, if the qs is quiescing,
+		// has zero requests executing, and has zero requests enqueued
+		// then a call to the EmptyHandler should be forked.
+		qs.maybeForkEmptyHandlerLocked()
 		return decision
 	}()
 	switch decision {
@@ -439,7 +431,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64,
 
 	// Create a request and enqueue
 	req := &request{
-		decision:    lockingpromise.NewLockingPromise(&qs.lock, qs.counter),
+		decision:    lockingpromise.NewWriteOnce(&qs.lock, qs.counter),
 		arrivalTime: qs.clock.Now(),
 		queue:       queue,
 		descr1:      descr1,
@@ -531,7 +523,6 @@ func (qs *queueSet) enqueueLocked(request *request) {
 	}
 	queue.Enqueue(request)
 	qs.totRequestsWaiting++
-	metrics.UpdateFlowControlRequestsInQueue(qs.qCfg.Name, qs.totRequestsWaiting)
 }
 
 // dispatchAsMuchAsPossibleLocked runs a loop, as long as there
@@ -561,7 +552,6 @@ func (qs *queueSet) dispatchSansQueue(descr1, descr2 interface{}) *request {
 	if klog.V(5) {
 		klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, descr1, descr2, qs.totRequestsExecuting)
 	}
-	metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting)
 	return req
 }

@@ -579,7 +569,11 @@ func (qs *queueSet) dispatchLocked() bool {
 		return false
 	}
 	request.startTime = qs.clock.Now()
-	// request dequeued, service has started
+	// At this moment the request leaves its queue and starts
+	// executing. We do not recognize any interim state between
+	// "queued" and "executing". While that means "executing"
+	// includes a little overhead from this package, this is not a
+	// problem because other overhead is also included.
 	qs.totRequestsWaiting--
 	qs.totRequestsExecuting++
 	queue.requestsExecuting++
@@ -588,11 +582,33 @@ func (qs *queueSet) dispatchLocked() bool {
 	}
 	// When a request is dequeued for service -> qs.virtualStart += G
 	queue.virtualStart += qs.estimatedServiceTime
-	metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting)
 	request.decision.SetLocked(decisionExecute)
 	return ok
 }
 
+// cancelWait ensures the request is not waiting
+func (qs *queueSet) cancelWait(req *request) {
+	qs.lock.Lock()
+	defer qs.lock.Unlock()
+	if req.decision.IsSetLocked() {
+		// The request has already been removed from the queue
+		// and so we consider its wait to be over.
+		return
+	}
+	req.decision.SetLocked(decisionCancel)
+	queue := req.queue
+	// remove the request from the queue as it has timed out
+	for i := range queue.requests {
+		if req == queue.requests[i] {
+			// remove the request
+			queue.requests = append(queue.requests[:i], queue.requests[i+1:]...)
+			qs.totRequestsWaiting--
+			break
+		}
+	}
+	return
+}
+
 // selectQueueLocked examines the queues in round robin order and
 // returns the first one of those for which the virtual finish time of
 // the oldest waiting request is minimal.
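cancelWait above illustrates the check-then-set discipline that gives each request exactly one decision: whichever of dispatch or cancellation reaches the write-once promise first, under qs.lock, wins, and the loser quietly does nothing. A condensed, hypothetical restatement of that invariant (names invented, not code from this commit; assumes "sync" and the promise package are imported):

// decideOnce is an invented helper expressing the invariant.
func decideOnce(lock sync.Locker, d promise.LockingWriteOnce, outcome interface{}, apply func()) {
	lock.Lock()
	defer lock.Unlock()
	if d.IsSetLocked() {
		return // the competing path already decided; nothing to undo
	}
	d.SetLocked(outcome) // records the single decision and wakes the waiter
	apply()              // e.g. remove from queue (cancel) or start executing (dispatch)
}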
@@ -630,6 +646,8 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) {
 
 	qs.finishRequestLocked(req)
 	qs.dispatchAsMuchAsPossibleLocked()
+	metrics.UpdateFlowControlRequestsInQueue(qs.qCfg.Name, qs.totRequestsWaiting)
+	metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting)
 }
 
 // finishRequestLocked is a callback that should be used when a
@@ -637,7 +655,6 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) {
 // callback updates important state in the queueSet
 func (qs *queueSet) finishRequestLocked(r *request) {
 	qs.totRequestsExecuting--
-	metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting)
 
 	if r.queue == nil {
 		if klog.V(6) {
@@ -243,3 +243,48 @@ func TestTimeout(t *testing.T) {
 		{1001001001, 5, 100, time.Second, time.Second},
 	}, time.Second*10, true, false, clk, counter)
 }
+
+func TestContextCancel(t *testing.T) {
+	now := time.Now()
+	clk, counter := clock.NewFakeEventClock(now, 0, nil)
+	qsf := NewQueueSetFactory(clk, counter)
+	qCfg := fq.QueuingConfig{
+		Name:             "TestTimeout",
+		DesiredNumQueues: 11,
+		QueueLengthLimit: 11,
+		HandSize:         1,
+		RequestWaitLimit: 15 * time.Second,
+	}
+	qsc, err := qsf.QualifyQueuingConfig(qCfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	qs := qsc.GetQueueSet(fq.DispatchingConfig{ConcurrencyLimit: 1})
+	counter.Add(1) // account for the goroutine running this test
+	ctx1 := context.Background()
+	another1, exec1, cleanup1 := qs.Wait(ctx1, 1, "test", "one")
+	if another1 || !exec1 {
+		t.Errorf("Unexpected: another1=%v, exec1=%v", another1, exec1)
+		return
+	}
+	defer cleanup1()
+	ctx2, cancel2 := context.WithCancel(context.Background())
+	tBefore := time.Now()
+	go func() {
+		time.Sleep(time.Second)
+		cancel2()
+	}()
+	another2, exec2, cleanup2 := qs.Wait(ctx2, 2, "test", "two")
+	tAfter := time.Now()
+	if another2 || exec2 {
+		t.Errorf("Unexpected: another2=%v, exec2=%v", another2, exec2)
+		if exec2 {
+			defer cleanup2()
+		}
+	} else {
+		dt := tAfter.Sub(tBefore)
+		if dt < time.Second || dt > 2*time.Second {
+			t.Errorf("Unexpected: dt=%d", dt)
+		}
+	}
+}

@@ -30,15 +30,16 @@ type request struct {
 	// startTime is the real time when the request began executing
 	startTime time.Time
 
-	// decision gets set to the decision about what to do with this request
-	decision promise.LockingMutable
+	// decision gets set to a `requestDecision` indicating what to do
+	// with this request. It gets set exactly once, when the request
+	// is removed from its queue. The value will be decisionReject,
+	// decisionCancel, or decisionExecute; decisionTryAnother never
+	// appears here.
+	decision promise.LockingWriteOnce
 
 	// arrivalTime is the real time when the request entered this system
 	arrivalTime time.Time
 
 	// isWaiting indicates whether the request is presently waiting in a queue
 	isWaiting bool
 
 	// descr1 and descr2 are not used in any logic but they appear in
	// log messages
 	descr1, descr2 interface{}
@@ -60,7 +61,6 @@ type queue struct {
 
 // Enqueue enqueues a request into the queue
 func (q *queue) Enqueue(request *request) {
-	request.isWaiting = true
 	q.requests = append(q.requests, request)
 }
@@ -71,8 +71,6 @@ func (q *queue) Dequeue() (*request, bool) {
 	}
 	request := q.requests[0]
 	q.requests = q.requests[1:]
 
-	request.isWaiting = false
 	return request, true
 }