From 1c092bf635954bde9c9c363672fa156b9430206b Mon Sep 17 00:00:00 2001 From: Mike Spreitzer Date: Thu, 9 Jan 2020 14:03:13 -0500 Subject: [PATCH] Simplified logic around context cancel, removing bugs Previously, a `decisionCancel` could overwrite a `decisionReject` or `decisionExecute`, causing confusion. Now a request gets exactly one decision and there is no confusion. Also added write-once to the promise package and refactored. --- .../util/flowcontrol/fairqueuing/interface.go | 5 +- .../fairqueuing/promise/interface.go | 92 +++++++++--- .../promise/lockingpromise/lockingpromise.go | 133 ++++++++++++------ .../lockingpromise/lockingpromise_test.go | 94 ++++++++++++- .../fairqueuing/queueset/queueset.go | 83 ++++++----- .../fairqueuing/queueset/queueset_test.go | 45 ++++++ .../flowcontrol/fairqueuing/queueset/types.go | 14 +- 7 files changed, 361 insertions(+), 105 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go index c48acea7543..1822d92c0fe 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go @@ -83,7 +83,10 @@ type QueueSet interface { // should start executing the request and, once the request // finishes execution or is canceled, call afterExecution(). // Otherwise the client should not execute the request and - // afterExecution is irrelevant. + // afterExecution is irrelevant. Canceling the context while the + // request is waiting in its queue will cut short that wait and + // cause a return with tryAnother and execute both false; later + // cancellations are the caller's problem. Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func()) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go index 258469043b4..9dd1479bc40 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go @@ -16,27 +16,87 @@ limitations under the License. package promise -// Mutable is a variable that is initially not set and can be set one -// or more times (unlike a traditional "promise", which can be written -// only once). -type Mutable interface { +// This file defines interfaces for promsies and futures and related +// things. - // Set writes a value into this variable and unblocks every - // goroutine waiting for this variable to have a value - Set(interface{}) - - // Get reads the value of this variable. If this variable is - // not set yet then this call blocks until this variable gets a value. +// Readable represents a variable that is initially not set and later +// becomes set. Some instances may be set to multiple values in +// series. A Readable for a variable that can only get one value is +// commonly known as a "future". +type Readable interface { + // Get reads the current value of this variable. If this variable + // is not set yet then this call blocks until this variable gets a + // value. Get() interface{} + + // IsSet returns immediately with an indication of whether this + // variable has been set. + IsSet() bool } -// LockingMutable is a Mutable whose implementation is protected by a lock -type LockingMutable interface { - Mutable - - // SetLocked is like Set but the caller must already hold the lock - SetLocked(interface{}) +// LockingReadable is a Readable whose implementation is protected by +// a lock +type LockingReadable interface { + Readable // GetLocked is like Get but the caller must already hold the lock GetLocked() interface{} + + // IsSetLocked is like IsSet but the caller must already hold the lock + IsSetLocked() bool +} + +// WriteOnceOnly represents a variable that is initially not set and +// can be set once. +type WriteOnceOnly interface { + // Set normally writes a value into this variable, unblocks every + // goroutine waiting for this variable to have a value, and + // returns true. In the unhappy case that this variable is + // already set, this method returns false. + Set(interface{}) bool +} + +// WriteOnce represents a variable that is initially not set and can +// be set once and is readable. This is the common meaning for +// "promise". +type WriteOnce interface { + Readable + WriteOnceOnly +} + +// LockingWriteOnce is a WriteOnce whose implementation is protected +// by a lock. +type LockingWriteOnce interface { + LockingReadable + WriteOnceOnly + + // SetLocked is like Set but the caller must already hold the lock + SetLocked(interface{}) bool +} + +// WriteMultipleOnly represents a variable that is initially not set +// and can be set one or more times (unlike a traditional "promise", +// which can be written only once). +type WriteMultipleOnly interface { + // Set writes a value into this variable and unblocks every + // goroutine waiting for this variable to have a value + Set(interface{}) +} + +// WriteMultiple represents a variable that is initially not set and +// can be set one or more times (unlike a traditional "promise", which +// can be written only once) and is readable. +type WriteMultiple interface { + Readable + WriteMultipleOnly +} + +// LockingWriteMultiple is a WriteMultiple whose implementation is +// protected by a lock. +type LockingWriteMultiple interface { + LockingReadable + WriteMultipleOnly + + // SetLocked is like Set but the caller must already hold the lock + SetLocked(interface{}) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go index 5013abf3fd4..3f557214e6e 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go @@ -23,57 +23,108 @@ import ( "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise" ) -// lockingPromise implements LockingMutable based on a condition -// variable. This implementation tracks active goroutines: the given -// counter is decremented for a goroutine waiting for this varible to -// be set and incremented when such a goroutine is unblocked. -type lockingPromise struct { +// promisoid is the data and reading behavior common to all the +// promise-like abstractions implemented here. This implementation is +// based on a condition variable. This implementation tracks active +// goroutines: the given counter is decremented for a goroutine +// waiting for this varible to be set and incremented when such a +// goroutine is unblocked. +type promisoid struct { lock sync.Locker cond sync.Cond activeCounter counter.GoRoutineCounter // counter of active goroutines - waitingCount int // number of goroutines idle due to this mutable being unset + waitingCount int // number of goroutines idle due to this being unset isSet bool value interface{} } -var _ promise.LockingMutable = &lockingPromise{} +func (pr *promisoid) Get() interface{} { + pr.lock.Lock() + defer pr.lock.Unlock() + return pr.GetLocked() +} -// NewLockingPromise makes a new promise.LockingMutable -func NewLockingPromise(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingMutable { - return &lockingPromise{ +func (pr *promisoid) GetLocked() interface{} { + if !pr.isSet { + pr.waitingCount++ + pr.activeCounter.Add(-1) + pr.cond.Wait() + } + return pr.value +} + +func (pr *promisoid) IsSet() bool { + pr.lock.Lock() + defer pr.lock.Unlock() + return pr.IsSetLocked() +} + +func (pr *promisoid) IsSetLocked() bool { + return pr.isSet +} + +type writeOnce struct { + promisoid +} + +var _ promise.LockingWriteOnce = &writeOnce{} + +// NewWriteOnce makes a new promise.LockingWriteOnce +func NewWriteOnce(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingWriteOnce { + return &writeOnce{promisoid{ lock: lock, cond: *sync.NewCond(lock), activeCounter: activeCounter, + }} +} + +func (wr *writeOnce) Set(value interface{}) bool { + wr.lock.Lock() + defer wr.lock.Unlock() + return wr.SetLocked(value) +} + +func (wr *writeOnce) SetLocked(value interface{}) bool { + if wr.isSet { + return false + } + wr.isSet = true + wr.value = value + if wr.waitingCount > 0 { + wr.activeCounter.Add(wr.waitingCount) + wr.waitingCount = 0 + wr.cond.Broadcast() + } + return true +} + +type writeMultiple struct { + promisoid +} + +var _ promise.LockingWriteMultiple = &writeMultiple{} + +// NewWriteMultiple makes a new promise.LockingWriteMultiple +func NewWriteMultiple(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingWriteMultiple { + return &writeMultiple{promisoid{ + lock: lock, + cond: *sync.NewCond(lock), + activeCounter: activeCounter, + }} +} + +func (wr *writeMultiple) Set(value interface{}) { + wr.lock.Lock() + defer wr.lock.Unlock() + wr.SetLocked(value) +} + +func (wr *writeMultiple) SetLocked(value interface{}) { + wr.isSet = true + wr.value = value + if wr.waitingCount > 0 { + wr.activeCounter.Add(wr.waitingCount) + wr.waitingCount = 0 + wr.cond.Broadcast() } } - -func (lp *lockingPromise) Set(value interface{}) { - lp.lock.Lock() - defer lp.lock.Unlock() - lp.SetLocked(value) -} - -func (lp *lockingPromise) Get() interface{} { - lp.lock.Lock() - defer lp.lock.Unlock() - return lp.GetLocked() -} - -func (lp *lockingPromise) SetLocked(value interface{}) { - lp.isSet = true - lp.value = value - if lp.waitingCount > 0 { - lp.activeCounter.Add(lp.waitingCount) - lp.waitingCount = 0 - lp.cond.Broadcast() - } -} - -func (lp *lockingPromise) GetLocked() interface{} { - if !lp.isSet { - lp.waitingCount++ - lp.activeCounter.Add(-1) - lp.cond.Wait() - } - return lp.value -} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go index 766f8ffed9c..abaedda1230 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go @@ -25,16 +25,16 @@ import ( "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock" ) -func TestLockingPromise(t *testing.T) { +func TestLockingWriteMultiple(t *testing.T) { now := time.Now() clock, counter := clock.NewFakeEventClock(now, 0, nil) var lock sync.Mutex - lp := NewLockingPromise(&lock, counter) + wr := NewWriteMultiple(&lock, counter) var gots int32 var got atomic.Value counter.Add(1) go func() { - got.Store(lp.Get()) + got.Store(wr.Get()) atomic.AddInt32(&gots, 1) counter.Add(-1) }() @@ -43,8 +43,11 @@ func TestLockingPromise(t *testing.T) { if atomic.LoadInt32(&gots) != 0 { t.Error("Get returned before Set") } - var aval = &now - lp.Set(aval) + if wr.IsSet() { + t.Error("IsSet before Set") + } + aval := &now + wr.Set(aval) clock.Run(nil) time.Sleep(time.Second) if atomic.LoadInt32(&gots) != 1 { @@ -53,9 +56,12 @@ func TestLockingPromise(t *testing.T) { if got.Load() != aval { t.Error("Get did not return what was Set") } + if !wr.IsSet() { + t.Error("IsSet()==false after Set") + } counter.Add(1) go func() { - got.Store(lp.Get()) + got.Store(wr.Get()) atomic.AddInt32(&gots, 1) counter.Add(-1) }() @@ -67,4 +73,80 @@ func TestLockingPromise(t *testing.T) { if got.Load() != aval { t.Error("Second Get did not return what was Set") } + if !wr.IsSet() { + t.Error("IsSet()==false after second Set") + } + later := time.Now() + bval := &later + wr.Set(bval) + if !wr.IsSet() { + t.Error("IsSet() returned false after second set") + } else if wr.Get() != bval { + t.Error("Get() after second Set returned wrong value") + } +} + +func TestLockingWriteOnce(t *testing.T) { + now := time.Now() + clock, counter := clock.NewFakeEventClock(now, 0, nil) + var lock sync.Mutex + wr := NewWriteOnce(&lock, counter) + var gots int32 + var got atomic.Value + counter.Add(1) + go func() { + got.Store(wr.Get()) + atomic.AddInt32(&gots, 1) + counter.Add(-1) + }() + clock.Run(nil) + time.Sleep(time.Second) + if atomic.LoadInt32(&gots) != 0 { + t.Error("Get returned before Set") + } + if wr.IsSet() { + t.Error("IsSet before Set") + } + aval := &now + if !wr.Set(aval) { + t.Error("Set() returned false") + } + clock.Run(nil) + time.Sleep(time.Second) + if atomic.LoadInt32(&gots) != 1 { + t.Error("Get did not return after Set") + } + if got.Load() != aval { + t.Error("Get did not return what was Set") + } + if !wr.IsSet() { + t.Error("IsSet()==false after Set") + } + counter.Add(1) + go func() { + got.Store(wr.Get()) + atomic.AddInt32(&gots, 1) + counter.Add(-1) + }() + clock.Run(nil) + time.Sleep(time.Second) + if atomic.LoadInt32(&gots) != 2 { + t.Error("Second Get did not return immediately") + } + if got.Load() != aval { + t.Error("Second Get did not return what was Set") + } + if !wr.IsSet() { + t.Error("IsSet()==false after second Set") + } + later := time.Now() + bval := &later + if wr.Set(bval) { + t.Error("second Set() returned true") + } + if !wr.IsSet() { + t.Error("IsSet() returned false after second set") + } else if wr.Get() != aval { + t.Error("Get() after second Set returned wrong value") + } } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 9ec03d8ab55..bde50521577 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -255,6 +255,11 @@ const ( func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func()) { var req *request decision := func() requestDecision { + // Decide what to do and update metrics accordingly. Metrics + // about total requests queued and executing are updated on + // each way out of this function rather than in lower level + // functions because there can be multiple lower level changes + // in one invocation here. qs.lockAndSyncTime() defer qs.lock.Unlock() // A call to Wait while the system is quiescing will be rebuffed by @@ -273,6 +278,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i return decisionReject } req = qs.dispatchSansQueue(descr1, descr2) + metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting) return decisionExecute } @@ -284,6 +290,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i // we are at max queue length // 4) If not rejected, create a request and enqueue req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue, descr1, descr2) + defer metrics.UpdateFlowControlRequestsInQueue(qs.qCfg.Name, qs.totRequestsWaiting) // req == nil means that the request was rejected - no remaining // concurrency shares and at max queue length already if req == nil { @@ -302,6 +309,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i // fair queuing technique to pick a queue and dispatch a // request from that queue. qs.dispatchAsMuchAsPossibleLocked() + defer metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting) // ======================================================================== // Step 3: @@ -319,7 +327,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i select { case <-doneCh: klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.qCfg.Name, descr1, descr2) - req.decision.Set(decisionCancel) + qs.cancelWait(req) } qs.goroutineDoneOrBlocked() }() @@ -330,40 +338,24 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i // The final step in Wait is to wait on a decision from // somewhere and then act on it. decisionAny := req.decision.GetLocked() - var decision requestDecision - switch dec := decisionAny.(type) { - case requestDecision: - decision = dec - default: + qs.syncTimeLocked() + decision, isDecision := decisionAny.(requestDecision) + if !isDecision { klog.Errorf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.qCfg.Name, decisionAny, decisionAny, descr1, descr2) - decision = decisionExecute + decision = decisionExecute // yeah, this is a no-op } switch decision { case decisionReject: klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, descr1, descr2) metrics.AddReject(qs.qCfg.Name, "time-out") case decisionCancel: - qs.syncTimeLocked() - // TODO(aaron-prindle) add metrics to these two cases - if req.isWaiting { - klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, descr1, descr2) - // remove the request from the queue as it has timed out - for i := range req.queue.requests { - if req == req.queue.requests[i] { - // remove the request - req.queue.requests = append(req.queue.requests[:i], - req.queue.requests[i+1:]...) - break - } - } - // At this point, if the qs is quiescing, - // has zero requests executing, and has zero requests enqueued - // then a call to the EmptyHandler should be forked. - qs.maybeForkEmptyHandlerLocked() - } else { - klog.V(5).Infof("QS(%s): request %#+v %#+v canceled shortly after dispatch", qs.qCfg.Name, descr1, descr2) - } + // TODO(aaron-prindle) add metrics for this case + klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, descr1, descr2) } + // At this point, if the qs is quiescing, + // has zero requests executing, and has zero requests enqueued + // then a call to the EmptyHandler should be forked. + qs.maybeForkEmptyHandlerLocked() return decision }() switch decision { @@ -439,7 +431,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64, // Create a request and enqueue req := &request{ - decision: lockingpromise.NewLockingPromise(&qs.lock, qs.counter), + decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter), arrivalTime: qs.clock.Now(), queue: queue, descr1: descr1, @@ -531,7 +523,6 @@ func (qs *queueSet) enqueueLocked(request *request) { } queue.Enqueue(request) qs.totRequestsWaiting++ - metrics.UpdateFlowControlRequestsInQueue(qs.qCfg.Name, qs.totRequestsWaiting) } // dispatchAsMuchAsPossibleLocked runs a loop, as long as there @@ -561,7 +552,6 @@ func (qs *queueSet) dispatchSansQueue(descr1, descr2 interface{}) *request { if klog.V(5) { klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, descr1, descr2, qs.totRequestsExecuting) } - metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting) return req } @@ -579,7 +569,11 @@ func (qs *queueSet) dispatchLocked() bool { return false } request.startTime = qs.clock.Now() - // request dequeued, service has started + // At this moment the request leaves its queue and starts + // executing. We do not recognize any interim state between + // "queued" and "executing". While that means "executing" + // includes a little overhead from this package, this is not a + // problem because other overhead is also included. qs.totRequestsWaiting-- qs.totRequestsExecuting++ queue.requestsExecuting++ @@ -588,11 +582,33 @@ func (qs *queueSet) dispatchLocked() bool { } // When a request is dequeued for service -> qs.virtualStart += G queue.virtualStart += qs.estimatedServiceTime - metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting) request.decision.SetLocked(decisionExecute) return ok } +// cancelWait ensures the request is not waiting +func (qs *queueSet) cancelWait(req *request) { + qs.lock.Lock() + defer qs.lock.Unlock() + if req.decision.IsSetLocked() { + // The request has already been removed from the queue + // and so we consider its wait to be over. + return + } + req.decision.SetLocked(decisionCancel) + queue := req.queue + // remove the request from the queue as it has timed out + for i := range queue.requests { + if req == queue.requests[i] { + // remove the request + queue.requests = append(queue.requests[:i], queue.requests[i+1:]...) + qs.totRequestsWaiting-- + break + } + } + return +} + // selectQueueLocked examines the queues in round robin order and // returns the first one of those for which the virtual finish time of // the oldest waiting request is minimal. @@ -630,6 +646,8 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) { qs.finishRequestLocked(req) qs.dispatchAsMuchAsPossibleLocked() + metrics.UpdateFlowControlRequestsInQueue(qs.qCfg.Name, qs.totRequestsWaiting) + metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting) } // finishRequestLocked is a callback that should be used when a @@ -637,7 +655,6 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) { // callback updates important state in the queueSet func (qs *queueSet) finishRequestLocked(r *request) { qs.totRequestsExecuting-- - metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting) if r.queue == nil { if klog.V(6) { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index d198da00bf0..b265a0c9e9b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -243,3 +243,48 @@ func TestTimeout(t *testing.T) { {1001001001, 5, 100, time.Second, time.Second}, }, time.Second*10, true, false, clk, counter) } + +func TestContextCancel(t *testing.T) { + now := time.Now() + clk, counter := clock.NewFakeEventClock(now, 0, nil) + qsf := NewQueueSetFactory(clk, counter) + qCfg := fq.QueuingConfig{ + Name: "TestTimeout", + DesiredNumQueues: 11, + QueueLengthLimit: 11, + HandSize: 1, + RequestWaitLimit: 15 * time.Second, + } + qsc, err := qsf.QualifyQueuingConfig(qCfg) + if err != nil { + t.Fatal(err) + } + qs := qsc.GetQueueSet(fq.DispatchingConfig{ConcurrencyLimit: 1}) + counter.Add(1) // account for the goroutine running this test + ctx1 := context.Background() + another1, exec1, cleanup1 := qs.Wait(ctx1, 1, "test", "one") + if another1 || !exec1 { + t.Errorf("Unexpected: another1=%v, exec1=%v", another1, exec1) + return + } + defer cleanup1() + ctx2, cancel2 := context.WithCancel(context.Background()) + tBefore := time.Now() + go func() { + time.Sleep(time.Second) + cancel2() + }() + another2, exec2, cleanup2 := qs.Wait(ctx2, 2, "test", "two") + tAfter := time.Now() + if another2 || exec2 { + t.Errorf("Unexpected: another2=%v, exec2=%v", another2, exec2) + if exec2 { + defer cleanup2() + } + } else { + dt := tAfter.Sub(tBefore) + if dt < time.Second || dt > 2*time.Second { + t.Errorf("Unexpected: dt=%d", dt) + } + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index 5d0ce2b39e3..d45f36b9b7c 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -30,15 +30,16 @@ type request struct { // startTime is the real time when the request began executing startTime time.Time - // decision gets set to the decision about what to do with this request - decision promise.LockingMutable + // decision gets set to a `requestDecision` indicating what to do + // with this request. It gets set exactly once, when the request + // is removed from its queue. The value will be decisionReject, + // decisionCancel, or decisionExecute; decisionTryAnother never + // appears here. + decision promise.LockingWriteOnce // arrivalTime is the real time when the request entered this system arrivalTime time.Time - // isWaiting indicates whether the request is presently waiting in a queue - isWaiting bool - // descr1 and descr2 are not used in any logic but they appear in // log messages descr1, descr2 interface{} @@ -60,7 +61,6 @@ type queue struct { // Enqueue enqueues a request into the queue func (q *queue) Enqueue(request *request) { - request.isWaiting = true q.requests = append(q.requests, request) } @@ -71,8 +71,6 @@ func (q *queue) Dequeue() (*request, bool) { } request := q.requests[0] q.requests = q.requests[1:] - - request.isWaiting = false return request, true }