mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
apf: add additional latency into width
This commit is contained in:
parent
96dff7d0c7
commit
24e1922910
@ -44,6 +44,7 @@ import (
|
|||||||
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
||||||
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||||
fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||||
|
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
@ -69,7 +70,7 @@ const (
|
|||||||
decisionSkipFilter
|
decisionSkipFilter
|
||||||
)
|
)
|
||||||
|
|
||||||
var defaultRequestWidthEstimator = func(*http.Request) uint { return 1 }
|
var defaultRequestWidthEstimator = func(*http.Request) fcrequest.Width { return fcrequest.Width{Seats: 1} }
|
||||||
|
|
||||||
type fakeApfFilter struct {
|
type fakeApfFilter struct {
|
||||||
mockDecision mockDecision
|
mockDecision mockDecision
|
||||||
@ -586,13 +587,15 @@ func TestApfWithRequestDigest(t *testing.T) {
|
|||||||
reqDigestExpected := &utilflowcontrol.RequestDigest{
|
reqDigestExpected := &utilflowcontrol.RequestDigest{
|
||||||
RequestInfo: &apirequest.RequestInfo{Verb: "get"},
|
RequestInfo: &apirequest.RequestInfo{Verb: "get"},
|
||||||
User: &user.DefaultInfo{Name: "foo"},
|
User: &user.DefaultInfo{Name: "foo"},
|
||||||
Width: 5,
|
Width: fcrequest.Width{
|
||||||
|
Seats: 5,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
handler := WithPriorityAndFairness(http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) {}),
|
handler := WithPriorityAndFairness(http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) {}),
|
||||||
longRunningFunc,
|
longRunningFunc,
|
||||||
fakeFilter,
|
fakeFilter,
|
||||||
func(_ *http.Request) uint { return reqDigestExpected.Width },
|
func(_ *http.Request) fcrequest.Width { return reqDigestExpected.Width },
|
||||||
)
|
)
|
||||||
|
|
||||||
w := httptest.NewRecorder()
|
w := httptest.NewRecorder()
|
||||||
|
@ -46,6 +46,7 @@ import (
|
|||||||
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||||
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
|
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
|
||||||
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||||
|
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/client-go/util/workqueue"
|
"k8s.io/client-go/util/workqueue"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
@ -81,7 +82,7 @@ type StartFunction func(ctx context.Context, hashValue uint64) (execute bool, af
|
|||||||
type RequestDigest struct {
|
type RequestDigest struct {
|
||||||
RequestInfo *request.RequestInfo
|
RequestInfo *request.RequestInfo
|
||||||
User user.Info
|
User user.Info
|
||||||
Width uint
|
Width fcrequest.Width
|
||||||
}
|
}
|
||||||
|
|
||||||
// `*configController` maintains eventual consistency with the API
|
// `*configController` maintains eventual consistency with the API
|
||||||
@ -804,7 +805,7 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
|
|||||||
}
|
}
|
||||||
startWaitingTime = time.Now()
|
startWaitingTime = time.Now()
|
||||||
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
|
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
|
||||||
req, idle := plState.queues.StartRequest(ctx, rd.Width, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
|
req, idle := plState.queues.StartRequest(ctx, &rd.Width, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
|
||||||
if idle {
|
if idle {
|
||||||
cfgCtlr.maybeReapLocked(plName, plState)
|
cfgCtlr.maybeReapLocked(plName, plState)
|
||||||
}
|
}
|
||||||
|
@ -36,6 +36,7 @@ import (
|
|||||||
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||||
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
|
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
|
||||||
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||||
|
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
clientsetfake "k8s.io/client-go/kubernetes/fake"
|
clientsetfake "k8s.io/client-go/kubernetes/fake"
|
||||||
fcclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta1"
|
fcclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta1"
|
||||||
@ -139,7 +140,7 @@ func (cqs *ctlrTestQueueSet) IsIdle() bool {
|
|||||||
return cqs.countActive == 0
|
return cqs.countActive == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cqs *ctlrTestQueueSet) StartRequest(ctx context.Context, width uint, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (req fq.Request, idle bool) {
|
func (cqs *ctlrTestQueueSet) StartRequest(ctx context.Context, width *fcrequest.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (req fq.Request, idle bool) {
|
||||||
cqs.cts.lock.Lock()
|
cqs.cts.lock.Lock()
|
||||||
defer cqs.cts.lock.Unlock()
|
defer cqs.cts.lock.Unlock()
|
||||||
cqs.countActive++
|
cqs.countActive++
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
|
|
||||||
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
||||||
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||||
|
"k8s.io/apiserver/pkg/util/flowcontrol/request"
|
||||||
)
|
)
|
||||||
|
|
||||||
// QueueSetFactory is used to create QueueSet objects. Creation, like
|
// QueueSetFactory is used to create QueueSet objects. Creation, like
|
||||||
@ -80,7 +81,7 @@ type QueueSet interface {
|
|||||||
// was idle at the moment of the return. Otherwise idle==false
|
// was idle at the moment of the return. Otherwise idle==false
|
||||||
// and the client must call the Finish method of the Request
|
// and the client must call the Finish method of the Request
|
||||||
// exactly once.
|
// exactly once.
|
||||||
StartRequest(ctx context.Context, width uint, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool)
|
StartRequest(ctx context.Context, width *request.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool)
|
||||||
|
|
||||||
// UpdateObservations makes sure any time-based statistics have
|
// UpdateObservations makes sure any time-based statistics have
|
||||||
// caught up with the current clock reading
|
// caught up with the current clock reading
|
||||||
|
@ -30,6 +30,7 @@ import (
|
|||||||
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||||
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise"
|
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise"
|
||||||
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||||
|
fqrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
|
||||||
"k8s.io/apiserver/pkg/util/shufflesharding"
|
"k8s.io/apiserver/pkg/util/shufflesharding"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
)
|
)
|
||||||
@ -234,7 +235,7 @@ const (
|
|||||||
// executing at each point where there is a change in that quantity,
|
// executing at each point where there is a change in that quantity,
|
||||||
// because the metrics --- and only the metrics --- track that
|
// because the metrics --- and only the metrics --- track that
|
||||||
// quantity per FlowSchema.
|
// quantity per FlowSchema.
|
||||||
func (qs *queueSet) StartRequest(ctx context.Context, width uint, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
|
func (qs *queueSet) StartRequest(ctx context.Context, width *fqrequest.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
|
||||||
qs.lockAndSyncTime()
|
qs.lockAndSyncTime()
|
||||||
defer qs.lock.Unlock()
|
defer qs.lock.Unlock()
|
||||||
var req *request
|
var req *request
|
||||||
@ -243,7 +244,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, width uint, hashValue uint
|
|||||||
// Step 0:
|
// Step 0:
|
||||||
// Apply only concurrency limit, if zero queues desired
|
// Apply only concurrency limit, if zero queues desired
|
||||||
if qs.qCfg.DesiredNumQueues < 1 {
|
if qs.qCfg.DesiredNumQueues < 1 {
|
||||||
if !qs.canAccommodateSeatsLocked(int(width)) {
|
if !qs.canAccommodateSeatsLocked(int(width.Seats)) {
|
||||||
klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d seats are asked for, %d seats are in use (%d are executing) and the limit is %d",
|
klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d seats are asked for, %d seats are in use (%d are executing) and the limit is %d",
|
||||||
qs.qCfg.Name, fsName, descr1, descr2, width, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
|
qs.qCfg.Name, fsName, descr1, descr2, width, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
|
||||||
metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit")
|
metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit")
|
||||||
@ -315,7 +316,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, width uint, hashValue uint
|
|||||||
|
|
||||||
// Seats returns the number of seats this request requires.
|
// Seats returns the number of seats this request requires.
|
||||||
func (req *request) Seats() int {
|
func (req *request) Seats() int {
|
||||||
return int(req.width)
|
return int(req.width.Seats)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (req *request) NoteQueued(inQueue bool) {
|
func (req *request) NoteQueued(inQueue bool) {
|
||||||
@ -436,7 +437,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
|
|||||||
// returns the enqueud request on a successful enqueue
|
// returns the enqueud request on a successful enqueue
|
||||||
// returns nil in the case that there is no available concurrency or
|
// returns nil in the case that there is no available concurrency or
|
||||||
// the queuelengthlimit has been reached
|
// the queuelengthlimit has been reached
|
||||||
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, width uint, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
|
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, width *fqrequest.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
|
||||||
// Start with the shuffle sharding, to pick a queue.
|
// Start with the shuffle sharding, to pick a queue.
|
||||||
queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2)
|
queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2)
|
||||||
queue := qs.queues[queueIdx]
|
queue := qs.queues[queueIdx]
|
||||||
@ -458,7 +459,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
|
|||||||
descr1: descr1,
|
descr1: descr1,
|
||||||
descr2: descr2,
|
descr2: descr2,
|
||||||
queueNoteFn: queueNoteFn,
|
queueNoteFn: queueNoteFn,
|
||||||
width: width,
|
width: *width,
|
||||||
}
|
}
|
||||||
if ok := qs.rejectOrEnqueueLocked(req); !ok {
|
if ok := qs.rejectOrEnqueueLocked(req); !ok {
|
||||||
return nil
|
return nil
|
||||||
@ -574,7 +575,7 @@ func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width uint, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request {
|
func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width *fqrequest.Width, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request {
|
||||||
// does not call metrics.SetDispatchMetrics because there is no queuing and thus no interesting virtual world
|
// does not call metrics.SetDispatchMetrics because there is no queuing and thus no interesting virtual world
|
||||||
now := qs.clock.Now()
|
now := qs.clock.Now()
|
||||||
req := &request{
|
req := &request{
|
||||||
@ -587,7 +588,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width uint, flo
|
|||||||
arrivalTime: now,
|
arrivalTime: now,
|
||||||
descr1: descr1,
|
descr1: descr1,
|
||||||
descr2: descr2,
|
descr2: descr2,
|
||||||
width: width,
|
width: *width,
|
||||||
}
|
}
|
||||||
req.decision.SetLocked(decisionExecute)
|
req.decision.SetLocked(decisionExecute)
|
||||||
qs.totRequestsExecuting++
|
qs.totRequestsExecuting++
|
||||||
|
@ -33,6 +33,7 @@ import (
|
|||||||
test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
|
test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
|
||||||
testclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
|
testclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
|
||||||
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||||
|
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -226,7 +227,7 @@ func (ust *uniformScenarioThread) callK(k int) {
|
|||||||
if k >= ust.nCalls {
|
if k >= ust.nCalls {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
req, idle := ust.uss.qs.StartRequest(context.Background(), 1, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil)
|
req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.Width{Seats: 1}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil)
|
||||||
ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle)
|
ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle)
|
||||||
if req == nil {
|
if req == nil {
|
||||||
atomic.AddUint64(&ust.uss.failedCount, 1)
|
atomic.AddUint64(&ust.uss.failedCount, 1)
|
||||||
@ -671,7 +672,7 @@ func TestContextCancel(t *testing.T) {
|
|||||||
ctx1 := context.Background()
|
ctx1 := context.Background()
|
||||||
b2i := map[bool]int{false: 0, true: 1}
|
b2i := map[bool]int{false: 0, true: 1}
|
||||||
var qnc [2][2]int32
|
var qnc [2][2]int32
|
||||||
req1, _ := qs.StartRequest(ctx1, 1, 1, "", "fs1", "test", "one", func(inQueue bool) { atomic.AddInt32(&qnc[0][b2i[inQueue]], 1) })
|
req1, _ := qs.StartRequest(ctx1, &fcrequest.Width{Seats: 1}, 1, "", "fs1", "test", "one", func(inQueue bool) { atomic.AddInt32(&qnc[0][b2i[inQueue]], 1) })
|
||||||
if req1 == nil {
|
if req1 == nil {
|
||||||
t.Error("Request rejected")
|
t.Error("Request rejected")
|
||||||
return
|
return
|
||||||
@ -699,7 +700,7 @@ func TestContextCancel(t *testing.T) {
|
|||||||
counter.Add(1)
|
counter.Add(1)
|
||||||
cancel2()
|
cancel2()
|
||||||
}()
|
}()
|
||||||
req2, idle2a := qs.StartRequest(ctx2, 1, 2, "", "fs2", "test", "two", func(inQueue bool) { atomic.AddInt32(&qnc[1][b2i[inQueue]], 1) })
|
req2, idle2a := qs.StartRequest(ctx2, &fcrequest.Width{Seats: 1}, 2, "", "fs2", "test", "two", func(inQueue bool) { atomic.AddInt32(&qnc[1][b2i[inQueue]], 1) })
|
||||||
if idle2a {
|
if idle2a {
|
||||||
t.Error("2nd StartRequest returned idle")
|
t.Error("2nd StartRequest returned idle")
|
||||||
}
|
}
|
||||||
@ -758,7 +759,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
req, _ := qs.StartRequest(ctx, 1, 1, "", "fs", "test", "one", func(inQueue bool) {})
|
req, _ := qs.StartRequest(ctx, &fcrequest.Width{Seats: 1}, 1, "", "fs", "test", "one", func(inQueue bool) {})
|
||||||
if req == nil {
|
if req == nil {
|
||||||
t.Fatal("expected a Request object from StartRequest, but got nil")
|
t.Fatal("expected a Request object from StartRequest, but got nil")
|
||||||
}
|
}
|
||||||
@ -811,13 +812,13 @@ func TestSelectQueueLocked(t *testing.T) {
|
|||||||
{
|
{
|
||||||
virtualStart: 200,
|
virtualStart: 200,
|
||||||
requests: newFIFO(
|
requests: newFIFO(
|
||||||
&request{width: 1},
|
&request{width: fcrequest.Width{Seats: 1}},
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
virtualStart: 100,
|
virtualStart: 100,
|
||||||
requests: newFIFO(
|
requests: newFIFO(
|
||||||
&request{width: 1},
|
&request{width: fcrequest.Width{Seats: 1}},
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -834,7 +835,7 @@ func TestSelectQueueLocked(t *testing.T) {
|
|||||||
{
|
{
|
||||||
virtualStart: 200,
|
virtualStart: 200,
|
||||||
requests: newFIFO(
|
requests: newFIFO(
|
||||||
&request{width: 1},
|
&request{width: fcrequest.Width{Seats: 1}},
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -851,13 +852,13 @@ func TestSelectQueueLocked(t *testing.T) {
|
|||||||
{
|
{
|
||||||
virtualStart: 200,
|
virtualStart: 200,
|
||||||
requests: newFIFO(
|
requests: newFIFO(
|
||||||
&request{width: 50},
|
&request{width: fcrequest.Width{Seats: 50}},
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
virtualStart: 100,
|
virtualStart: 100,
|
||||||
requests: newFIFO(
|
requests: newFIFO(
|
||||||
&request{width: 25},
|
&request{width: fcrequest.Width{Seats: 25}},
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -874,13 +875,13 @@ func TestSelectQueueLocked(t *testing.T) {
|
|||||||
{
|
{
|
||||||
virtualStart: 200,
|
virtualStart: 200,
|
||||||
requests: newFIFO(
|
requests: newFIFO(
|
||||||
&request{width: 10},
|
&request{width: fcrequest.Width{Seats: 10}},
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
virtualStart: 100,
|
virtualStart: 100,
|
||||||
requests: newFIFO(
|
requests: newFIFO(
|
||||||
&request{width: 25},
|
&request{width: fcrequest.Width{Seats: 25}},
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -897,13 +898,13 @@ func TestSelectQueueLocked(t *testing.T) {
|
|||||||
{
|
{
|
||||||
virtualStart: 200,
|
virtualStart: 200,
|
||||||
requests: newFIFO(
|
requests: newFIFO(
|
||||||
&request{width: 10},
|
&request{width: fcrequest.Width{Seats: 10}},
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
virtualStart: 100,
|
virtualStart: 100,
|
||||||
requests: newFIFO(
|
requests: newFIFO(
|
||||||
&request{width: 25},
|
&request{width: fcrequest.Width{Seats: 25}},
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
||||||
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||||
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
|
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
|
||||||
|
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
|
||||||
)
|
)
|
||||||
|
|
||||||
// request is a temporary container for "requests" with additional
|
// request is a temporary container for "requests" with additional
|
||||||
@ -44,7 +45,7 @@ type request struct {
|
|||||||
startTime time.Time
|
startTime time.Time
|
||||||
|
|
||||||
// width of the request
|
// width of the request
|
||||||
width uint
|
width fcrequest.Width
|
||||||
|
|
||||||
// decision gets set to a `requestDecision` indicating what to do
|
// decision gets set to a `requestDecision` indicating what to do
|
||||||
// with this request. It gets set exactly once, when the request
|
// with this request. It gets set exactly once, when the request
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
||||||
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||||
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||||
|
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewNoRestraintFactory makes a QueueSetFactory that produces
|
// NewNoRestraintFactory makes a QueueSetFactory that produces
|
||||||
@ -55,7 +56,7 @@ func (noRestraint) IsIdle() bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (noRestraint) StartRequest(ctx context.Context, width uint, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
|
func (noRestraint) StartRequest(ctx context.Context, width *fcrequest.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
|
||||||
return noRestraintRequest{}, false
|
return noRestraintRequest{}, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,6 +27,7 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/authentication/user"
|
"k8s.io/apiserver/pkg/authentication/user"
|
||||||
"k8s.io/apiserver/pkg/endpoints/request"
|
"k8s.io/apiserver/pkg/endpoints/request"
|
||||||
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
|
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
|
||||||
|
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMatching(t *testing.T) {
|
func TestMatching(t *testing.T) {
|
||||||
@ -102,7 +103,7 @@ func TestLiterals(t *testing.T) {
|
|||||||
Parts: []string{"goodrscs", "eman"},
|
Parts: []string{"goodrscs", "eman"},
|
||||||
},
|
},
|
||||||
User: ui,
|
User: ui,
|
||||||
Width: 1,
|
Width: fcrequest.Width{Seats: 1},
|
||||||
}
|
}
|
||||||
reqRU := RequestDigest{
|
reqRU := RequestDigest{
|
||||||
RequestInfo: &request.RequestInfo{
|
RequestInfo: &request.RequestInfo{
|
||||||
@ -118,7 +119,7 @@ func TestLiterals(t *testing.T) {
|
|||||||
Parts: []string{"goodrscs", "eman"},
|
Parts: []string{"goodrscs", "eman"},
|
||||||
},
|
},
|
||||||
User: ui,
|
User: ui,
|
||||||
Width: 1,
|
Width: fcrequest.Width{Seats: 1},
|
||||||
}
|
}
|
||||||
reqN := RequestDigest{
|
reqN := RequestDigest{
|
||||||
RequestInfo: &request.RequestInfo{
|
RequestInfo: &request.RequestInfo{
|
||||||
@ -127,7 +128,7 @@ func TestLiterals(t *testing.T) {
|
|||||||
Verb: "goodverb",
|
Verb: "goodverb",
|
||||||
},
|
},
|
||||||
User: ui,
|
User: ui,
|
||||||
Width: 1,
|
Width: fcrequest.Width{Seats: 1},
|
||||||
}
|
}
|
||||||
checkRules(t, true, reqRN, []flowcontrol.PolicyRulesWithSubjects{{
|
checkRules(t, true, reqRN, []flowcontrol.PolicyRulesWithSubjects{{
|
||||||
Subjects: []flowcontrol.Subject{{Kind: flowcontrol.SubjectKindUser,
|
Subjects: []flowcontrol.Subject{{Kind: flowcontrol.SubjectKindUser,
|
||||||
|
@ -20,21 +20,28 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type Width struct {
|
||||||
|
// Seats represents the number of seats associated with this request
|
||||||
|
Seats uint
|
||||||
|
}
|
||||||
|
|
||||||
// DefaultWidthEstimator returns returns '1' as the "width"
|
// DefaultWidthEstimator returns returns '1' as the "width"
|
||||||
// of the given request.
|
// of the given request.
|
||||||
//
|
//
|
||||||
// TODO: when we plumb in actual "width" handling for different
|
// TODO: when we plumb in actual "width" handling for different
|
||||||
// type of request(s) this function will iterate through a chain
|
// type of request(s) this function will iterate through a chain
|
||||||
// of widthEstimator instance(s).
|
// of widthEstimator instance(s).
|
||||||
func DefaultWidthEstimator(_ *http.Request) uint {
|
func DefaultWidthEstimator(_ *http.Request) Width {
|
||||||
return 1
|
return Width{
|
||||||
|
Seats: 1,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WidthEstimatorFunc returns the estimated "width" of a given request.
|
// WidthEstimatorFunc returns the estimated "width" of a given request.
|
||||||
// This function will be used by the Priority & Fairness filter to
|
// This function will be used by the Priority & Fairness filter to
|
||||||
// estimate the "width" of incoming requests.
|
// estimate the "width" of incoming requests.
|
||||||
type WidthEstimatorFunc func(*http.Request) uint
|
type WidthEstimatorFunc func(*http.Request) Width
|
||||||
|
|
||||||
func (e WidthEstimatorFunc) EstimateWidth(r *http.Request) uint {
|
func (e WidthEstimatorFunc) EstimateWidth(r *http.Request) Width {
|
||||||
return e(r)
|
return e(r)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user