diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index bd9fd9ad921..d5e600973ac 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -125,8 +125,7 @@ type configController struct { requestWaitLimit time.Duration // This must be locked while accessing flowSchemas or - // priorityLevelStates. It is the lock involved in - // LockingWriteMultiple. + // priorityLevelStates. lock sync.Mutex // flowSchemas holds the flow schema objects, sorted by increasing diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go index 1977f7522ac..917d5e1744e 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go @@ -21,11 +21,10 @@ package promise // so are safe for concurrent calls --- although moderated in some // cases by a requirement that the caller hold a certain lock. -// Readable represents a variable that is initially not set and later -// becomes set. Some instances may be set to multiple values in -// series. A Readable for a variable that can only get one value is -// commonly known as a "future". -type Readable interface { +// WriteOnce represents a variable that is initially not set and can +// be set once and is readable. This is the common meaning for +// "promise". +type WriteOnce interface { // Get reads the current value of this variable. If this variable // is not set yet then this call blocks until this variable gets a // value. @@ -34,12 +33,19 @@ type Readable interface { // IsSet returns immediately with an indication of whether this // variable has been set. IsSet() bool + + // Set normally writes a value into this variable, unblocks every + // goroutine waiting for this variable to have a value, and + // returns true. In the unhappy case that this variable is + // already set, this method returns false without modifying the + // variable's value. + Set(interface{}) bool } -// LockingReadable is a Readable whose implementation is protected by -// a lock -type LockingReadable interface { - Readable +// LockingWriteOnce is a WriteOnce whose implementation is protected +// by a lock. +type LockingWriteOnce interface { + WriteOnce // GetLocked is like Get but the caller must already hold the // lock. GetLocked may release, and later re-acquire, the lock @@ -52,31 +58,6 @@ type LockingReadable interface { // any number of times. IsSet may acquire, and later release, the // lock any number of times. IsSetLocked() bool -} - -// WriteOnceOnly represents a variable that is initially not set and -// can be set once. -type WriteOnceOnly interface { - // Set normally writes a value into this variable, unblocks every - // goroutine waiting for this variable to have a value, and - // returns true. In the unhappy case that this variable is - // already set, this method returns false without modifying the - // variable's value. - Set(interface{}) bool -} - -// WriteOnce represents a variable that is initially not set and can -// be set once and is readable. This is the common meaning for -// "promise". -type WriteOnce interface { - Readable - WriteOnceOnly -} - -// LockingWriteOnceOnly is a WriteOnceOnly whose implementation is -// protected by a lock. -type LockingWriteOnceOnly interface { - WriteOnceOnly // SetLocked is like Set but the caller must already hold the // lock. SetLocked may release, and later re-acquire, the lock @@ -84,46 +65,3 @@ type LockingWriteOnceOnly interface { // lock any number of times SetLocked(interface{}) bool } - -// LockingWriteOnce is a WriteOnce whose implementation is protected -// by a lock. -type LockingWriteOnce interface { - LockingReadable - LockingWriteOnceOnly -} - -// WriteMultipleOnly represents a variable that is initially not set -// and can be set one or more times (unlike a traditional "promise", -// which can be written only once). -type WriteMultipleOnly interface { - // Set writes a value into this variable and unblocks every - // goroutine waiting for this variable to have a value - Set(interface{}) -} - -// WriteMultiple represents a variable that is initially not set and -// can be set one or more times (unlike a traditional "promise", which -// can be written only once) and is readable. -type WriteMultiple interface { - Readable - WriteMultipleOnly -} - -// LockingWriteMultipleOnly is a WriteMultipleOnly whose -// implementation is protected by a lock. -type LockingWriteMultipleOnly interface { - WriteMultipleOnly - - // SetLocked is like Set but the caller must already hold the - // lock. SetLocked may release, and later re-acquire, the lock - // any number of times. Set may acquire, and later release, the - // lock any number of times - SetLocked(interface{}) -} - -// LockingWriteMultiple is a WriteMultiple whose implementation is -// protected by a lock. -type LockingWriteMultiple interface { - LockingReadable - LockingWriteMultipleOnly -} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go index db5598f8989..cb7e41dbd38 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go @@ -23,13 +23,13 @@ import ( "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise" ) -// promisoid is the data and behavior common to all the promise-like -// abstractions implemented here. This implementation is based on a -// condition variable. This implementation tracks active goroutines: +// lockingPromise implementss the promise.LockingWriteOnce interface. +// This implementation is based on a condition variable. +// This implementation tracks active goroutines: // the given counter is decremented for a goroutine waiting for this // varible to be set and incremented when such a goroutine is // unblocked. -type promisoid struct { +type lockingPromise struct { lock sync.Locker cond sync.Cond activeCounter counter.GoRoutineCounter // counter of active goroutines @@ -38,87 +38,58 @@ type promisoid struct { value interface{} } -func (pr *promisoid) Get() interface{} { - pr.lock.Lock() - defer pr.lock.Unlock() - return pr.GetLocked() -} - -func (pr *promisoid) GetLocked() interface{} { - if !pr.isSet { - pr.waitingCount++ - pr.activeCounter.Add(-1) - pr.cond.Wait() - } - return pr.value -} - -func (pr *promisoid) IsSet() bool { - pr.lock.Lock() - defer pr.lock.Unlock() - return pr.IsSetLocked() -} - -func (pr *promisoid) IsSetLocked() bool { - return pr.isSet -} - -func (pr *promisoid) SetLocked(value interface{}) { - pr.isSet = true - pr.value = value - if pr.waitingCount > 0 { - pr.activeCounter.Add(pr.waitingCount) - pr.waitingCount = 0 - pr.cond.Broadcast() - } -} - -type writeOnce struct { - promisoid -} - -var _ promise.LockingWriteOnce = &writeOnce{} +var _ promise.LockingWriteOnce = &lockingPromise{} // NewWriteOnce makes a new promise.LockingWriteOnce func NewWriteOnce(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingWriteOnce { - return &writeOnce{promisoid{ + return &lockingPromise{ lock: lock, cond: *sync.NewCond(lock), activeCounter: activeCounter, - }} + } } -func (wr *writeOnce) Set(value interface{}) bool { - wr.lock.Lock() - defer wr.lock.Unlock() - return wr.SetLocked(value) +func (p *lockingPromise) Get() interface{} { + p.lock.Lock() + defer p.lock.Unlock() + return p.GetLocked() } -func (wr *writeOnce) SetLocked(value interface{}) bool { - if wr.isSet { +func (p *lockingPromise) GetLocked() interface{} { + if !p.isSet { + p.waitingCount++ + p.activeCounter.Add(-1) + p.cond.Wait() + } + return p.value +} + +func (p *lockingPromise) IsSet() bool { + p.lock.Lock() + defer p.lock.Unlock() + return p.IsSetLocked() +} + +func (p *lockingPromise) IsSetLocked() bool { + return p.isSet +} + +func (p *lockingPromise) Set(value interface{}) bool { + p.lock.Lock() + defer p.lock.Unlock() + return p.SetLocked(value) +} + +func (p *lockingPromise) SetLocked(value interface{}) bool { + if p.isSet { return false } - wr.promisoid.SetLocked(value) + p.isSet = true + p.value = value + if p.waitingCount > 0 { + p.activeCounter.Add(p.waitingCount) + p.waitingCount = 0 + p.cond.Broadcast() + } return true } - -type writeMultiple struct { - promisoid -} - -var _ promise.LockingWriteMultiple = &writeMultiple{} - -// NewWriteMultiple makes a new promise.LockingWriteMultiple -func NewWriteMultiple(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingWriteMultiple { - return &writeMultiple{promisoid{ - lock: lock, - cond: *sync.NewCond(lock), - activeCounter: activeCounter, - }} -} - -func (wr *writeMultiple) Set(value interface{}) { - wr.lock.Lock() - defer wr.lock.Unlock() - wr.SetLocked(value) -} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go index abaedda1230..e91c497d585 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise_test.go @@ -25,67 +25,6 @@ import ( "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock" ) -func TestLockingWriteMultiple(t *testing.T) { - now := time.Now() - clock, counter := clock.NewFakeEventClock(now, 0, nil) - var lock sync.Mutex - wr := NewWriteMultiple(&lock, counter) - var gots int32 - var got atomic.Value - counter.Add(1) - go func() { - got.Store(wr.Get()) - atomic.AddInt32(&gots, 1) - counter.Add(-1) - }() - clock.Run(nil) - time.Sleep(time.Second) - if atomic.LoadInt32(&gots) != 0 { - t.Error("Get returned before Set") - } - if wr.IsSet() { - t.Error("IsSet before Set") - } - aval := &now - wr.Set(aval) - clock.Run(nil) - time.Sleep(time.Second) - if atomic.LoadInt32(&gots) != 1 { - t.Error("Get did not return after Set") - } - if got.Load() != aval { - t.Error("Get did not return what was Set") - } - if !wr.IsSet() { - t.Error("IsSet()==false after Set") - } - counter.Add(1) - go func() { - got.Store(wr.Get()) - atomic.AddInt32(&gots, 1) - counter.Add(-1) - }() - clock.Run(nil) - time.Sleep(time.Second) - if atomic.LoadInt32(&gots) != 2 { - t.Error("Second Get did not return immediately") - } - if got.Load() != aval { - t.Error("Second Get did not return what was Set") - } - if !wr.IsSet() { - t.Error("IsSet()==false after second Set") - } - later := time.Now() - bval := &later - wr.Set(bval) - if !wr.IsSet() { - t.Error("IsSet() returned false after second set") - } else if wr.Get() != bval { - t.Error("Get() after second Set returned wrong value") - } -} - func TestLockingWriteOnce(t *testing.T) { now := time.Now() clock, counter := clock.NewFakeEventClock(now, 0, nil)