mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 01:40:07 +00:00
Merge pull request #120162 from tkashem/apf-promise-refactor
apf: refactor promise to use a context
This commit is contained in:
commit
714e77595c
@ -17,12 +17,13 @@ limitations under the License.
|
|||||||
package promise
|
package promise
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
// promise implements the WriteOnce interface.
|
// promise implements the WriteOnce interface.
|
||||||
type promise struct {
|
type promise struct {
|
||||||
doneCh <-chan struct{}
|
doneCtx context.Context
|
||||||
doneVal interface{}
|
doneVal interface{}
|
||||||
setCh chan struct{}
|
setCh chan struct{}
|
||||||
onceler sync.Once
|
onceler sync.Once
|
||||||
@ -35,12 +36,12 @@ var _ WriteOnce = &promise{}
|
|||||||
//
|
//
|
||||||
// If `initial` is non-nil then that value is Set at creation time.
|
// If `initial` is non-nil then that value is Set at creation time.
|
||||||
//
|
//
|
||||||
// If a `Get` is waiting soon after `doneCh` becomes selectable (which
|
// If a `Get` is waiting soon after the channel associated with the
|
||||||
// never happens for the nil channel) then `Set(doneVal)` effectively
|
// `doneCtx` becomes selectable (which never happens for the nil
|
||||||
// happens at that time.
|
// channel) then `Set(doneVal)` effectively happens at that time.
|
||||||
func NewWriteOnce(initial interface{}, doneCh <-chan struct{}, doneVal interface{}) WriteOnce {
|
func NewWriteOnce(initial interface{}, doneCtx context.Context, doneVal interface{}) WriteOnce {
|
||||||
p := &promise{
|
p := &promise{
|
||||||
doneCh: doneCh,
|
doneCtx: doneCtx,
|
||||||
doneVal: doneVal,
|
doneVal: doneVal,
|
||||||
setCh: make(chan struct{}),
|
setCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
@ -53,7 +54,7 @@ func NewWriteOnce(initial interface{}, doneCh <-chan struct{}, doneVal interface
|
|||||||
func (p *promise) Get() interface{} {
|
func (p *promise) Get() interface{} {
|
||||||
select {
|
select {
|
||||||
case <-p.setCh:
|
case <-p.setCh:
|
||||||
case <-p.doneCh:
|
case <-p.doneCtx.Done():
|
||||||
p.Set(p.doneVal)
|
p.Set(p.doneVal)
|
||||||
}
|
}
|
||||||
return p.value
|
return p.value
|
||||||
|
@ -28,7 +28,7 @@ func TestWriteOnceSet(t *testing.T) {
|
|||||||
oldTime := time.Now()
|
oldTime := time.Now()
|
||||||
cval := &oldTime
|
cval := &oldTime
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
wr := NewWriteOnce(nil, ctx.Done(), cval)
|
wr := NewWriteOnce(nil, ctx, cval)
|
||||||
gots := make(chan interface{})
|
gots := make(chan interface{})
|
||||||
goGetExpectNotYet(t, wr, gots, "Set")
|
goGetExpectNotYet(t, wr, gots, "Set")
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
@ -53,7 +53,7 @@ func TestWriteOnceCancel(t *testing.T) {
|
|||||||
oldTime := time.Now()
|
oldTime := time.Now()
|
||||||
cval := &oldTime
|
cval := &oldTime
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
wr := NewWriteOnce(nil, ctx.Done(), cval)
|
wr := NewWriteOnce(nil, ctx, cval)
|
||||||
gots := make(chan interface{})
|
gots := make(chan interface{})
|
||||||
goGetExpectNotYet(t, wr, gots, "cancel")
|
goGetExpectNotYet(t, wr, gots, "cancel")
|
||||||
cancel()
|
cancel()
|
||||||
@ -73,7 +73,7 @@ func TestWriteOnceInitial(t *testing.T) {
|
|||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
aval := &now
|
aval := &now
|
||||||
wr := NewWriteOnce(aval, ctx.Done(), cval)
|
wr := NewWriteOnce(aval, ctx, cval)
|
||||||
gots := make(chan interface{})
|
gots := make(chan interface{})
|
||||||
goGetAndExpect(t, wr, gots, aval)
|
goGetAndExpect(t, wr, gots, aval)
|
||||||
later := time.Now()
|
later := time.Now()
|
||||||
|
@ -53,7 +53,7 @@ type queueSetFactory struct {
|
|||||||
// - whose Set method is invoked with the queueSet locked, and
|
// - whose Set method is invoked with the queueSet locked, and
|
||||||
// - whose Get method is invoked with the queueSet not locked.
|
// - whose Get method is invoked with the queueSet not locked.
|
||||||
// The parameters are the same as for `promise.NewWriteOnce`.
|
// The parameters are the same as for `promise.NewWriteOnce`.
|
||||||
type promiseFactory func(initial interface{}, doneCh <-chan struct{}, doneVal interface{}) promise.WriteOnce
|
type promiseFactory func(initial interface{}, doneCtx context.Context, doneVal interface{}) promise.WriteOnce
|
||||||
|
|
||||||
// promiseFactoryFactory returns the promiseFactory to use for the given queueSet
|
// promiseFactoryFactory returns the promiseFactory to use for the given queueSet
|
||||||
type promiseFactoryFactory func(*queueSet) promiseFactory
|
type promiseFactoryFactory func(*queueSet) promiseFactory
|
||||||
@ -584,7 +584,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
|
|||||||
fsName: fsName,
|
fsName: fsName,
|
||||||
flowDistinguisher: flowDistinguisher,
|
flowDistinguisher: flowDistinguisher,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
decision: qs.promiseFactory(nil, ctx.Done(), decisionCancel),
|
decision: qs.promiseFactory(nil, ctx, decisionCancel),
|
||||||
arrivalTime: qs.clock.Now(),
|
arrivalTime: qs.clock.Now(),
|
||||||
arrivalR: qs.currentR,
|
arrivalR: qs.currentR,
|
||||||
queue: queue,
|
queue: queue,
|
||||||
@ -725,7 +725,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
|
|||||||
flowDistinguisher: flowDistinguisher,
|
flowDistinguisher: flowDistinguisher,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
startTime: now,
|
startTime: now,
|
||||||
decision: qs.promiseFactory(decisionExecute, ctx.Done(), decisionCancel),
|
decision: qs.promiseFactory(decisionExecute, ctx, decisionCancel),
|
||||||
arrivalTime: now,
|
arrivalTime: now,
|
||||||
arrivalR: qs.currentR,
|
arrivalR: qs.currentR,
|
||||||
descr1: descr1,
|
descr1: descr1,
|
||||||
|
@ -1223,8 +1223,8 @@ func TestContextCancel(t *testing.T) {
|
|||||||
|
|
||||||
func countingPromiseFactoryFactory(activeCounter counter.GoRoutineCounter) promiseFactoryFactory {
|
func countingPromiseFactoryFactory(activeCounter counter.GoRoutineCounter) promiseFactoryFactory {
|
||||||
return func(qs *queueSet) promiseFactory {
|
return func(qs *queueSet) promiseFactory {
|
||||||
return func(initial interface{}, doneCh <-chan struct{}, doneVal interface{}) promise.WriteOnce {
|
return func(initial interface{}, doneCtx context.Context, doneVal interface{}) promise.WriteOnce {
|
||||||
return testpromise.NewCountingWriteOnce(activeCounter, &qs.lock, initial, doneCh, doneVal)
|
return testpromise.NewCountingWriteOnce(activeCounter, &qs.lock, initial, doneCtx.Done(), doneVal)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user