From 24e19229101d242d924ce98a562be3864dde9eae Mon Sep 17 00:00:00 2001 From: Abu Kashem Date: Sun, 27 Jun 2021 12:45:24 -0400 Subject: [PATCH] apf: add additional latency into width --- .../filters/priority-and-fairness_test.go | 9 ++++--- .../pkg/util/flowcontrol/apf_controller.go | 5 ++-- .../pkg/util/flowcontrol/controller_test.go | 3 ++- .../util/flowcontrol/fairqueuing/interface.go | 3 ++- .../fairqueuing/queueset/queueset.go | 15 ++++++----- .../fairqueuing/queueset/queueset_test.go | 27 ++++++++++--------- .../flowcontrol/fairqueuing/queueset/types.go | 3 ++- .../fairqueuing/testing/no-restraint.go | 3 ++- .../pkg/util/flowcontrol/match_test.go | 7 ++--- .../pkg/util/flowcontrol/request/width.go | 15 ++++++++--- 10 files changed, 54 insertions(+), 36 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go index d94b18d40df..166c8fdde6a 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -44,6 +44,7 @@ import ( utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics" + fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" @@ -69,7 +70,7 @@ const ( decisionSkipFilter ) -var defaultRequestWidthEstimator = func(*http.Request) uint { return 1 } +var defaultRequestWidthEstimator = func(*http.Request) fcrequest.Width { return fcrequest.Width{Seats: 1} } type fakeApfFilter struct { mockDecision mockDecision @@ -586,13 +587,15 @@ func TestApfWithRequestDigest(t *testing.T) { reqDigestExpected := &utilflowcontrol.RequestDigest{ RequestInfo: &apirequest.RequestInfo{Verb: "get"}, User: &user.DefaultInfo{Name: "foo"}, - Width: 5, + Width: fcrequest.Width{ + Seats: 5, + }, } handler := WithPriorityAndFairness(http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) {}), longRunningFunc, fakeFilter, - func(_ *http.Request) uint { return reqDigestExpected.Width }, + func(_ *http.Request) fcrequest.Width { return reqDigestExpected.Width }, ) w := httptest.NewRecorder() diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index 722be7300f3..90228584c6b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -46,6 +46,7 @@ import ( fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format" "k8s.io/apiserver/pkg/util/flowcontrol/metrics" + fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" @@ -81,7 +82,7 @@ type StartFunction func(ctx context.Context, hashValue uint64) (execute bool, af type RequestDigest struct { RequestInfo *request.RequestInfo User user.Info - Width uint + Width fcrequest.Width } // `*configController` maintains eventual consistency with the API @@ -804,7 +805,7 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig } startWaitingTime = time.Now() klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues) - req, idle := plState.queues.StartRequest(ctx, rd.Width, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn) + req, idle := plState.queues.StartRequest(ctx, &rd.Width, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn) if idle { cfgCtlr.maybeReapLocked(plName, plState) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go index cf0280d3c02..08b7722b0cd 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go @@ -36,6 +36,7 @@ import ( fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format" "k8s.io/apiserver/pkg/util/flowcontrol/metrics" + fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" "k8s.io/client-go/informers" clientsetfake "k8s.io/client-go/kubernetes/fake" fcclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta1" @@ -139,7 +140,7 @@ func (cqs *ctlrTestQueueSet) IsIdle() bool { return cqs.countActive == 0 } -func (cqs *ctlrTestQueueSet) StartRequest(ctx context.Context, width uint, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (req fq.Request, idle bool) { +func (cqs *ctlrTestQueueSet) StartRequest(ctx context.Context, width *fcrequest.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (req fq.Request, idle bool) { cqs.cts.lock.Lock() defer cqs.cts.lock.Unlock() cqs.countActive++ diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go index 1865daeef12..2de44c350d1 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go @@ -22,6 +22,7 @@ import ( "k8s.io/apiserver/pkg/util/flowcontrol/debug" "k8s.io/apiserver/pkg/util/flowcontrol/metrics" + "k8s.io/apiserver/pkg/util/flowcontrol/request" ) // QueueSetFactory is used to create QueueSet objects. Creation, like @@ -80,7 +81,7 @@ type QueueSet interface { // was idle at the moment of the return. Otherwise idle==false // and the client must call the Finish method of the Request // exactly once. - StartRequest(ctx context.Context, width uint, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool) + StartRequest(ctx context.Context, width *request.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool) // UpdateObservations makes sure any time-based statistics have // caught up with the current clock reading diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index fb9c2c4429f..c2275c0a393 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -30,6 +30,7 @@ import ( fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise" "k8s.io/apiserver/pkg/util/flowcontrol/metrics" + fqrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" "k8s.io/apiserver/pkg/util/shufflesharding" "k8s.io/klog/v2" ) @@ -234,7 +235,7 @@ const ( // executing at each point where there is a change in that quantity, // because the metrics --- and only the metrics --- track that // quantity per FlowSchema. -func (qs *queueSet) StartRequest(ctx context.Context, width uint, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) { +func (qs *queueSet) StartRequest(ctx context.Context, width *fqrequest.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) { qs.lockAndSyncTime() defer qs.lock.Unlock() var req *request @@ -243,7 +244,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, width uint, hashValue uint // Step 0: // Apply only concurrency limit, if zero queues desired if qs.qCfg.DesiredNumQueues < 1 { - if !qs.canAccommodateSeatsLocked(int(width)) { + if !qs.canAccommodateSeatsLocked(int(width.Seats)) { klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d seats are asked for, %d seats are in use (%d are executing) and the limit is %d", qs.qCfg.Name, fsName, descr1, descr2, width, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit") @@ -315,7 +316,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, width uint, hashValue uint // Seats returns the number of seats this request requires. func (req *request) Seats() int { - return int(req.width) + return int(req.width.Seats) } func (req *request) NoteQueued(inQueue bool) { @@ -436,7 +437,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 { // returns the enqueud request on a successful enqueue // returns nil in the case that there is no available concurrency or // the queuelengthlimit has been reached -func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, width uint, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request { +func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, width *fqrequest.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request { // Start with the shuffle sharding, to pick a queue. queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2) queue := qs.queues[queueIdx] @@ -458,7 +459,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte descr1: descr1, descr2: descr2, queueNoteFn: queueNoteFn, - width: width, + width: *width, } if ok := qs.rejectOrEnqueueLocked(req); !ok { return nil @@ -574,7 +575,7 @@ func (qs *queueSet) dispatchAsMuchAsPossibleLocked() { } } -func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width uint, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request { +func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width *fqrequest.Width, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request { // does not call metrics.SetDispatchMetrics because there is no queuing and thus no interesting virtual world now := qs.clock.Now() req := &request{ @@ -587,7 +588,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width uint, flo arrivalTime: now, descr1: descr1, descr2: descr2, - width: width, + width: *width, } req.decision.SetLocked(decisionExecute) qs.totRequestsExecuting++ diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 1edf8500056..3e85daaba32 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -33,6 +33,7 @@ import ( test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing" testclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock" "k8s.io/apiserver/pkg/util/flowcontrol/metrics" + fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" "k8s.io/klog/v2" ) @@ -226,7 +227,7 @@ func (ust *uniformScenarioThread) callK(k int) { if k >= ust.nCalls { return } - req, idle := ust.uss.qs.StartRequest(context.Background(), 1, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil) + req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.Width{Seats: 1}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil) ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle) if req == nil { atomic.AddUint64(&ust.uss.failedCount, 1) @@ -671,7 +672,7 @@ func TestContextCancel(t *testing.T) { ctx1 := context.Background() b2i := map[bool]int{false: 0, true: 1} var qnc [2][2]int32 - req1, _ := qs.StartRequest(ctx1, 1, 1, "", "fs1", "test", "one", func(inQueue bool) { atomic.AddInt32(&qnc[0][b2i[inQueue]], 1) }) + req1, _ := qs.StartRequest(ctx1, &fcrequest.Width{Seats: 1}, 1, "", "fs1", "test", "one", func(inQueue bool) { atomic.AddInt32(&qnc[0][b2i[inQueue]], 1) }) if req1 == nil { t.Error("Request rejected") return @@ -699,7 +700,7 @@ func TestContextCancel(t *testing.T) { counter.Add(1) cancel2() }() - req2, idle2a := qs.StartRequest(ctx2, 1, 2, "", "fs2", "test", "two", func(inQueue bool) { atomic.AddInt32(&qnc[1][b2i[inQueue]], 1) }) + req2, idle2a := qs.StartRequest(ctx2, &fcrequest.Width{Seats: 1}, 2, "", "fs2", "test", "two", func(inQueue bool) { atomic.AddInt32(&qnc[1][b2i[inQueue]], 1) }) if idle2a { t.Error("2nd StartRequest returned idle") } @@ -758,7 +759,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) { } ctx := context.Background() - req, _ := qs.StartRequest(ctx, 1, 1, "", "fs", "test", "one", func(inQueue bool) {}) + req, _ := qs.StartRequest(ctx, &fcrequest.Width{Seats: 1}, 1, "", "fs", "test", "one", func(inQueue bool) {}) if req == nil { t.Fatal("expected a Request object from StartRequest, but got nil") } @@ -811,13 +812,13 @@ func TestSelectQueueLocked(t *testing.T) { { virtualStart: 200, requests: newFIFO( - &request{width: 1}, + &request{width: fcrequest.Width{Seats: 1}}, ), }, { virtualStart: 100, requests: newFIFO( - &request{width: 1}, + &request{width: fcrequest.Width{Seats: 1}}, ), }, }, @@ -834,7 +835,7 @@ func TestSelectQueueLocked(t *testing.T) { { virtualStart: 200, requests: newFIFO( - &request{width: 1}, + &request{width: fcrequest.Width{Seats: 1}}, ), }, }, @@ -851,13 +852,13 @@ func TestSelectQueueLocked(t *testing.T) { { virtualStart: 200, requests: newFIFO( - &request{width: 50}, + &request{width: fcrequest.Width{Seats: 50}}, ), }, { virtualStart: 100, requests: newFIFO( - &request{width: 25}, + &request{width: fcrequest.Width{Seats: 25}}, ), }, }, @@ -874,13 +875,13 @@ func TestSelectQueueLocked(t *testing.T) { { virtualStart: 200, requests: newFIFO( - &request{width: 10}, + &request{width: fcrequest.Width{Seats: 10}}, ), }, { virtualStart: 100, requests: newFIFO( - &request{width: 25}, + &request{width: fcrequest.Width{Seats: 25}}, ), }, }, @@ -897,13 +898,13 @@ func TestSelectQueueLocked(t *testing.T) { { virtualStart: 200, requests: newFIFO( - &request{width: 10}, + &request{width: fcrequest.Width{Seats: 10}}, ), }, { virtualStart: 100, requests: newFIFO( - &request{width: 25}, + &request{width: fcrequest.Width{Seats: 25}}, ), }, }, diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index 92365c63b53..7f121d707aa 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -24,6 +24,7 @@ import ( "k8s.io/apiserver/pkg/util/flowcontrol/debug" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise" + fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" ) // request is a temporary container for "requests" with additional @@ -44,7 +45,7 @@ type request struct { startTime time.Time // width of the request - width uint + width fcrequest.Width // decision gets set to a `requestDecision` indicating what to do // with this request. It gets set exactly once, when the request diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go index f711126c4df..dadd2f5b1d7 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go @@ -22,6 +22,7 @@ import ( "k8s.io/apiserver/pkg/util/flowcontrol/debug" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" "k8s.io/apiserver/pkg/util/flowcontrol/metrics" + fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" ) // NewNoRestraintFactory makes a QueueSetFactory that produces @@ -55,7 +56,7 @@ func (noRestraint) IsIdle() bool { return false } -func (noRestraint) StartRequest(ctx context.Context, width uint, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) { +func (noRestraint) StartRequest(ctx context.Context, width *fcrequest.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) { return noRestraintRequest{}, false } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/match_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/match_test.go index f82085f6087..936f10f0fda 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/match_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/match_test.go @@ -27,6 +27,7 @@ import ( "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints/request" fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format" + fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" ) func TestMatching(t *testing.T) { @@ -102,7 +103,7 @@ func TestLiterals(t *testing.T) { Parts: []string{"goodrscs", "eman"}, }, User: ui, - Width: 1, + Width: fcrequest.Width{Seats: 1}, } reqRU := RequestDigest{ RequestInfo: &request.RequestInfo{ @@ -118,7 +119,7 @@ func TestLiterals(t *testing.T) { Parts: []string{"goodrscs", "eman"}, }, User: ui, - Width: 1, + Width: fcrequest.Width{Seats: 1}, } reqN := RequestDigest{ RequestInfo: &request.RequestInfo{ @@ -127,7 +128,7 @@ func TestLiterals(t *testing.T) { Verb: "goodverb", }, User: ui, - Width: 1, + Width: fcrequest.Width{Seats: 1}, } checkRules(t, true, reqRN, []flowcontrol.PolicyRulesWithSubjects{{ Subjects: []flowcontrol.Subject{{Kind: flowcontrol.SubjectKindUser, diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go index 6fc65b5d5fc..c5fa478a711 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go @@ -20,21 +20,28 @@ import ( "net/http" ) +type Width struct { + // Seats represents the number of seats associated with this request + Seats uint +} + // DefaultWidthEstimator returns returns '1' as the "width" // of the given request. // // TODO: when we plumb in actual "width" handling for different // type of request(s) this function will iterate through a chain // of widthEstimator instance(s). -func DefaultWidthEstimator(_ *http.Request) uint { - return 1 +func DefaultWidthEstimator(_ *http.Request) Width { + return Width{ + Seats: 1, + } } // WidthEstimatorFunc returns the estimated "width" of a given request. // This function will be used by the Priority & Fairness filter to // estimate the "width" of incoming requests. -type WidthEstimatorFunc func(*http.Request) uint +type WidthEstimatorFunc func(*http.Request) Width -func (e WidthEstimatorFunc) EstimateWidth(r *http.Request) uint { +func (e WidthEstimatorFunc) EstimateWidth(r *http.Request) Width { return e(r) }