Simplify APF promise to what is really used in the code

This commit is contained in:
wojtekt 2021-07-23 13:30:34 +02:00
parent a1cf44eab4
commit 9f735e71bb
6 changed files with 74 additions and 91 deletions

View File

@ -16,14 +16,10 @@ limitations under the License.
package promise
// This file defines interfaces for promises and futures and related
// things. These are about coordination among multiple goroutines and
// so are safe for concurrent calls --- although moderated in some
// cases by a requirement that the caller hold a certain lock.
// WriteOnce represents a variable that is initially not set and can
// be set once and is readable. This is the common meaning for
// "promise".
// The implementations of this interface are NOT thread-safe.
type WriteOnce interface {
// Get reads the current value of this variable. If this variable
// is not set yet then this call blocks until this variable gets a
@ -41,27 +37,3 @@ type WriteOnce interface {
// variable's value.
Set(interface{}) bool
}
// LockingWriteOnce is a WriteOnce whose implementation is protected
// by a lock.
type LockingWriteOnce interface {
WriteOnce
// GetLocked is like Get but the caller must already hold the
// lock. GetLocked may release, and later re-acquire, the lock
// any number of times. Get may acquire, and later release, the
// lock any number of times.
GetLocked() interface{}
// IsSetLocked is like IsSet but the caller must already hold the
// lock. IsSetLocked may release, and later re-acquire, the lock
// any number of times. IsSet may acquire, and later release, the
// lock any number of times.
IsSetLocked() bool
// SetLocked is like Set but the caller must already hold the
// lock. SetLocked may release, and later re-acquire, the lock
// any number of times. Set may acquire, and later release, the
// lock any number of times
SetLocked(interface{}) bool
}

View File

@ -14,23 +14,21 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package lockingpromise
package promise
import (
"sync"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
)
// lockingPromise implementss the promise.LockingWriteOnce interface.
// promise implements the promise.WriteOnce interface.
// This implementation is based on a condition variable.
// This implementation tracks active goroutines:
// the given counter is decremented for a goroutine waiting for this
// varible to be set and incremented when such a goroutine is
// unblocked.
type lockingPromise struct {
lock sync.Locker
type promise struct {
cond sync.Cond
activeCounter counter.GoRoutineCounter // counter of active goroutines
waitingCount int // number of goroutines idle due to this being unset
@ -38,24 +36,17 @@ type lockingPromise struct {
value interface{}
}
var _ promise.LockingWriteOnce = &lockingPromise{}
var _ WriteOnce = &promise{}
// NewWriteOnce makes a new promise.LockingWriteOnce
func NewWriteOnce(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingWriteOnce {
return &lockingPromise{
lock: lock,
func NewWriteOnce(lock sync.Locker, activeCounter counter.GoRoutineCounter) WriteOnce {
return &promise{
cond: *sync.NewCond(lock),
activeCounter: activeCounter,
}
}
func (p *lockingPromise) Get() interface{} {
p.lock.Lock()
defer p.lock.Unlock()
return p.GetLocked()
}
func (p *lockingPromise) GetLocked() interface{} {
func (p *promise) Get() interface{} {
if !p.isSet {
p.waitingCount++
p.activeCounter.Add(-1)
@ -64,23 +55,11 @@ func (p *lockingPromise) GetLocked() interface{} {
return p.value
}
func (p *lockingPromise) IsSet() bool {
p.lock.Lock()
defer p.lock.Unlock()
return p.IsSetLocked()
}
func (p *lockingPromise) IsSetLocked() bool {
func (p *promise) IsSet() bool {
return p.isSet
}
func (p *lockingPromise) Set(value interface{}) bool {
p.lock.Lock()
defer p.lock.Unlock()
return p.SetLocked(value)
}
func (p *lockingPromise) SetLocked(value interface{}) bool {
func (p *promise) Set(value interface{}) bool {
if p.isSet {
return false
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package lockingpromise
package promise
import (
"sync"
@ -34,6 +34,9 @@ func TestLockingWriteOnce(t *testing.T) {
var got atomic.Value
counter.Add(1)
go func() {
lock.Lock()
defer lock.Unlock()
got.Store(wr.Get())
atomic.AddInt32(&gots, 1)
counter.Add(-1)
@ -43,13 +46,21 @@ func TestLockingWriteOnce(t *testing.T) {
if atomic.LoadInt32(&gots) != 0 {
t.Error("Get returned before Set")
}
if wr.IsSet() {
t.Error("IsSet before Set")
}
func() {
lock.Lock()
defer lock.Unlock()
if wr.IsSet() {
t.Error("IsSet before Set")
}
}()
aval := &now
if !wr.Set(aval) {
t.Error("Set() returned false")
}
func() {
lock.Lock()
defer lock.Unlock()
if !wr.Set(aval) {
t.Error("Set() returned false")
}
}()
clock.Run(nil)
time.Sleep(time.Second)
if atomic.LoadInt32(&gots) != 1 {
@ -58,11 +69,18 @@ func TestLockingWriteOnce(t *testing.T) {
if got.Load() != aval {
t.Error("Get did not return what was Set")
}
if !wr.IsSet() {
t.Error("IsSet()==false after Set")
}
func() {
lock.Lock()
defer lock.Unlock()
if !wr.IsSet() {
t.Error("IsSet()==false after Set")
}
}()
counter.Add(1)
go func() {
lock.Lock()
defer lock.Unlock()
got.Store(wr.Get())
atomic.AddInt32(&gots, 1)
counter.Add(-1)
@ -75,17 +93,29 @@ func TestLockingWriteOnce(t *testing.T) {
if got.Load() != aval {
t.Error("Second Get did not return what was Set")
}
if !wr.IsSet() {
t.Error("IsSet()==false after second Set")
}
func() {
lock.Lock()
defer lock.Unlock()
if !wr.IsSet() {
t.Error("IsSet()==false after second Get")
}
}()
later := time.Now()
bval := &later
if wr.Set(bval) {
t.Error("second Set() returned true")
}
if !wr.IsSet() {
t.Error("IsSet() returned false after second set")
} else if wr.Get() != aval {
t.Error("Get() after second Set returned wrong value")
}
func() {
lock.Lock()
defer lock.Unlock()
if wr.Set(bval) {
t.Error("second Set() returned true")
}
}()
func() {
lock.Lock()
defer lock.Unlock()
if !wr.IsSet() {
t.Error("IsSet() returned false after second set")
} else if wr.Get() != aval {
t.Error("Get() after second Set returned wrong value")
}
}()
}

View File

@ -28,7 +28,7 @@ import (
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
fqrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/apiserver/pkg/util/shufflesharding"
@ -356,7 +356,7 @@ func (req *request) wait() (bool, bool) {
// Step 4:
// The final step is to wait on a decision from
// somewhere and then act on it.
decisionAny := req.decision.GetLocked()
decisionAny := req.decision.Get()
qs.syncTimeLocked()
decision, isDecision := decisionAny.(requestDecision)
if !isDecision {
@ -453,7 +453,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
fsName: fsName,
flowDistinguisher: flowDistinguisher,
ctx: ctx,
decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter),
decision: promise.NewWriteOnce(&qs.lock, qs.counter),
arrivalTime: qs.clock.Now(),
queue: queue,
descr1: descr1,
@ -503,7 +503,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
waitLimit := now.Add(-qs.qCfg.RequestWaitLimit)
reqs.Walk(func(req *request) bool {
if waitLimit.After(req.arrivalTime) {
req.decision.SetLocked(decisionReject)
req.decision.Set(decisionReject)
timeoutCount++
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
req.NoteQueued(false)
@ -588,13 +588,13 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width *fqreques
flowDistinguisher: flowDistinguisher,
ctx: ctx,
startTime: now,
decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter),
decision: promise.NewWriteOnce(&qs.lock, qs.counter),
arrivalTime: now,
descr1: descr1,
descr2: descr2,
width: *width,
}
req.decision.SetLocked(decisionExecute)
req.decision.Set(decisionExecute)
qs.totRequestsExecuting++
qs.totSeatsInUse += req.Seats()
metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1)
@ -643,7 +643,7 @@ func (qs *queueSet) dispatchLocked() bool {
}
// When a request is dequeued for service -> qs.virtualStart += G
queue.virtualStart += qs.estimatedServiceTime * float64(request.Seats())
request.decision.SetLocked(decisionExecute)
request.decision.Set(decisionExecute)
return ok
}
@ -652,12 +652,12 @@ func (qs *queueSet) dispatchLocked() bool {
func (qs *queueSet) cancelWait(req *request) {
qs.lock.Lock()
defer qs.lock.Unlock()
if req.decision.IsSetLocked() {
if req.decision.IsSet() {
// The request has already been removed from the queue
// and so we consider its wait to be over.
return
}
req.decision.SetLocked(decisionCancel)
req.decision.Set(decisionCancel)
// remove the request from the queue as it has timed out
req.removeFromQueueFn()

View File

@ -52,7 +52,10 @@ type request struct {
// is removed from its queue. The value will be decisionReject,
// decisionCancel, or decisionExecute; decisionTryAnother never
// appears here.
decision promise.LockingWriteOnce
//
// The field is NOT thread-safe and should be protected by the
// queueset's lock.
decision promise.WriteOnce
// arrivalTime is the real time when the request entered this system
arrivalTime time.Time

1
vendor/modules.txt vendored
View File

@ -1549,7 +1549,6 @@ k8s.io/apiserver/pkg/util/flowcontrol/counter
k8s.io/apiserver/pkg/util/flowcontrol/debug
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing
k8s.io/apiserver/pkg/util/flowcontrol/format