Merge pull request #103820 from wojtek-t/pf_remove_counter

Couple code cleanups for APF code
This commit is contained in:
Kubernetes Prow Robot 2021-08-05 01:44:35 -07:00 committed by GitHub
commit e3b01a6d7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 214 additions and 384 deletions

View File

@ -125,8 +125,7 @@ type configController struct {
requestWaitLimit time.Duration
// This must be locked while accessing flowSchemas or
// priorityLevelStates. It is the lock involved in
// LockingWriteMultiple.
// priorityLevelStates.
lock sync.Mutex
// flowSchemas holds the flow schema objects, sorted by increasing

View File

@ -16,16 +16,11 @@ limitations under the License.
package promise
// This file defines interfaces for promises and futures and related
// things. These are about coordination among multiple goroutines and
// so are safe for concurrent calls --- although moderated in some
// cases by a requirement that the caller hold a certain lock.
// Readable represents a variable that is initially not set and later
// becomes set. Some instances may be set to multiple values in
// series. A Readable for a variable that can only get one value is
// commonly known as a "future".
type Readable interface {
// WriteOnce represents a variable that is initially not set and can
// be set once and is readable. This is the common meaning for
// "promise".
// The implementations of this interface are NOT thread-safe.
type WriteOnce interface {
// Get reads the current value of this variable. If this variable
// is not set yet then this call blocks until this variable gets a
// value.
@ -34,29 +29,7 @@ type Readable interface {
// IsSet returns immediately with an indication of whether this
// variable has been set.
IsSet() bool
}
// LockingReadable is a Readable whose implementation is protected by
// a lock
type LockingReadable interface {
Readable
// GetLocked is like Get but the caller must already hold the
// lock. GetLocked may release, and later re-acquire, the lock
// any number of times. Get may acquire, and later release, the
// lock any number of times.
GetLocked() interface{}
// IsSetLocked is like IsSet but the caller must already hold the
// lock. IsSetLocked may release, and later re-acquire, the lock
// any number of times. IsSet may acquire, and later release, the
// lock any number of times.
IsSetLocked() bool
}
// WriteOnceOnly represents a variable that is initially not set and
// can be set once.
type WriteOnceOnly interface {
// Set normally writes a value into this variable, unblocks every
// goroutine waiting for this variable to have a value, and
// returns true. In the unhappy case that this variable is
@ -64,66 +37,3 @@ type WriteOnceOnly interface {
// variable's value.
Set(interface{}) bool
}
// WriteOnce represents a variable that is initially not set and can
// be set once and is readable. This is the common meaning for
// "promise".
type WriteOnce interface {
Readable
WriteOnceOnly
}
// LockingWriteOnceOnly is a WriteOnceOnly whose implementation is
// protected by a lock.
type LockingWriteOnceOnly interface {
WriteOnceOnly
// SetLocked is like Set but the caller must already hold the
// lock. SetLocked may release, and later re-acquire, the lock
// any number of times. Set may acquire, and later release, the
// lock any number of times
SetLocked(interface{}) bool
}
// LockingWriteOnce is a WriteOnce whose implementation is protected
// by a lock.
type LockingWriteOnce interface {
LockingReadable
LockingWriteOnceOnly
}
// WriteMultipleOnly represents a variable that is initially not set
// and can be set one or more times (unlike a traditional "promise",
// which can be written only once).
type WriteMultipleOnly interface {
// Set writes a value into this variable and unblocks every
// goroutine waiting for this variable to have a value
Set(interface{})
}
// WriteMultiple represents a variable that is initially not set and
// can be set one or more times (unlike a traditional "promise", which
// can be written only once) and is readable.
type WriteMultiple interface {
Readable
WriteMultipleOnly
}
// LockingWriteMultipleOnly is a WriteMultipleOnly whose
// implementation is protected by a lock.
type LockingWriteMultipleOnly interface {
WriteMultipleOnly
// SetLocked is like Set but the caller must already hold the
// lock. SetLocked may release, and later re-acquire, the lock
// any number of times. Set may acquire, and later release, the
// lock any number of times
SetLocked(interface{})
}
// LockingWriteMultiple is a WriteMultiple whose implementation is
// protected by a lock.
type LockingWriteMultiple interface {
LockingReadable
LockingWriteMultipleOnly
}

View File

@ -1,124 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package lockingpromise
import (
"sync"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
)
// promisoid is the data and behavior common to all the promise-like
// abstractions implemented here. This implementation is based on a
// condition variable. This implementation tracks active goroutines:
// the given counter is decremented for a goroutine waiting for this
// varible to be set and incremented when such a goroutine is
// unblocked.
type promisoid struct {
lock sync.Locker
cond sync.Cond
activeCounter counter.GoRoutineCounter // counter of active goroutines
waitingCount int // number of goroutines idle due to this being unset
isSet bool
value interface{}
}
func (pr *promisoid) Get() interface{} {
pr.lock.Lock()
defer pr.lock.Unlock()
return pr.GetLocked()
}
func (pr *promisoid) GetLocked() interface{} {
if !pr.isSet {
pr.waitingCount++
pr.activeCounter.Add(-1)
pr.cond.Wait()
}
return pr.value
}
func (pr *promisoid) IsSet() bool {
pr.lock.Lock()
defer pr.lock.Unlock()
return pr.IsSetLocked()
}
func (pr *promisoid) IsSetLocked() bool {
return pr.isSet
}
func (pr *promisoid) SetLocked(value interface{}) {
pr.isSet = true
pr.value = value
if pr.waitingCount > 0 {
pr.activeCounter.Add(pr.waitingCount)
pr.waitingCount = 0
pr.cond.Broadcast()
}
}
type writeOnce struct {
promisoid
}
var _ promise.LockingWriteOnce = &writeOnce{}
// NewWriteOnce makes a new promise.LockingWriteOnce
func NewWriteOnce(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingWriteOnce {
return &writeOnce{promisoid{
lock: lock,
cond: *sync.NewCond(lock),
activeCounter: activeCounter,
}}
}
func (wr *writeOnce) Set(value interface{}) bool {
wr.lock.Lock()
defer wr.lock.Unlock()
return wr.SetLocked(value)
}
func (wr *writeOnce) SetLocked(value interface{}) bool {
if wr.isSet {
return false
}
wr.promisoid.SetLocked(value)
return true
}
type writeMultiple struct {
promisoid
}
var _ promise.LockingWriteMultiple = &writeMultiple{}
// NewWriteMultiple makes a new promise.LockingWriteMultiple
func NewWriteMultiple(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingWriteMultiple {
return &writeMultiple{promisoid{
lock: lock,
cond: *sync.NewCond(lock),
activeCounter: activeCounter,
}}
}
func (wr *writeMultiple) Set(value interface{}) {
wr.lock.Lock()
defer wr.lock.Unlock()
wr.SetLocked(value)
}

View File

@ -1,152 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package lockingpromise
import (
"sync"
"sync/atomic"
"testing"
"time"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
)
func TestLockingWriteMultiple(t *testing.T) {
now := time.Now()
clock, counter := clock.NewFakeEventClock(now, 0, nil)
var lock sync.Mutex
wr := NewWriteMultiple(&lock, counter)
var gots int32
var got atomic.Value
counter.Add(1)
go func() {
got.Store(wr.Get())
atomic.AddInt32(&gots, 1)
counter.Add(-1)
}()
clock.Run(nil)
time.Sleep(time.Second)
if atomic.LoadInt32(&gots) != 0 {
t.Error("Get returned before Set")
}
if wr.IsSet() {
t.Error("IsSet before Set")
}
aval := &now
wr.Set(aval)
clock.Run(nil)
time.Sleep(time.Second)
if atomic.LoadInt32(&gots) != 1 {
t.Error("Get did not return after Set")
}
if got.Load() != aval {
t.Error("Get did not return what was Set")
}
if !wr.IsSet() {
t.Error("IsSet()==false after Set")
}
counter.Add(1)
go func() {
got.Store(wr.Get())
atomic.AddInt32(&gots, 1)
counter.Add(-1)
}()
clock.Run(nil)
time.Sleep(time.Second)
if atomic.LoadInt32(&gots) != 2 {
t.Error("Second Get did not return immediately")
}
if got.Load() != aval {
t.Error("Second Get did not return what was Set")
}
if !wr.IsSet() {
t.Error("IsSet()==false after second Set")
}
later := time.Now()
bval := &later
wr.Set(bval)
if !wr.IsSet() {
t.Error("IsSet() returned false after second set")
} else if wr.Get() != bval {
t.Error("Get() after second Set returned wrong value")
}
}
func TestLockingWriteOnce(t *testing.T) {
now := time.Now()
clock, counter := clock.NewFakeEventClock(now, 0, nil)
var lock sync.Mutex
wr := NewWriteOnce(&lock, counter)
var gots int32
var got atomic.Value
counter.Add(1)
go func() {
got.Store(wr.Get())
atomic.AddInt32(&gots, 1)
counter.Add(-1)
}()
clock.Run(nil)
time.Sleep(time.Second)
if atomic.LoadInt32(&gots) != 0 {
t.Error("Get returned before Set")
}
if wr.IsSet() {
t.Error("IsSet before Set")
}
aval := &now
if !wr.Set(aval) {
t.Error("Set() returned false")
}
clock.Run(nil)
time.Sleep(time.Second)
if atomic.LoadInt32(&gots) != 1 {
t.Error("Get did not return after Set")
}
if got.Load() != aval {
t.Error("Get did not return what was Set")
}
if !wr.IsSet() {
t.Error("IsSet()==false after Set")
}
counter.Add(1)
go func() {
got.Store(wr.Get())
atomic.AddInt32(&gots, 1)
counter.Add(-1)
}()
clock.Run(nil)
time.Sleep(time.Second)
if atomic.LoadInt32(&gots) != 2 {
t.Error("Second Get did not return immediately")
}
if got.Load() != aval {
t.Error("Second Get did not return what was Set")
}
if !wr.IsSet() {
t.Error("IsSet()==false after second Set")
}
later := time.Now()
bval := &later
if wr.Set(bval) {
t.Error("second Set() returned true")
}
if !wr.IsSet() {
t.Error("IsSet() returned false after second set")
} else if wr.Get() != aval {
t.Error("Get() after second Set returned wrong value")
}
}

View File

@ -0,0 +1,74 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package promise
import (
"sync"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
)
// promise implements the promise.WriteOnce interface.
// This implementation is based on a condition variable.
// This implementation tracks active goroutines:
// the given counter is decremented for a goroutine waiting for this
// varible to be set and incremented when such a goroutine is
// unblocked.
type promise struct {
cond sync.Cond
activeCounter counter.GoRoutineCounter // counter of active goroutines
waitingCount int // number of goroutines idle due to this being unset
isSet bool
value interface{}
}
var _ WriteOnce = &promise{}
// NewWriteOnce makes a new promise.LockingWriteOnce
func NewWriteOnce(lock sync.Locker, activeCounter counter.GoRoutineCounter) WriteOnce {
return &promise{
cond: *sync.NewCond(lock),
activeCounter: activeCounter,
}
}
func (p *promise) Get() interface{} {
if !p.isSet {
p.waitingCount++
p.activeCounter.Add(-1)
p.cond.Wait()
}
return p.value
}
func (p *promise) IsSet() bool {
return p.isSet
}
func (p *promise) Set(value interface{}) bool {
if p.isSet {
return false
}
p.isSet = true
p.value = value
if p.waitingCount > 0 {
p.activeCounter.Add(p.waitingCount)
p.waitingCount = 0
p.cond.Broadcast()
}
return true
}

View File

@ -0,0 +1,121 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package promise
import (
"sync"
"sync/atomic"
"testing"
"time"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
)
func TestLockingWriteOnce(t *testing.T) {
now := time.Now()
clock, counter := clock.NewFakeEventClock(now, 0, nil)
var lock sync.Mutex
wr := NewWriteOnce(&lock, counter)
var gots int32
var got atomic.Value
counter.Add(1)
go func() {
lock.Lock()
defer lock.Unlock()
got.Store(wr.Get())
atomic.AddInt32(&gots, 1)
counter.Add(-1)
}()
clock.Run(nil)
time.Sleep(time.Second)
if atomic.LoadInt32(&gots) != 0 {
t.Error("Get returned before Set")
}
func() {
lock.Lock()
defer lock.Unlock()
if wr.IsSet() {
t.Error("IsSet before Set")
}
}()
aval := &now
func() {
lock.Lock()
defer lock.Unlock()
if !wr.Set(aval) {
t.Error("Set() returned false")
}
}()
clock.Run(nil)
time.Sleep(time.Second)
if atomic.LoadInt32(&gots) != 1 {
t.Error("Get did not return after Set")
}
if got.Load() != aval {
t.Error("Get did not return what was Set")
}
func() {
lock.Lock()
defer lock.Unlock()
if !wr.IsSet() {
t.Error("IsSet()==false after Set")
}
}()
counter.Add(1)
go func() {
lock.Lock()
defer lock.Unlock()
got.Store(wr.Get())
atomic.AddInt32(&gots, 1)
counter.Add(-1)
}()
clock.Run(nil)
time.Sleep(time.Second)
if atomic.LoadInt32(&gots) != 2 {
t.Error("Second Get did not return immediately")
}
if got.Load() != aval {
t.Error("Second Get did not return what was Set")
}
func() {
lock.Lock()
defer lock.Unlock()
if !wr.IsSet() {
t.Error("IsSet()==false after second Get")
}
}()
later := time.Now()
bval := &later
func() {
lock.Lock()
defer lock.Unlock()
if wr.Set(bval) {
t.Error("second Set() returned true")
}
}()
func() {
lock.Lock()
defer lock.Unlock()
if !wr.IsSet() {
t.Error("IsSet() returned false after second set")
} else if wr.Get() != aval {
t.Error("Get() after second Set returned wrong value")
}
}()
}

View File

@ -28,7 +28,7 @@ import (
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
fqrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/apiserver/pkg/util/shufflesharding"
@ -356,7 +356,7 @@ func (req *request) wait() (bool, bool) {
// Step 4:
// The final step is to wait on a decision from
// somewhere and then act on it.
decisionAny := req.decision.GetLocked()
decisionAny := req.decision.Get()
qs.syncTimeLocked()
decision, isDecision := decisionAny.(requestDecision)
if !isDecision {
@ -453,7 +453,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
fsName: fsName,
flowDistinguisher: flowDistinguisher,
ctx: ctx,
decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter),
decision: promise.NewWriteOnce(&qs.lock, qs.counter),
arrivalTime: qs.clock.Now(),
queue: queue,
descr1: descr1,
@ -503,7 +503,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
waitLimit := now.Add(-qs.qCfg.RequestWaitLimit)
reqs.Walk(func(req *request) bool {
if waitLimit.After(req.arrivalTime) {
req.decision.SetLocked(decisionReject)
req.decision.Set(decisionReject)
timeoutCount++
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
req.NoteQueued(false)
@ -588,13 +588,13 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
flowDistinguisher: flowDistinguisher,
ctx: ctx,
startTime: now,
decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter),
decision: promise.NewWriteOnce(&qs.lock, qs.counter),
arrivalTime: now,
descr1: descr1,
descr2: descr2,
workEstimate: *workEstimate,
}
req.decision.SetLocked(decisionExecute)
req.decision.Set(decisionExecute)
qs.totRequestsExecuting++
qs.totSeatsInUse += req.Seats()
metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1)
@ -643,7 +643,7 @@ func (qs *queueSet) dispatchLocked() bool {
}
// When a request is dequeued for service -> qs.virtualStart += G
queue.virtualStart += qs.estimatedServiceTime * float64(request.Seats())
request.decision.SetLocked(decisionExecute)
request.decision.Set(decisionExecute)
return ok
}
@ -652,12 +652,12 @@ func (qs *queueSet) dispatchLocked() bool {
func (qs *queueSet) cancelWait(req *request) {
qs.lock.Lock()
defer qs.lock.Unlock()
if req.decision.IsSetLocked() {
if req.decision.IsSet() {
// The request has already been removed from the queue
// and so we consider its wait to be over.
return
}
req.decision.SetLocked(decisionCancel)
req.decision.Set(decisionCancel)
// remove the request from the queue as it has timed out
req.removeFromQueueFn()

View File

@ -52,7 +52,10 @@ type request struct {
// is removed from its queue. The value will be decisionReject,
// decisionCancel, or decisionExecute; decisionTryAnother never
// appears here.
decision promise.LockingWriteOnce
//
// The field is NOT thread-safe and should be protected by the
// queueset's lock.
decision promise.WriteOnce
// arrivalTime is the real time when the request entered this system
arrivalTime time.Time

1
vendor/modules.txt vendored
View File

@ -1549,7 +1549,6 @@ k8s.io/apiserver/pkg/util/flowcontrol/counter
k8s.io/apiserver/pkg/util/flowcontrol/debug
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing
k8s.io/apiserver/pkg/util/flowcontrol/format