From a1cf44eab44f21c3929ff2d79501c56e6fbcddc3 Mon Sep 17 00:00:00 2001 From: wojtekt Date: Wed, 21 Jul 2021 12:14:30 +0200 Subject: [PATCH 1/2] Remove unused promise code from APF --- .../pkg/util/flowcontrol/apf_controller.go | 3 +- .../fairqueuing/promise/interface.go | 92 +++----------- .../promise/lockingpromise/lockingpromise.go | 119 +++++++----------- .../lockingpromise/lockingpromise_test.go | 61 --------- 4 files changed, 61 insertions(+), 214 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index bd9fd9ad921..d5e600973ac 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -125,8 +125,7 @@ type configController struct { requestWaitLimit time.Duration // This must be locked while accessing flowSchemas or - // priorityLevelStates. It is the lock involved in - // LockingWriteMultiple. + // priorityLevelStates. lock sync.Mutex // flowSchemas holds the flow schema objects, sorted by increasing diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go index 1977f7522ac..917d5e1744e 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go @@ -21,11 +21,10 @@ package promise // so are safe for concurrent calls --- although moderated in some // cases by a requirement that the caller hold a certain lock. -// Readable represents a variable that is initially not set and later -// becomes set. Some instances may be set to multiple values in -// series. A Readable for a variable that can only get one value is -// commonly known as a "future". -type Readable interface { +// WriteOnce represents a variable that is initially not set and can +// be set once and is readable. This is the common meaning for +// "promise". +type WriteOnce interface { // Get reads the current value of this variable. If this variable // is not set yet then this call blocks until this variable gets a // value. @@ -34,12 +33,19 @@ type Readable interface { // IsSet returns immediately with an indication of whether this // variable has been set. IsSet() bool + + // Set normally writes a value into this variable, unblocks every + // goroutine waiting for this variable to have a value, and + // returns true. In the unhappy case that this variable is + // already set, this method returns false without modifying the + // variable's value. + Set(interface{}) bool } -// LockingReadable is a Readable whose implementation is protected by -// a lock -type LockingReadable interface { - Readable +// LockingWriteOnce is a WriteOnce whose implementation is protected +// by a lock. +type LockingWriteOnce interface { + WriteOnce // GetLocked is like Get but the caller must already hold the // lock. GetLocked may release, and later re-acquire, the lock @@ -52,31 +58,6 @@ type LockingReadable interface { // any number of times. IsSet may acquire, and later release, the // lock any number of times. IsSetLocked() bool -} - -// WriteOnceOnly represents a variable that is initially not set and -// can be set once. -type WriteOnceOnly interface { - // Set normally writes a value into this variable, unblocks every - // goroutine waiting for this variable to have a value, and - // returns true. In the unhappy case that this variable is - // already set, this method returns false without modifying the - // variable's value. - Set(interface{}) bool -} - -// WriteOnce represents a variable that is initially not set and can -// be set once and is readable. This is the common meaning for -// "promise". -type WriteOnce interface { - Readable - WriteOnceOnly -} - -// LockingWriteOnceOnly is a WriteOnceOnly whose implementation is -// protected by a lock. -type LockingWriteOnceOnly interface { - WriteOnceOnly // SetLocked is like Set but the caller must already hold the // lock. SetLocked may release, and later re-acquire, the lock @@ -84,46 +65,3 @@ type LockingWriteOnceOnly interface { // lock any number of times SetLocked(interface{}) bool } - -// LockingWriteOnce is a WriteOnce whose implementation is protected -// by a lock. -type LockingWriteOnce interface { - LockingReadable - LockingWriteOnceOnly -} - -// WriteMultipleOnly represents a variable that is initially not set -// and can be set one or more times (unlike a traditional "promise", -// which can be written only once). -type WriteMultipleOnly interface { - // Set writes a value into this variable and unblocks every - // goroutine waiting for this variable to have a value - Set(interface{}) -} - -// WriteMultiple represents a variable that is initially not set and -// can be set one or more times (unlike a traditional "promise", which -// can be written only once) and is readable. -type WriteMultiple interface { - Readable - WriteMultipleOnly -} - -// LockingWriteMultipleOnly is a WriteMultipleOnly whose -// implementation is protected by a lock. -type LockingWriteMultipleOnly interface { - WriteMultipleOnly - - // SetLocked is like Set but the caller must already hold the - // lock. SetLocked may release, and later re-acquire, the lock - // any number of times. Set may acquire, and later release, the - // lock any number of times - SetLocked(interface{}) -} - -// LockingWriteMultiple is a WriteMultiple whose implementation is -// protected by a lock. -type LockingWriteMultiple interface { - LockingReadable - LockingWriteMultipleOnly -} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go index db5598f8989..cb7e41dbd38 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go @@ -23,13 +23,13 @@ import ( "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise" ) -// promisoid is the data and behavior common to all the promise-like -// abstractions implemented here. This implementation is based on a -// condition variable. This implementation tracks active goroutines: +// lockingPromise implementss the promise.LockingWriteOnce interface. +// This implementation is based on a condition variable. +// This implementation tracks active goroutines: // the given counter is decremented for a goroutine waiting for this // varible to be set and incremented when such a goroutine is // unblocked. -type promisoid struct { +type lockingPromise struct { lock sync.Locker cond sync.Cond activeCounter counter.GoRoutineCounter // counter of active goroutines @@ -38,87 +38,58 @@ type promisoid struct { value interface{} } -func (pr *promisoid) Get() interface{} { - pr.lock.Lock() - defer pr.lock.Unlock() - return pr.GetLocked() -} - -func (pr *promisoid) GetLocked() interface{} { - if !pr.isSet { - pr.waitingCount++ - pr.activeCounter.Add(-1) - pr.cond.Wait() - } - return pr.value -} - -func (pr *promisoid) IsSet() bool { - pr.lock.Lock() - defer pr.lock.Unlock() - return pr.IsSetLocked() -} - -func (pr *promisoid) IsSetLocked() bool { - return pr.isSet -} - -func (pr *promisoid) SetLocked(value interface{}) { - pr.isSet = true - pr.value = value - if pr.waitingCount > 0 { - pr.activeCounter.Add(pr.waitingCount) - pr.waitingCount = 0 - pr.cond.Broadcast() - } -} - -type writeOnce struct { - promisoid -} - -var _ promise.LockingWriteOnce = &writeOnce{} +var _ promise.LockingWriteOnce = &lockingPromise{} // NewWriteOnce makes a new promise.LockingWriteOnce func NewWriteOnce(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingWriteOnce { - return &writeOnce{promisoid{ + return &lockingPromise{ lock: lock, cond: *sync.NewCond(lock), activeCounter: activeCounter, - }} + } } -func (wr *writeOnce) Set(value interface{}) bool { - wr.lock.Lock() - defer wr.lock.Unlock() - return wr.SetLocked(value) +func (p *lockingPromise) Get() interface{} { + p.lock.Lock() + defer p.lock.Unlock() + return p.GetLocked() } -func (wr *writeOnce) SetLocked(value interface{}) bool { - if wr.isSet { +func (p *lockingPromise) GetLocked() interface{} { + if !p.isSet { + p.waitingCount++ + p.activeCounter.Add(-1) + p.cond.Wait() + } + return p.value +} + +func (p *lockingPromise) IsSet() bool { + p.lock.Lock() + defer p.lock.Unlock() + return p.IsSetLocked() +} + +func (p *lockingPromise) IsSetLocked() bool { + return p.isSet +} + +func (p *lockingPromise) Set(value interface{}) bool { + p.lock.Lock() + defer p.lock.Unlock() + return p.SetLocked(value) +} + +func (p *lockingPromise) SetLocked(value interface{}) bool { + if p.isSet { return false } - wr.promisoid.SetLocked(value) + p.isSet = true + p.value = value + if p.waitingCount > 0 { + p.activeCounter.Add(p.waitingCount) + p.waitingCount = 0 + p.cond.Broadcast() + } return true } - -type writeMultiple struct { - promisoid -} - -var _ promise.LockingWriteMultiple = &writeMultiple{} - -// NewWriteMultiple makes a new promise.LockingWriteMultiple -func NewWriteMultiple(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingWriteMultiple { - return &writeMultiple{promisoid{ - lock: lock, - cond: *sync.NewCond(lock), - activeCounter: activeCounter, - }} -} - -func (wr *writeMultiple) Set(value interface{}) { - wr.lock.Lock() - defer wr.lock.Unlock() - wr.SetLocked(value) -} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go index abaedda1230..e91c497d585 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go @@ -25,67 +25,6 @@ import ( "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock" ) -func TestLockingWriteMultiple(t *testing.T) { - now := time.Now() - clock, counter := clock.NewFakeEventClock(now, 0, nil) - var lock sync.Mutex - wr := NewWriteMultiple(&lock, counter) - var gots int32 - var got atomic.Value - counter.Add(1) - go func() { - got.Store(wr.Get()) - atomic.AddInt32(&gots, 1) - counter.Add(-1) - }() - clock.Run(nil) - time.Sleep(time.Second) - if atomic.LoadInt32(&gots) != 0 { - t.Error("Get returned before Set") - } - if wr.IsSet() { - t.Error("IsSet before Set") - } - aval := &now - wr.Set(aval) - clock.Run(nil) - time.Sleep(time.Second) - if atomic.LoadInt32(&gots) != 1 { - t.Error("Get did not return after Set") - } - if got.Load() != aval { - t.Error("Get did not return what was Set") - } - if !wr.IsSet() { - t.Error("IsSet()==false after Set") - } - counter.Add(1) - go func() { - got.Store(wr.Get()) - atomic.AddInt32(&gots, 1) - counter.Add(-1) - }() - clock.Run(nil) - time.Sleep(time.Second) - if atomic.LoadInt32(&gots) != 2 { - t.Error("Second Get did not return immediately") - } - if got.Load() != aval { - t.Error("Second Get did not return what was Set") - } - if !wr.IsSet() { - t.Error("IsSet()==false after second Set") - } - later := time.Now() - bval := &later - wr.Set(bval) - if !wr.IsSet() { - t.Error("IsSet() returned false after second set") - } else if wr.Get() != bval { - t.Error("Get() after second Set returned wrong value") - } -} - func TestLockingWriteOnce(t *testing.T) { now := time.Now() clock, counter := clock.NewFakeEventClock(now, 0, nil) From 9f735e71bbb7d0dde67a718891641d8afd20a8bc Mon Sep 17 00:00:00 2001 From: wojtekt Date: Fri, 23 Jul 2021 13:30:34 +0200 Subject: [PATCH 2/2] Simplify APF promise to what is really used in the code --- .../fairqueuing/promise/interface.go | 30 +------- .../lockingpromise.go => promise.go} | 39 +++------- ...lockingpromise_test.go => promise_test.go} | 72 +++++++++++++------ .../fairqueuing/queueset/queueset.go | 18 ++--- .../flowcontrol/fairqueuing/queueset/types.go | 5 +- vendor/modules.txt | 1 - 6 files changed, 74 insertions(+), 91 deletions(-) rename staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/{lockingpromise/lockingpromise.go => promise.go} (65%) rename staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/{lockingpromise/lockingpromise_test.go => promise_test.go} (65%) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go index 917d5e1744e..58af4594935 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go @@ -16,14 +16,10 @@ limitations under the License. package promise -// This file defines interfaces for promises and futures and related -// things. These are about coordination among multiple goroutines and -// so are safe for concurrent calls --- although moderated in some -// cases by a requirement that the caller hold a certain lock. - // WriteOnce represents a variable that is initially not set and can // be set once and is readable. This is the common meaning for // "promise". +// The implementations of this interface are NOT thread-safe. type WriteOnce interface { // Get reads the current value of this variable. If this variable // is not set yet then this call blocks until this variable gets a @@ -41,27 +37,3 @@ type WriteOnce interface { // variable's value. Set(interface{}) bool } - -// LockingWriteOnce is a WriteOnce whose implementation is protected -// by a lock. -type LockingWriteOnce interface { - WriteOnce - - // GetLocked is like Get but the caller must already hold the - // lock. GetLocked may release, and later re-acquire, the lock - // any number of times. Get may acquire, and later release, the - // lock any number of times. - GetLocked() interface{} - - // IsSetLocked is like IsSet but the caller must already hold the - // lock. IsSetLocked may release, and later re-acquire, the lock - // any number of times. IsSet may acquire, and later release, the - // lock any number of times. - IsSetLocked() bool - - // SetLocked is like Set but the caller must already hold the - // lock. SetLocked may release, and later re-acquire, the lock - // any number of times. Set may acquire, and later release, the - // lock any number of times - SetLocked(interface{}) bool -} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/promise.go similarity index 65% rename from staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go rename to staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/promise.go index cb7e41dbd38..70059e827f8 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/promise.go @@ -14,23 +14,21 @@ See the License for the specific language governing permissions and limitations under the License. */ -package lockingpromise +package promise import ( "sync" "k8s.io/apiserver/pkg/util/flowcontrol/counter" - "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise" ) -// lockingPromise implementss the promise.LockingWriteOnce interface. +// promise implements the promise.WriteOnce interface. // This implementation is based on a condition variable. // This implementation tracks active goroutines: // the given counter is decremented for a goroutine waiting for this // varible to be set and incremented when such a goroutine is // unblocked. -type lockingPromise struct { - lock sync.Locker +type promise struct { cond sync.Cond activeCounter counter.GoRoutineCounter // counter of active goroutines waitingCount int // number of goroutines idle due to this being unset @@ -38,24 +36,17 @@ type lockingPromise struct { value interface{} } -var _ promise.LockingWriteOnce = &lockingPromise{} +var _ WriteOnce = &promise{} // NewWriteOnce makes a new promise.LockingWriteOnce -func NewWriteOnce(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingWriteOnce { - return &lockingPromise{ - lock: lock, +func NewWriteOnce(lock sync.Locker, activeCounter counter.GoRoutineCounter) WriteOnce { + return &promise{ cond: *sync.NewCond(lock), activeCounter: activeCounter, } } -func (p *lockingPromise) Get() interface{} { - p.lock.Lock() - defer p.lock.Unlock() - return p.GetLocked() -} - -func (p *lockingPromise) GetLocked() interface{} { +func (p *promise) Get() interface{} { if !p.isSet { p.waitingCount++ p.activeCounter.Add(-1) @@ -64,23 +55,11 @@ func (p *lockingPromise) GetLocked() interface{} { return p.value } -func (p *lockingPromise) IsSet() bool { - p.lock.Lock() - defer p.lock.Unlock() - return p.IsSetLocked() -} - -func (p *lockingPromise) IsSetLocked() bool { +func (p *promise) IsSet() bool { return p.isSet } -func (p *lockingPromise) Set(value interface{}) bool { - p.lock.Lock() - defer p.lock.Unlock() - return p.SetLocked(value) -} - -func (p *lockingPromise) SetLocked(value interface{}) bool { +func (p *promise) Set(value interface{}) bool { if p.isSet { return false } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/promise_test.go similarity index 65% rename from staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go rename to staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/promise_test.go index e91c497d585..ca825560f1d 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/promise_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package lockingpromise +package promise import ( "sync" @@ -34,6 +34,9 @@ func TestLockingWriteOnce(t *testing.T) { var got atomic.Value counter.Add(1) go func() { + lock.Lock() + defer lock.Unlock() + got.Store(wr.Get()) atomic.AddInt32(&gots, 1) counter.Add(-1) @@ -43,13 +46,21 @@ func TestLockingWriteOnce(t *testing.T) { if atomic.LoadInt32(&gots) != 0 { t.Error("Get returned before Set") } - if wr.IsSet() { - t.Error("IsSet before Set") - } + func() { + lock.Lock() + defer lock.Unlock() + if wr.IsSet() { + t.Error("IsSet before Set") + } + }() aval := &now - if !wr.Set(aval) { - t.Error("Set() returned false") - } + func() { + lock.Lock() + defer lock.Unlock() + if !wr.Set(aval) { + t.Error("Set() returned false") + } + }() clock.Run(nil) time.Sleep(time.Second) if atomic.LoadInt32(&gots) != 1 { @@ -58,11 +69,18 @@ func TestLockingWriteOnce(t *testing.T) { if got.Load() != aval { t.Error("Get did not return what was Set") } - if !wr.IsSet() { - t.Error("IsSet()==false after Set") - } + func() { + lock.Lock() + defer lock.Unlock() + if !wr.IsSet() { + t.Error("IsSet()==false after Set") + } + }() counter.Add(1) go func() { + lock.Lock() + defer lock.Unlock() + got.Store(wr.Get()) atomic.AddInt32(&gots, 1) counter.Add(-1) @@ -75,17 +93,29 @@ func TestLockingWriteOnce(t *testing.T) { if got.Load() != aval { t.Error("Second Get did not return what was Set") } - if !wr.IsSet() { - t.Error("IsSet()==false after second Set") - } + func() { + lock.Lock() + defer lock.Unlock() + if !wr.IsSet() { + t.Error("IsSet()==false after second Get") + } + }() later := time.Now() bval := &later - if wr.Set(bval) { - t.Error("second Set() returned true") - } - if !wr.IsSet() { - t.Error("IsSet() returned false after second set") - } else if wr.Get() != aval { - t.Error("Get() after second Set returned wrong value") - } + func() { + lock.Lock() + defer lock.Unlock() + if wr.Set(bval) { + t.Error("second Set() returned true") + } + }() + func() { + lock.Lock() + defer lock.Unlock() + if !wr.IsSet() { + t.Error("IsSet() returned false after second set") + } else if wr.Get() != aval { + t.Error("Get() after second Set returned wrong value") + } + }() } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index e4938e9f2e0..25431c10226 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -28,7 +28,7 @@ import ( "k8s.io/apiserver/pkg/util/flowcontrol/counter" "k8s.io/apiserver/pkg/util/flowcontrol/debug" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" - "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise" + "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise" "k8s.io/apiserver/pkg/util/flowcontrol/metrics" fqrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" "k8s.io/apiserver/pkg/util/shufflesharding" @@ -356,7 +356,7 @@ func (req *request) wait() (bool, bool) { // Step 4: // The final step is to wait on a decision from // somewhere and then act on it. - decisionAny := req.decision.GetLocked() + decisionAny := req.decision.Get() qs.syncTimeLocked() decision, isDecision := decisionAny.(requestDecision) if !isDecision { @@ -453,7 +453,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte fsName: fsName, flowDistinguisher: flowDistinguisher, ctx: ctx, - decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter), + decision: promise.NewWriteOnce(&qs.lock, qs.counter), arrivalTime: qs.clock.Now(), queue: queue, descr1: descr1, @@ -503,7 +503,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s waitLimit := now.Add(-qs.qCfg.RequestWaitLimit) reqs.Walk(func(req *request) bool { if waitLimit.After(req.arrivalTime) { - req.decision.SetLocked(decisionReject) + req.decision.Set(decisionReject) timeoutCount++ metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) req.NoteQueued(false) @@ -588,13 +588,13 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width *fqreques flowDistinguisher: flowDistinguisher, ctx: ctx, startTime: now, - decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter), + decision: promise.NewWriteOnce(&qs.lock, qs.counter), arrivalTime: now, descr1: descr1, descr2: descr2, width: *width, } - req.decision.SetLocked(decisionExecute) + req.decision.Set(decisionExecute) qs.totRequestsExecuting++ qs.totSeatsInUse += req.Seats() metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1) @@ -643,7 +643,7 @@ func (qs *queueSet) dispatchLocked() bool { } // When a request is dequeued for service -> qs.virtualStart += G queue.virtualStart += qs.estimatedServiceTime * float64(request.Seats()) - request.decision.SetLocked(decisionExecute) + request.decision.Set(decisionExecute) return ok } @@ -652,12 +652,12 @@ func (qs *queueSet) dispatchLocked() bool { func (qs *queueSet) cancelWait(req *request) { qs.lock.Lock() defer qs.lock.Unlock() - if req.decision.IsSetLocked() { + if req.decision.IsSet() { // The request has already been removed from the queue // and so we consider its wait to be over. return } - req.decision.SetLocked(decisionCancel) + req.decision.Set(decisionCancel) // remove the request from the queue as it has timed out req.removeFromQueueFn() diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index 7f121d707aa..e0f35f03cf6 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -52,7 +52,10 @@ type request struct { // is removed from its queue. The value will be decisionReject, // decisionCancel, or decisionExecute; decisionTryAnother never // appears here. - decision promise.LockingWriteOnce + // + // The field is NOT thread-safe and should be protected by the + // queueset's lock. + decision promise.WriteOnce // arrivalTime is the real time when the request entered this system arrivalTime time.Time diff --git a/vendor/modules.txt b/vendor/modules.txt index b33d6406e72..55875cc2b27 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1549,7 +1549,6 @@ k8s.io/apiserver/pkg/util/flowcontrol/counter k8s.io/apiserver/pkg/util/flowcontrol/debug k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise -k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing k8s.io/apiserver/pkg/util/flowcontrol/format