diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go index 917d5e1744e..58af4594935 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go @@ -16,14 +16,10 @@ limitations under the License. package promise -// This file defines interfaces for promises and futures and related -// things. These are about coordination among multiple goroutines and -// so are safe for concurrent calls --- although moderated in some -// cases by a requirement that the caller hold a certain lock. - // WriteOnce represents a variable that is initially not set and can // be set once and is readable. This is the common meaning for // "promise". +// The implementations of this interface are NOT thread-safe. type WriteOnce interface { // Get reads the current value of this variable. If this variable // is not set yet then this call blocks until this variable gets a @@ -41,27 +37,3 @@ type WriteOnce interface { // variable's value. Set(interface{}) bool } - -// LockingWriteOnce is a WriteOnce whose implementation is protected -// by a lock. -type LockingWriteOnce interface { - WriteOnce - - // GetLocked is like Get but the caller must already hold the - // lock. GetLocked may release, and later re-acquire, the lock - // any number of times. Get may acquire, and later release, the - // lock any number of times. - GetLocked() interface{} - - // IsSetLocked is like IsSet but the caller must already hold the - // lock. IsSetLocked may release, and later re-acquire, the lock - // any number of times. IsSet may acquire, and later release, the - // lock any number of times. - IsSetLocked() bool - - // SetLocked is like Set but the caller must already hold the - // lock. SetLocked may release, and later re-acquire, the lock - // any number of times. Set may acquire, and later release, the - // lock any number of times - SetLocked(interface{}) bool -} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/promise.go similarity index 65% rename from staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go rename to staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/promise.go index cb7e41dbd38..70059e827f8 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/promise.go @@ -14,23 +14,21 @@ See the License for the specific language governing permissions and limitations under the License. */ -package lockingpromise +package promise import ( "sync" "k8s.io/apiserver/pkg/util/flowcontrol/counter" - "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise" ) -// lockingPromise implementss the promise.LockingWriteOnce interface. +// promise implements the promise.WriteOnce interface. // This implementation is based on a condition variable. // This implementation tracks active goroutines: // the given counter is decremented for a goroutine waiting for this // varible to be set and incremented when such a goroutine is // unblocked. -type lockingPromise struct { - lock sync.Locker +type promise struct { cond sync.Cond activeCounter counter.GoRoutineCounter // counter of active goroutines waitingCount int // number of goroutines idle due to this being unset @@ -38,24 +36,17 @@ type lockingPromise struct { value interface{} } -var _ promise.LockingWriteOnce = &lockingPromise{} +var _ WriteOnce = &promise{} // NewWriteOnce makes a new promise.LockingWriteOnce -func NewWriteOnce(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingWriteOnce { - return &lockingPromise{ - lock: lock, +func NewWriteOnce(lock sync.Locker, activeCounter counter.GoRoutineCounter) WriteOnce { + return &promise{ cond: *sync.NewCond(lock), activeCounter: activeCounter, } } -func (p *lockingPromise) Get() interface{} { - p.lock.Lock() - defer p.lock.Unlock() - return p.GetLocked() -} - -func (p *lockingPromise) GetLocked() interface{} { +func (p *promise) Get() interface{} { if !p.isSet { p.waitingCount++ p.activeCounter.Add(-1) @@ -64,23 +55,11 @@ func (p *lockingPromise) GetLocked() interface{} { return p.value } -func (p *lockingPromise) IsSet() bool { - p.lock.Lock() - defer p.lock.Unlock() - return p.IsSetLocked() -} - -func (p *lockingPromise) IsSetLocked() bool { +func (p *promise) IsSet() bool { return p.isSet } -func (p *lockingPromise) Set(value interface{}) bool { - p.lock.Lock() - defer p.lock.Unlock() - return p.SetLocked(value) -} - -func (p *lockingPromise) SetLocked(value interface{}) bool { +func (p *promise) Set(value interface{}) bool { if p.isSet { return false } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/promise_test.go similarity index 65% rename from staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go rename to staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/promise_test.go index e91c497d585..ca825560f1d 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/promise_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package lockingpromise +package promise import ( "sync" @@ -34,6 +34,9 @@ func TestLockingWriteOnce(t *testing.T) { var got atomic.Value counter.Add(1) go func() { + lock.Lock() + defer lock.Unlock() + got.Store(wr.Get()) atomic.AddInt32(&gots, 1) counter.Add(-1) @@ -43,13 +46,21 @@ func TestLockingWriteOnce(t *testing.T) { if atomic.LoadInt32(&gots) != 0 { t.Error("Get returned before Set") } - if wr.IsSet() { - t.Error("IsSet before Set") - } + func() { + lock.Lock() + defer lock.Unlock() + if wr.IsSet() { + t.Error("IsSet before Set") + } + }() aval := &now - if !wr.Set(aval) { - t.Error("Set() returned false") - } + func() { + lock.Lock() + defer lock.Unlock() + if !wr.Set(aval) { + t.Error("Set() returned false") + } + }() clock.Run(nil) time.Sleep(time.Second) if atomic.LoadInt32(&gots) != 1 { @@ -58,11 +69,18 @@ func TestLockingWriteOnce(t *testing.T) { if got.Load() != aval { t.Error("Get did not return what was Set") } - if !wr.IsSet() { - t.Error("IsSet()==false after Set") - } + func() { + lock.Lock() + defer lock.Unlock() + if !wr.IsSet() { + t.Error("IsSet()==false after Set") + } + }() counter.Add(1) go func() { + lock.Lock() + defer lock.Unlock() + got.Store(wr.Get()) atomic.AddInt32(&gots, 1) counter.Add(-1) @@ -75,17 +93,29 @@ func TestLockingWriteOnce(t *testing.T) { if got.Load() != aval { t.Error("Second Get did not return what was Set") } - if !wr.IsSet() { - t.Error("IsSet()==false after second Set") - } + func() { + lock.Lock() + defer lock.Unlock() + if !wr.IsSet() { + t.Error("IsSet()==false after second Get") + } + }() later := time.Now() bval := &later - if wr.Set(bval) { - t.Error("second Set() returned true") - } - if !wr.IsSet() { - t.Error("IsSet() returned false after second set") - } else if wr.Get() != aval { - t.Error("Get() after second Set returned wrong value") - } + func() { + lock.Lock() + defer lock.Unlock() + if wr.Set(bval) { + t.Error("second Set() returned true") + } + }() + func() { + lock.Lock() + defer lock.Unlock() + if !wr.IsSet() { + t.Error("IsSet() returned false after second set") + } else if wr.Get() != aval { + t.Error("Get() after second Set returned wrong value") + } + }() } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index e4938e9f2e0..25431c10226 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -28,7 +28,7 @@ import ( "k8s.io/apiserver/pkg/util/flowcontrol/counter" "k8s.io/apiserver/pkg/util/flowcontrol/debug" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" - "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise" + "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise" "k8s.io/apiserver/pkg/util/flowcontrol/metrics" fqrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" "k8s.io/apiserver/pkg/util/shufflesharding" @@ -356,7 +356,7 @@ func (req *request) wait() (bool, bool) { // Step 4: // The final step is to wait on a decision from // somewhere and then act on it. - decisionAny := req.decision.GetLocked() + decisionAny := req.decision.Get() qs.syncTimeLocked() decision, isDecision := decisionAny.(requestDecision) if !isDecision { @@ -453,7 +453,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte fsName: fsName, flowDistinguisher: flowDistinguisher, ctx: ctx, - decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter), + decision: promise.NewWriteOnce(&qs.lock, qs.counter), arrivalTime: qs.clock.Now(), queue: queue, descr1: descr1, @@ -503,7 +503,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s waitLimit := now.Add(-qs.qCfg.RequestWaitLimit) reqs.Walk(func(req *request) bool { if waitLimit.After(req.arrivalTime) { - req.decision.SetLocked(decisionReject) + req.decision.Set(decisionReject) timeoutCount++ metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) req.NoteQueued(false) @@ -588,13 +588,13 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width *fqreques flowDistinguisher: flowDistinguisher, ctx: ctx, startTime: now, - decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter), + decision: promise.NewWriteOnce(&qs.lock, qs.counter), arrivalTime: now, descr1: descr1, descr2: descr2, width: *width, } - req.decision.SetLocked(decisionExecute) + req.decision.Set(decisionExecute) qs.totRequestsExecuting++ qs.totSeatsInUse += req.Seats() metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1) @@ -643,7 +643,7 @@ func (qs *queueSet) dispatchLocked() bool { } // When a request is dequeued for service -> qs.virtualStart += G queue.virtualStart += qs.estimatedServiceTime * float64(request.Seats()) - request.decision.SetLocked(decisionExecute) + request.decision.Set(decisionExecute) return ok } @@ -652,12 +652,12 @@ func (qs *queueSet) dispatchLocked() bool { func (qs *queueSet) cancelWait(req *request) { qs.lock.Lock() defer qs.lock.Unlock() - if req.decision.IsSetLocked() { + if req.decision.IsSet() { // The request has already been removed from the queue // and so we consider its wait to be over. return } - req.decision.SetLocked(decisionCancel) + req.decision.Set(decisionCancel) // remove the request from the queue as it has timed out req.removeFromQueueFn() diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index 7f121d707aa..e0f35f03cf6 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -52,7 +52,10 @@ type request struct { // is removed from its queue. The value will be decisionReject, // decisionCancel, or decisionExecute; decisionTryAnother never // appears here. - decision promise.LockingWriteOnce + // + // The field is NOT thread-safe and should be protected by the + // queueset's lock. + decision promise.WriteOnce // arrivalTime is the real time when the request entered this system arrivalTime time.Time diff --git a/vendor/modules.txt b/vendor/modules.txt index b33d6406e72..55875cc2b27 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1549,7 +1549,6 @@ k8s.io/apiserver/pkg/util/flowcontrol/counter k8s.io/apiserver/pkg/util/flowcontrol/debug k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise -k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing k8s.io/apiserver/pkg/util/flowcontrol/format