diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index 919bda40e96..908e2569704 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -125,8 +125,7 @@ type configController struct { requestWaitLimit time.Duration // This must be locked while accessing flowSchemas or - // priorityLevelStates. It is the lock involved in - // LockingWriteMultiple. + // priorityLevelStates. lock sync.Mutex // flowSchemas holds the flow schema objects, sorted by increasing diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go index 1977f7522ac..58af4594935 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go @@ -16,16 +16,11 @@ limitations under the License. package promise -// This file defines interfaces for promises and futures and related -// things. These are about coordination among multiple goroutines and -// so are safe for concurrent calls --- although moderated in some -// cases by a requirement that the caller hold a certain lock. - -// Readable represents a variable that is initially not set and later -// becomes set. Some instances may be set to multiple values in -// series. A Readable for a variable that can only get one value is -// commonly known as a "future". -type Readable interface { +// WriteOnce represents a variable that is initially not set and can +// be set once and is readable. This is the common meaning for +// "promise". +// The implementations of this interface are NOT thread-safe. +type WriteOnce interface { // Get reads the current value of this variable. If this variable // is not set yet then this call blocks until this variable gets a // value. @@ -34,29 +29,7 @@ type Readable interface { // IsSet returns immediately with an indication of whether this // variable has been set. IsSet() bool -} -// LockingReadable is a Readable whose implementation is protected by -// a lock -type LockingReadable interface { - Readable - - // GetLocked is like Get but the caller must already hold the - // lock. GetLocked may release, and later re-acquire, the lock - // any number of times. Get may acquire, and later release, the - // lock any number of times. - GetLocked() interface{} - - // IsSetLocked is like IsSet but the caller must already hold the - // lock. IsSetLocked may release, and later re-acquire, the lock - // any number of times. IsSet may acquire, and later release, the - // lock any number of times. - IsSetLocked() bool -} - -// WriteOnceOnly represents a variable that is initially not set and -// can be set once. -type WriteOnceOnly interface { // Set normally writes a value into this variable, unblocks every // goroutine waiting for this variable to have a value, and // returns true. In the unhappy case that this variable is @@ -64,66 +37,3 @@ type WriteOnceOnly interface { // variable's value. Set(interface{}) bool } - -// WriteOnce represents a variable that is initially not set and can -// be set once and is readable. This is the common meaning for -// "promise". -type WriteOnce interface { - Readable - WriteOnceOnly -} - -// LockingWriteOnceOnly is a WriteOnceOnly whose implementation is -// protected by a lock. -type LockingWriteOnceOnly interface { - WriteOnceOnly - - // SetLocked is like Set but the caller must already hold the - // lock. SetLocked may release, and later re-acquire, the lock - // any number of times. Set may acquire, and later release, the - // lock any number of times - SetLocked(interface{}) bool -} - -// LockingWriteOnce is a WriteOnce whose implementation is protected -// by a lock. -type LockingWriteOnce interface { - LockingReadable - LockingWriteOnceOnly -} - -// WriteMultipleOnly represents a variable that is initially not set -// and can be set one or more times (unlike a traditional "promise", -// which can be written only once). -type WriteMultipleOnly interface { - // Set writes a value into this variable and unblocks every - // goroutine waiting for this variable to have a value - Set(interface{}) -} - -// WriteMultiple represents a variable that is initially not set and -// can be set one or more times (unlike a traditional "promise", which -// can be written only once) and is readable. -type WriteMultiple interface { - Readable - WriteMultipleOnly -} - -// LockingWriteMultipleOnly is a WriteMultipleOnly whose -// implementation is protected by a lock. -type LockingWriteMultipleOnly interface { - WriteMultipleOnly - - // SetLocked is like Set but the caller must already hold the - // lock. SetLocked may release, and later re-acquire, the lock - // any number of times. Set may acquire, and later release, the - // lock any number of times - SetLocked(interface{}) -} - -// LockingWriteMultiple is a WriteMultiple whose implementation is -// protected by a lock. -type LockingWriteMultiple interface { - LockingReadable - LockingWriteMultipleOnly -} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go deleted file mode 100644 index db5598f8989..00000000000 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go +++ /dev/null @@ -1,124 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package lockingpromise - -import ( - "sync" - - "k8s.io/apiserver/pkg/util/flowcontrol/counter" - "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise" -) - -// promisoid is the data and behavior common to all the promise-like -// abstractions implemented here. This implementation is based on a -// condition variable. This implementation tracks active goroutines: -// the given counter is decremented for a goroutine waiting for this -// varible to be set and incremented when such a goroutine is -// unblocked. -type promisoid struct { - lock sync.Locker - cond sync.Cond - activeCounter counter.GoRoutineCounter // counter of active goroutines - waitingCount int // number of goroutines idle due to this being unset - isSet bool - value interface{} -} - -func (pr *promisoid) Get() interface{} { - pr.lock.Lock() - defer pr.lock.Unlock() - return pr.GetLocked() -} - -func (pr *promisoid) GetLocked() interface{} { - if !pr.isSet { - pr.waitingCount++ - pr.activeCounter.Add(-1) - pr.cond.Wait() - } - return pr.value -} - -func (pr *promisoid) IsSet() bool { - pr.lock.Lock() - defer pr.lock.Unlock() - return pr.IsSetLocked() -} - -func (pr *promisoid) IsSetLocked() bool { - return pr.isSet -} - -func (pr *promisoid) SetLocked(value interface{}) { - pr.isSet = true - pr.value = value - if pr.waitingCount > 0 { - pr.activeCounter.Add(pr.waitingCount) - pr.waitingCount = 0 - pr.cond.Broadcast() - } -} - -type writeOnce struct { - promisoid -} - -var _ promise.LockingWriteOnce = &writeOnce{} - -// NewWriteOnce makes a new promise.LockingWriteOnce -func NewWriteOnce(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingWriteOnce { - return &writeOnce{promisoid{ - lock: lock, - cond: *sync.NewCond(lock), - activeCounter: activeCounter, - }} -} - -func (wr *writeOnce) Set(value interface{}) bool { - wr.lock.Lock() - defer wr.lock.Unlock() - return wr.SetLocked(value) -} - -func (wr *writeOnce) SetLocked(value interface{}) bool { - if wr.isSet { - return false - } - wr.promisoid.SetLocked(value) - return true -} - -type writeMultiple struct { - promisoid -} - -var _ promise.LockingWriteMultiple = &writeMultiple{} - -// NewWriteMultiple makes a new promise.LockingWriteMultiple -func NewWriteMultiple(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingWriteMultiple { - return &writeMultiple{promisoid{ - lock: lock, - cond: *sync.NewCond(lock), - activeCounter: activeCounter, - }} -} - -func (wr *writeMultiple) Set(value interface{}) { - wr.lock.Lock() - defer wr.lock.Unlock() - wr.SetLocked(value) -} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go deleted file mode 100644 index abaedda1230..00000000000 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go +++ /dev/null @@ -1,152 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package lockingpromise - -import ( - "sync" - "sync/atomic" - "testing" - "time" - - "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock" -) - -func TestLockingWriteMultiple(t *testing.T) { - now := time.Now() - clock, counter := clock.NewFakeEventClock(now, 0, nil) - var lock sync.Mutex - wr := NewWriteMultiple(&lock, counter) - var gots int32 - var got atomic.Value - counter.Add(1) - go func() { - got.Store(wr.Get()) - atomic.AddInt32(&gots, 1) - counter.Add(-1) - }() - clock.Run(nil) - time.Sleep(time.Second) - if atomic.LoadInt32(&gots) != 0 { - t.Error("Get returned before Set") - } - if wr.IsSet() { - t.Error("IsSet before Set") - } - aval := &now - wr.Set(aval) - clock.Run(nil) - time.Sleep(time.Second) - if atomic.LoadInt32(&gots) != 1 { - t.Error("Get did not return after Set") - } - if got.Load() != aval { - t.Error("Get did not return what was Set") - } - if !wr.IsSet() { - t.Error("IsSet()==false after Set") - } - counter.Add(1) - go func() { - got.Store(wr.Get()) - atomic.AddInt32(&gots, 1) - counter.Add(-1) - }() - clock.Run(nil) - time.Sleep(time.Second) - if atomic.LoadInt32(&gots) != 2 { - t.Error("Second Get did not return immediately") - } - if got.Load() != aval { - t.Error("Second Get did not return what was Set") - } - if !wr.IsSet() { - t.Error("IsSet()==false after second Set") - } - later := time.Now() - bval := &later - wr.Set(bval) - if !wr.IsSet() { - t.Error("IsSet() returned false after second set") - } else if wr.Get() != bval { - t.Error("Get() after second Set returned wrong value") - } -} - -func TestLockingWriteOnce(t *testing.T) { - now := time.Now() - clock, counter := clock.NewFakeEventClock(now, 0, nil) - var lock sync.Mutex - wr := NewWriteOnce(&lock, counter) - var gots int32 - var got atomic.Value - counter.Add(1) - go func() { - got.Store(wr.Get()) - atomic.AddInt32(&gots, 1) - counter.Add(-1) - }() - clock.Run(nil) - time.Sleep(time.Second) - if atomic.LoadInt32(&gots) != 0 { - t.Error("Get returned before Set") - } - if wr.IsSet() { - t.Error("IsSet before Set") - } - aval := &now - if !wr.Set(aval) { - t.Error("Set() returned false") - } - clock.Run(nil) - time.Sleep(time.Second) - if atomic.LoadInt32(&gots) != 1 { - t.Error("Get did not return after Set") - } - if got.Load() != aval { - t.Error("Get did not return what was Set") - } - if !wr.IsSet() { - t.Error("IsSet()==false after Set") - } - counter.Add(1) - go func() { - got.Store(wr.Get()) - atomic.AddInt32(&gots, 1) - counter.Add(-1) - }() - clock.Run(nil) - time.Sleep(time.Second) - if atomic.LoadInt32(&gots) != 2 { - t.Error("Second Get did not return immediately") - } - if got.Load() != aval { - t.Error("Second Get did not return what was Set") - } - if !wr.IsSet() { - t.Error("IsSet()==false after second Set") - } - later := time.Now() - bval := &later - if wr.Set(bval) { - t.Error("second Set() returned true") - } - if !wr.IsSet() { - t.Error("IsSet() returned false after second set") - } else if wr.Get() != aval { - t.Error("Get() after second Set returned wrong value") - } -} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/promise.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/promise.go new file mode 100644 index 00000000000..70059e827f8 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/promise.go @@ -0,0 +1,74 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package promise + +import ( + "sync" + + "k8s.io/apiserver/pkg/util/flowcontrol/counter" +) + +// promise implements the promise.WriteOnce interface. +// This implementation is based on a condition variable. +// This implementation tracks active goroutines: +// the given counter is decremented for a goroutine waiting for this +// varible to be set and incremented when such a goroutine is +// unblocked. +type promise struct { + cond sync.Cond + activeCounter counter.GoRoutineCounter // counter of active goroutines + waitingCount int // number of goroutines idle due to this being unset + isSet bool + value interface{} +} + +var _ WriteOnce = &promise{} + +// NewWriteOnce makes a new promise.LockingWriteOnce +func NewWriteOnce(lock sync.Locker, activeCounter counter.GoRoutineCounter) WriteOnce { + return &promise{ + cond: *sync.NewCond(lock), + activeCounter: activeCounter, + } +} + +func (p *promise) Get() interface{} { + if !p.isSet { + p.waitingCount++ + p.activeCounter.Add(-1) + p.cond.Wait() + } + return p.value +} + +func (p *promise) IsSet() bool { + return p.isSet +} + +func (p *promise) Set(value interface{}) bool { + if p.isSet { + return false + } + p.isSet = true + p.value = value + if p.waitingCount > 0 { + p.activeCounter.Add(p.waitingCount) + p.waitingCount = 0 + p.cond.Broadcast() + } + return true +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/promise_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/promise_test.go new file mode 100644 index 00000000000..ca825560f1d --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/promise_test.go @@ -0,0 +1,121 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package promise + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock" +) + +func TestLockingWriteOnce(t *testing.T) { + now := time.Now() + clock, counter := clock.NewFakeEventClock(now, 0, nil) + var lock sync.Mutex + wr := NewWriteOnce(&lock, counter) + var gots int32 + var got atomic.Value + counter.Add(1) + go func() { + lock.Lock() + defer lock.Unlock() + + got.Store(wr.Get()) + atomic.AddInt32(&gots, 1) + counter.Add(-1) + }() + clock.Run(nil) + time.Sleep(time.Second) + if atomic.LoadInt32(&gots) != 0 { + t.Error("Get returned before Set") + } + func() { + lock.Lock() + defer lock.Unlock() + if wr.IsSet() { + t.Error("IsSet before Set") + } + }() + aval := &now + func() { + lock.Lock() + defer lock.Unlock() + if !wr.Set(aval) { + t.Error("Set() returned false") + } + }() + clock.Run(nil) + time.Sleep(time.Second) + if atomic.LoadInt32(&gots) != 1 { + t.Error("Get did not return after Set") + } + if got.Load() != aval { + t.Error("Get did not return what was Set") + } + func() { + lock.Lock() + defer lock.Unlock() + if !wr.IsSet() { + t.Error("IsSet()==false after Set") + } + }() + counter.Add(1) + go func() { + lock.Lock() + defer lock.Unlock() + + got.Store(wr.Get()) + atomic.AddInt32(&gots, 1) + counter.Add(-1) + }() + clock.Run(nil) + time.Sleep(time.Second) + if atomic.LoadInt32(&gots) != 2 { + t.Error("Second Get did not return immediately") + } + if got.Load() != aval { + t.Error("Second Get did not return what was Set") + } + func() { + lock.Lock() + defer lock.Unlock() + if !wr.IsSet() { + t.Error("IsSet()==false after second Get") + } + }() + later := time.Now() + bval := &later + func() { + lock.Lock() + defer lock.Unlock() + if wr.Set(bval) { + t.Error("second Set() returned true") + } + }() + func() { + lock.Lock() + defer lock.Unlock() + if !wr.IsSet() { + t.Error("IsSet() returned false after second set") + } else if wr.Get() != aval { + t.Error("Get() after second Set returned wrong value") + } + }() +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index a5c7b1a4d20..edf3c18ae07 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -28,7 +28,7 @@ import ( "k8s.io/apiserver/pkg/util/flowcontrol/counter" "k8s.io/apiserver/pkg/util/flowcontrol/debug" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" - "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise" + "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise" "k8s.io/apiserver/pkg/util/flowcontrol/metrics" fqrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" "k8s.io/apiserver/pkg/util/shufflesharding" @@ -356,7 +356,7 @@ func (req *request) wait() (bool, bool) { // Step 4: // The final step is to wait on a decision from // somewhere and then act on it. - decisionAny := req.decision.GetLocked() + decisionAny := req.decision.Get() qs.syncTimeLocked() decision, isDecision := decisionAny.(requestDecision) if !isDecision { @@ -453,7 +453,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte fsName: fsName, flowDistinguisher: flowDistinguisher, ctx: ctx, - decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter), + decision: promise.NewWriteOnce(&qs.lock, qs.counter), arrivalTime: qs.clock.Now(), queue: queue, descr1: descr1, @@ -503,7 +503,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s waitLimit := now.Add(-qs.qCfg.RequestWaitLimit) reqs.Walk(func(req *request) bool { if waitLimit.After(req.arrivalTime) { - req.decision.SetLocked(decisionReject) + req.decision.Set(decisionReject) timeoutCount++ metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) req.NoteQueued(false) @@ -588,13 +588,13 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f flowDistinguisher: flowDistinguisher, ctx: ctx, startTime: now, - decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter), + decision: promise.NewWriteOnce(&qs.lock, qs.counter), arrivalTime: now, descr1: descr1, descr2: descr2, workEstimate: *workEstimate, } - req.decision.SetLocked(decisionExecute) + req.decision.Set(decisionExecute) qs.totRequestsExecuting++ qs.totSeatsInUse += req.Seats() metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1) @@ -643,7 +643,7 @@ func (qs *queueSet) dispatchLocked() bool { } // When a request is dequeued for service -> qs.virtualStart += G queue.virtualStart += qs.estimatedServiceTime * float64(request.Seats()) - request.decision.SetLocked(decisionExecute) + request.decision.Set(decisionExecute) return ok } @@ -652,12 +652,12 @@ func (qs *queueSet) dispatchLocked() bool { func (qs *queueSet) cancelWait(req *request) { qs.lock.Lock() defer qs.lock.Unlock() - if req.decision.IsSetLocked() { + if req.decision.IsSet() { // The request has already been removed from the queue // and so we consider its wait to be over. return } - req.decision.SetLocked(decisionCancel) + req.decision.Set(decisionCancel) // remove the request from the queue as it has timed out req.removeFromQueueFn() diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index 72943cf5a3c..cd954d4668d 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -52,7 +52,10 @@ type request struct { // is removed from its queue. The value will be decisionReject, // decisionCancel, or decisionExecute; decisionTryAnother never // appears here. - decision promise.LockingWriteOnce + // + // The field is NOT thread-safe and should be protected by the + // queueset's lock. + decision promise.WriteOnce // arrivalTime is the real time when the request entered this system arrivalTime time.Time diff --git a/vendor/modules.txt b/vendor/modules.txt index 3ac9079ee65..d21f2055617 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1549,7 +1549,6 @@ k8s.io/apiserver/pkg/util/flowcontrol/counter k8s.io/apiserver/pkg/util/flowcontrol/debug k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise -k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing k8s.io/apiserver/pkg/util/flowcontrol/format