From f535a9c9ed4b6a0def47c354acad0ac2a8f961b0 Mon Sep 17 00:00:00 2001 From: Mike Spreitzer Date: Sun, 1 Mar 2020 20:22:58 -0500 Subject: [PATCH 1/3] Make some metrics finer-grained, add dispatch counts, note immediate reject Also add testing of metrics for queuesets. --- .../pkg/util/flowcontrol/apf_controller.go | 2 +- .../pkg/util/flowcontrol/apf_filter.go | 8 +- .../pkg/util/flowcontrol/controller_test.go | 4 +- .../util/flowcontrol/fairqueuing/interface.go | 2 +- .../flowcontrol/fairqueuing/queueset/BUILD | 1 + .../fairqueuing/queueset/queueset.go | 49 ++++++----- .../fairqueuing/queueset/queueset_test.go | 83 +++++++++++++++---- .../flowcontrol/fairqueuing/queueset/types.go | 5 +- .../fairqueuing/testing/no-restraint.go | 2 +- .../pkg/util/flowcontrol/metrics/BUILD | 1 + .../pkg/util/flowcontrol/metrics/metrics.go | 76 ++++++++++++----- 11 files changed, 162 insertions(+), 71 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index 918ea99386a..6e3280a0559 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -648,7 +648,7 @@ func (cfgCtl *configController) startRequest(ctx context.Context, rd RequestDige } startWaitingTime = time.Now() klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, fs.Name, fs.Spec.DistinguisherMethod, plName, numQueues) - req, idle := plState.queues.StartRequest(ctx, hashValue, rd.RequestInfo, rd.User) + req, idle := plState.queues.StartRequest(ctx, hashValue, fs.Name, rd.RequestInfo, rd.User) if idle { cfgCtl.maybeReapLocked(plName, plState) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go index 0a3cae0e037..812ecaf2d24 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go @@ -21,13 +21,6 @@ import ( "strconv" "time" - // TODO: decide whether to use the existing metrics, which - // categorize according to mutating vs readonly, or make new - // metrics because this filter does not pay attention to that - // distinction - - // "k8s.io/apiserver/pkg/endpoints/metrics" - "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apiserver/pkg/util/flowcontrol/counter" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" @@ -109,6 +102,7 @@ func (cfgCtl *configController) Handle(ctx context.Context, requestDigest Reques if queued { metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) } + metrics.AddDispatch(pl.Name, fs.Name) executed = true startExecutionTime := time.Now() execFn() diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go index e54e5b397fd..abd334379b1 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go @@ -115,11 +115,11 @@ func (cqs *ctlTestQueueSet) IsIdle() bool { return cqs.countActive == 0 } -func (cqs *ctlTestQueueSet) StartRequest(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (req fq.Request, idle bool) { +func (cqs *ctlTestQueueSet) StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (req fq.Request, idle bool) { cqs.cts.lock.Lock() defer cqs.cts.lock.Unlock() cqs.countActive++ - cqs.cts.t.Logf("Queued %#+v %#+v for %p QS=%s, countActive:=%d", descr1, descr2, cqs, cqs.qc.Name, cqs.countActive) + cqs.cts.t.Logf("Queued %q %#+v %#+v for %p QS=%s, countActive:=%d", fsName, descr1, descr2, cqs, cqs.qc.Name, cqs.countActive) return &ctlTestRequest{cqs, cqs.qc.Name, descr1, descr2}, false } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go index 9d4dd27de57..c989ccc9fab 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go @@ -76,7 +76,7 @@ type QueueSet interface { // returned bool indicates whether the QueueSet was idle at the // moment of the return. Otherwise idle==false and the client // must call the Wait method of the Request exactly once. - StartRequest(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (req Request, idle bool) + StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (req Request, idle bool) } // Request represents the remainder of the handling of one request diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD index ca0cde93c53..12291ce2b1d 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD @@ -33,6 +33,7 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 32d531a66da..8f4db56dd78 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -218,10 +218,10 @@ const ( // StartRequest begins the process of handling a request. We take the // approach of updating the metrics about total requests queued and -// executing on each path out of this method and Request::Wait. We do -// not update those metrics in lower level functions because there can -// be multiple lower level changes in one invocation here. -func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (fq.Request, bool) { +// executing at each point where there is a change in that quantity, +// because the metrics --- and only the metrics --- track that +// quantity per FlowSchema. +func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (fq.Request, bool) { qs.lockAndSyncTime() defer qs.lock.Unlock() var req *request @@ -231,11 +231,11 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, descr1, // Apply only concurrency limit, if zero queues desired if qs.qCfg.DesiredNumQueues < 1 { if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit { - klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v because %d are executing and the limit is %d", qs.qCfg.Name, descr1, descr2, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) + klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d are executing and the limit is %d", qs.qCfg.Name, fsName, descr1, descr2, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) + metrics.AddReject(qs.qCfg.Name, fsName, "concurrency-limit") return nil, qs.isIdleLocked() } - req = qs.dispatchSansQueueLocked(ctx, descr1, descr2) - metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting) + req = qs.dispatchSansQueueLocked(ctx, fsName, descr1, descr2) return req, false } @@ -246,13 +246,12 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, descr1, // 3) Reject current request if there is not enough concurrency shares and // we are at max queue length // 4) If not rejected, create a request and enqueue - req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, descr1, descr2) - defer metrics.UpdateFlowControlRequestsInQueue(qs.qCfg.Name, qs.totRequestsWaiting) + req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, fsName, descr1, descr2) // req == nil means that the request was rejected - no remaining // concurrency shares and at max queue length already if req == nil { - klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v due to queue full", qs.qCfg.Name, descr1, descr2) - metrics.AddReject(qs.qCfg.Name, "queue-full") + klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v due to queue full", qs.qCfg.Name, fsName, descr1, descr2) + metrics.AddReject(qs.qCfg.Name, fsName, "queue-full") return nil, qs.isIdleLocked() } @@ -266,7 +265,6 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, descr1, // fair queuing technique to pick a queue and dispatch a // request from that queue. qs.dispatchAsMuchAsPossibleLocked() - defer metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting) // ======================================================================== // Step 3: @@ -288,7 +286,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, descr1, // known that the count does not need to be accurate. // BTW, the count only needs to be accurate in a test that // uses FakeEventClock::Run(). - klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.qCfg.Name, descr1, descr2) + klog.V(6).Infof("QS(%s): Context of request %q %#+v %#+v is Done", qs.qCfg.Name, fsName, descr1, descr2) qs.cancelWait(req) qs.goroutineDoneOrBlocked() }() @@ -329,7 +327,7 @@ func (req *request) wait() (bool, bool) { switch decision { case decisionReject: klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2) - metrics.AddReject(qs.qCfg.Name, "time-out") + metrics.AddReject(qs.qCfg.Name, req.fsName, "time-out") return false, qs.isIdleLocked() case decisionCancel: // TODO(aaron-prindle) add metrics for this case @@ -400,12 +398,12 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 { // returns the enqueud request on a successful enqueue // returns nil in the case that there is no available concurrency or // the queuelengthlimit has been reached -func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) *request { +func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) *request { // Start with the shuffle sharding, to pick a queue. queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2) queue := qs.queues[queueIdx] // The next step is the logic to reject requests that have been waiting too long - qs.removeTimedOutRequestsFromQueueLocked(queue) + qs.removeTimedOutRequestsFromQueueLocked(queue, fsName) // NOTE: currently timeout is only checked for each new request. This means that there can be // requests that are in the queue longer than the timeout if there are no new requests // We prefer the simplicity over the promptness, at least for now. @@ -413,6 +411,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte // Create a request and enqueue req := &request{ qs: qs, + fsName: fsName, ctx: ctx, decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter), arrivalTime: qs.clock.Now(), @@ -423,7 +422,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte if ok := qs.rejectOrEnqueueLocked(req); !ok { return nil } - metrics.ObserveQueueLength(qs.qCfg.Name, len(queue.requests)) + metrics.ObserveQueueLength(qs.qCfg.Name, fsName, len(queue.requests)) return req } @@ -446,7 +445,7 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte // removeTimedOutRequestsFromQueueLocked rejects old requests that have been enqueued // past the requestWaitLimit -func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) { +func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName string) { timeoutIdx := -1 now := qs.clock.Now() reqs := queue.requests @@ -473,6 +472,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) { queue.requests = reqs[removeIdx:] // decrement the # of requestsEnqueued qs.totRequestsWaiting -= removeIdx + metrics.ChangeRequestsInQueues(qs.qCfg.Name, fsName, -removeIdx) } } @@ -505,6 +505,7 @@ func (qs *queueSet) enqueueLocked(request *request) { } queue.Enqueue(request) qs.totRequestsWaiting++ + metrics.ChangeRequestsInQueues(qs.qCfg.Name, request.fsName, 1) } // dispatchAsMuchAsPossibleLocked runs a loop, as long as there @@ -522,10 +523,11 @@ func (qs *queueSet) dispatchAsMuchAsPossibleLocked() { } } -func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, descr1, descr2 interface{}) *request { +func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, fsName string, descr1, descr2 interface{}) *request { now := qs.clock.Now() req := &request{ qs: qs, + fsName: fsName, ctx: ctx, startTime: now, decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter), @@ -535,8 +537,9 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, descr1, descr2 } req.decision.SetLocked(decisionExecute) qs.totRequestsExecuting++ + metrics.ChangeRequestsExecuting(qs.qCfg.Name, fsName, 1) if klog.V(5) { - klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, descr1, descr2, qs.totRequestsExecuting) + klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting) } return req } @@ -563,6 +566,8 @@ func (qs *queueSet) dispatchLocked() bool { qs.totRequestsWaiting-- qs.totRequestsExecuting++ queue.requestsExecuting++ + metrics.ChangeRequestsInQueues(qs.qCfg.Name, request.fsName, -1) + metrics.ChangeRequestsExecuting(qs.qCfg.Name, request.fsName, 1) if klog.V(6) { klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting) } @@ -590,6 +595,7 @@ func (qs *queueSet) cancelWait(req *request) { // remove the request queue.requests = append(queue.requests[:i], queue.requests[i+1:]...) qs.totRequestsWaiting-- + metrics.ChangeRequestsInQueues(qs.qCfg.Name, req.fsName, -1) break } } @@ -634,8 +640,6 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool qs.finishRequestLocked(req) qs.dispatchAsMuchAsPossibleLocked() - metrics.UpdateFlowControlRequestsInQueue(qs.qCfg.Name, qs.totRequestsWaiting) - metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting) return qs.isIdleLocked() } @@ -644,6 +648,7 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool // callback updates important state in the queueSet func (qs *queueSet) finishRequestLocked(r *request) { qs.totRequestsExecuting-- + metrics.ChangeRequestsExecuting(qs.qCfg.Name, r.fsName, -1) if r.queue == nil { if klog.V(6) { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index b3de31dedd1..13579179947 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -18,6 +18,7 @@ package queueset import ( "context" + "fmt" "math" "sync/atomic" "testing" @@ -27,6 +28,7 @@ import ( fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing" "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock" + "k8s.io/apiserver/pkg/util/flowcontrol/metrics" "k8s.io/klog" ) @@ -52,21 +54,30 @@ type uniformClient struct { // expectPass indicates whether the QueueSet is expected to be fair. // expectedAllRequests indicates whether all requests are expected to get dispatched. func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, sc uniformScenario, - evalDuration time.Duration, expectPass bool, expectedAllRequests bool, + evalDuration time.Duration, + expectPass, expectedAllRequests, expectInqueueMetrics, expectExecutingMetrics bool, clk *clock.FakeEventClock, counter counter.GoRoutineCounter) { now := time.Now() t.Logf("%s: Start %s, clk=%p, grc=%p", clk.Now().Format(nsTimeFmt), name, clk, counter) integrators := make([]test.Integrator, len(sc)) var failedCount uint64 + expectedInqueue := "" + expectedExecuting := "" + if expectInqueueMetrics || expectExecutingMetrics { + metrics.Reset() + } + executions := make([]int32, len(sc)) for i, uc := range sc { integrators[i] = test.NewIntegrator(clk) + fsName := fmt.Sprintf("client%d", i) + expectedInqueue = expectedInqueue + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, name, "\n") for j := 0; j < uc.nThreads; j++ { counter.Add(1) go func(i, j int, uc uniformClient, igr test.Integrator) { for k := 0; k < uc.nCalls; k++ { ClockWait(clk, counter, uc.thinkDuration) - req, idle := qs.StartRequest(context.Background(), uc.hash, name, []int{i, j, k}) + req, idle := qs.StartRequest(context.Background(), uc.hash, fsName, name, []int{i, j, k}) t.Logf("%s: %d, %d, %d got req=%p, idle=%v", clk.Now().Format(nsTimeFmt), i, j, k, req, idle) if req == nil { atomic.AddUint64(&failedCount, 1) @@ -79,6 +90,7 @@ func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, idle2 := req.Finish(func() { executed = true t.Logf("%s: %d, %d, %d executing", clk.Now().Format(nsTimeFmt), i, j, k) + atomic.AddInt32(&executions[i], 1) igr.Add(1) ClockWait(clk, counter, uc.execDuration) igr.Add(-1) @@ -124,6 +136,32 @@ func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, } else if !expectedAllRequests && failedCount == 0 { t.Errorf("Expected failed requests but all requests succeeded") } + if expectInqueueMetrics { + err := metrics.GatherAndCompare(` + # HELP apiserver_flowcontrol_current_inqueue_requests [ALPHA] Number of requests currently pending in queues of the API Priority and Fairness system + # TYPE apiserver_flowcontrol_current_inqueue_requests gauge +`+expectedInqueue, + "apiserver_flowcontrol_current_inqueue_requests") + if err != nil { + t.Fatal(err) + } + } + for i := range sc { + fsName := fmt.Sprintf("client%d", i) + if atomic.AddInt32(&executions[i], 0) > 0 { + expectedExecuting = expectedExecuting + fmt.Sprintf(` apiserver_flowcontrol_current_executing_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, name, "\n") + } + } + if expectExecutingMetrics && len(expectedExecuting) > 0 { + err := metrics.GatherAndCompare(` + # HELP apiserver_flowcontrol_current_executing_requests [ALPHA] Number of requests currently executing in the API Priority and Fairness system + # TYPE apiserver_flowcontrol_current_executing_requests gauge +`+expectedExecuting, + "apiserver_flowcontrol_current_executing_requests") + if err != nil { + t.Fatal(err) + } + } } func ClockWait(clk *clock.FakeEventClock, counter counter.GoRoutineCounter, duration time.Duration) { @@ -144,6 +182,7 @@ func init() { // TestNoRestraint should fail because the dummy QueueSet exercises no control func TestNoRestraint(t *testing.T) { + metrics.Register() now := time.Now() clk, counter := clock.NewFakeEventClock(now, 0, nil) nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}) @@ -154,10 +193,11 @@ func TestNoRestraint(t *testing.T) { exerciseQueueSetUniformScenario(t, "NoRestraint", nr, []uniformClient{ {1001001001, 5, 10, time.Second, time.Second}, {2002002002, 2, 10, time.Second, time.Second / 2}, - }, time.Second*10, false, true, clk, counter) + }, time.Second*10, false, true, false, false, clk, counter) } func TestUniformFlows(t *testing.T) { + metrics.Register() now := time.Now() clk, counter := clock.NewFakeEventClock(now, 0, nil) @@ -175,13 +215,14 @@ func TestUniformFlows(t *testing.T) { } qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4}) - exerciseQueueSetUniformScenario(t, "UniformFlows", qs, []uniformClient{ + exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{ {1001001001, 5, 10, time.Second, time.Second}, {2002002002, 5, 10, time.Second, time.Second}, - }, time.Second*20, true, true, clk, counter) + }, time.Second*20, true, true, true, true, clk, counter) } func TestDifferentFlows(t *testing.T) { + metrics.Register() now := time.Now() clk, counter := clock.NewFakeEventClock(now, 0, nil) @@ -199,13 +240,14 @@ func TestDifferentFlows(t *testing.T) { } qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4}) - exerciseQueueSetUniformScenario(t, "DifferentFlows", qs, []uniformClient{ + exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{ {1001001001, 6, 10, time.Second, time.Second}, {2002002002, 5, 15, time.Second, time.Second / 2}, - }, time.Second*20, true, true, clk, counter) + }, time.Second*20, true, true, true, true, clk, counter) } func TestDifferentFlowsWithoutQueuing(t *testing.T) { + metrics.Register() now := time.Now() clk, counter := clock.NewFakeEventClock(now, 0, nil) @@ -220,13 +262,24 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) { } qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4}) - exerciseQueueSetUniformScenario(t, "DifferentFlowsWithoutQueuing", qs, []uniformClient{ + exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{ {1001001001, 6, 10, time.Second, 57 * time.Millisecond}, {2002002002, 4, 15, time.Second, 750 * time.Millisecond}, - }, time.Second*13, false, false, clk, counter) + }, time.Second*13, false, false, false, true, clk, counter) + err = metrics.GatherAndCompare(` + # HELP apiserver_flowcontrol_rejected_requests_total [ALPHA] Number of requests rejected by API Priority and Fairness system + # TYPE apiserver_flowcontrol_rejected_requests_total counter + apiserver_flowcontrol_rejected_requests_total{flowSchema="client0",priorityLevel="TestDifferentFlowsWithoutQueuing",reason="concurrency-limit"} 2 + apiserver_flowcontrol_rejected_requests_total{flowSchema="client1",priorityLevel="TestDifferentFlowsWithoutQueuing",reason="concurrency-limit"} 4 + `, + "apiserver_flowcontrol_rejected_requests_total") + if err != nil { + t.Fatal(err) + } } func TestTimeout(t *testing.T) { + metrics.Register() now := time.Now() clk, counter := clock.NewFakeEventClock(now, 0, nil) @@ -244,17 +297,19 @@ func TestTimeout(t *testing.T) { } qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1}) - exerciseQueueSetUniformScenario(t, "Timeout", qs, []uniformClient{ + exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{ {1001001001, 5, 100, time.Second, time.Second}, - }, time.Second*10, true, false, clk, counter) + }, time.Second*10, true, false, true, true, clk, counter) } func TestContextCancel(t *testing.T) { + metrics.Register() + metrics.Reset() now := time.Now() clk, counter := clock.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) qCfg := fq.QueuingConfig{ - Name: "TestTimeout", + Name: "TestContextCancel", DesiredNumQueues: 11, QueueLengthLimit: 11, HandSize: 1, @@ -267,7 +322,7 @@ func TestContextCancel(t *testing.T) { qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1}) counter.Add(1) // account for the goroutine running this test ctx1 := context.Background() - req1, _ := qs.StartRequest(ctx1, 1, "test", "one") + req1, _ := qs.StartRequest(ctx1, 1, "fs1", "test", "one") if req1 == nil { t.Error("Request rejected") return @@ -283,7 +338,7 @@ func TestContextCancel(t *testing.T) { counter.Add(1) cancel2() }() - req2, idle2a := qs.StartRequest(ctx2, 2, "test", "two") + req2, idle2a := qs.StartRequest(ctx2, 2, "fs2", "test", "two") if idle2a { t.Error("2nd StartRequest returned idle") } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index 4a154a425db..1facc701d9e 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -26,8 +26,9 @@ import ( // request is a temporary container for "requests" with additional // tracking fields required for the functionality FQScheduler type request struct { - qs *queueSet - ctx context.Context + qs *queueSet + fsName string + ctx context.Context // The relevant queue. Is nil if this request did not go through // a queue. diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go index 77e42b636d3..14504f20179 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go @@ -53,7 +53,7 @@ func (noRestraint) IsIdle() bool { return false } -func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (fq.Request, bool) { +func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (fq.Request, bool) { return noRestraintRequest{}, false } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/BUILD index a1478b15992..1074189822a 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/BUILD @@ -9,6 +9,7 @@ go_library( deps = [ "//staging/src/k8s.io/component-base/metrics:go_default_library", "//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library", + "//staging/src/k8s.io/component-base/metrics/testutil:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go index 887dd3db97c..0c585003640 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go @@ -17,11 +17,13 @@ limitations under the License. package metrics import ( + "strings" "sync" "time" compbasemetrics "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/legacyregistry" + basemetricstestutil "k8s.io/component-base/metrics/testutil" ) const ( @@ -50,41 +52,67 @@ func Register() { }) } +type resettable interface { + Reset() +} + +// Reset all metrics to zero +func Reset() { + for _, metric := range metrics { + rm := metric.(resettable) + rm.Reset() + } +} + +// GatherAndCompare the given metrics with the given Prometheus syntax expected value +func GatherAndCompare(expected string, metricNames ...string) error { + return basemetricstestutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...) +} + var ( apiserverRejectedRequestsTotal = compbasemetrics.NewCounterVec( &compbasemetrics.CounterOpts{ Namespace: namespace, Subsystem: subsystem, Name: "rejected_requests_total", - Help: "Number of rejected requests by api priority and fairness system", + Help: "Number of requests rejected by API Priority and Fairness system", }, - []string{priorityLevel, "reason"}, + []string{priorityLevel, flowSchema, "reason"}, + ) + apiserverDispatchedRequestsTotal = compbasemetrics.NewCounterVec( + &compbasemetrics.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "dispatched_requests_total", + Help: "Number of requests released by API Priority and Fairness system for service", + }, + []string{priorityLevel, flowSchema}, ) apiserverCurrentInqueueRequests = compbasemetrics.NewGaugeVec( &compbasemetrics.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, Name: "current_inqueue_requests", - Help: "Number of requests currently pending in the queue by the api priority and fairness system", + Help: "Number of requests currently pending in queues of the API Priority and Fairness system", }, - []string{priorityLevel}, + []string{priorityLevel, flowSchema}, ) apiserverRequestQueueLength = compbasemetrics.NewHistogramVec( &compbasemetrics.HistogramOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "request_queue_length", - Help: "Length of queue in the api priority and fairness system", + Name: "request_queue_length_after_enqueue", + Help: "Length of queue in the API Priority and Fairness system, as seen by each request after it is enqueued", Buckets: queueLengthBuckets, }, - []string{priorityLevel}, + []string{priorityLevel, flowSchema}, ) apiserverRequestConcurrencyLimit = compbasemetrics.NewGaugeVec( &compbasemetrics.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, Name: "request_concurrency_limit", - Help: "Shared concurrency limit in the api priority and fairness system", + Help: "Shared concurrency limit in the API Priority and Fairness system", }, []string{priorityLevel}, ) @@ -93,9 +121,9 @@ var ( Namespace: namespace, Subsystem: subsystem, Name: "current_executing_requests", - Help: "Number of requests currently executing in the api priority and fairness system", + Help: "Number of requests currently executing in the API Priority and Fairness system", }, - []string{priorityLevel}, + []string{priorityLevel, flowSchema}, ) apiserverRequestWaitingSeconds = compbasemetrics.NewHistogramVec( &compbasemetrics.HistogramOpts{ @@ -112,13 +140,14 @@ var ( Namespace: namespace, Subsystem: subsystem, Name: "request_execution_seconds", - Help: "Time of request executing in the api priority and fairness system", + Help: "Duration of request execution in the API Priority and Fairness system", Buckets: requestDurationSecondsBuckets, }, []string{priorityLevel, flowSchema}, ) metrics = []compbasemetrics.Registerable{ apiserverRejectedRequestsTotal, + apiserverDispatchedRequestsTotal, apiserverCurrentInqueueRequests, apiserverRequestQueueLength, apiserverRequestConcurrencyLimit, @@ -128,14 +157,14 @@ var ( } ) -// UpdateFlowControlRequestsInQueue updates the value for the # of requests in the specified queues in flow control -func UpdateFlowControlRequestsInQueue(priorityLevel string, inqueue int) { - apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel).Set(float64(inqueue)) +// ChangeRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel +func ChangeRequestsInQueues(priorityLevel, flowSchema string, delta int) { + apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta)) } -// UpdateFlowControlRequestsExecuting updates the value for the # of requests executing in flow control -func UpdateFlowControlRequestsExecuting(priorityLevel string, executing int) { - apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel).Set(float64(executing)) +// ChangeRequestsExecuting adds the given delta to the gauge of executing requests of the given flowSchema and priorityLevel +func ChangeRequestsExecuting(priorityLevel, flowSchema string, delta int) { + apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta)) } // UpdateSharedConcurrencyLimit updates the value for the concurrency limit in flow control @@ -144,13 +173,18 @@ func UpdateSharedConcurrencyLimit(priorityLevel string, limit int) { } // AddReject increments the # of rejected requests for flow control -func AddReject(priorityLevel string, reason string) { - apiserverRejectedRequestsTotal.WithLabelValues(priorityLevel, reason).Add(1) +func AddReject(priorityLevel, flowSchema, reason string) { + apiserverRejectedRequestsTotal.WithLabelValues(priorityLevel, flowSchema, reason).Add(1) +} + +// AddDispatch increments the # of dispatched requests for flow control +func AddDispatch(priorityLevel, flowSchema string) { + apiserverDispatchedRequestsTotal.WithLabelValues(priorityLevel, flowSchema).Add(1) } // ObserveQueueLength observes the queue length for flow control -func ObserveQueueLength(priorityLevel string, length int) { - apiserverRequestQueueLength.WithLabelValues(priorityLevel).Observe(float64(length)) +func ObserveQueueLength(priorityLevel, flowSchema string, length int) { + apiserverRequestQueueLength.WithLabelValues(priorityLevel, flowSchema).Observe(float64(length)) } // ObserveWaitingDuration observes the queue length for flow control From 8a1b60320986eca05cb281bcce45332e0969268e Mon Sep 17 00:00:00 2001 From: Mike Spreitzer Date: Thu, 5 Mar 2020 15:13:46 -0500 Subject: [PATCH 2/3] Fix queued request accounting, extended queueset test --- .../fairqueuing/queueset/queueset.go | 2 +- .../fairqueuing/queueset/queueset_test.go | 50 ++++++++++++++----- 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 8f4db56dd78..3c64bdcb5c8 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -460,6 +460,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s req.decision.SetLocked(decisionReject) // get index for timed out requests timeoutIdx = i + metrics.ChangeRequestsInQueues(qs.qCfg.Name, req.fsName, -1) } else { break } @@ -472,7 +473,6 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s queue.requests = reqs[removeIdx:] // decrement the # of requestsEnqueued qs.totRequestsWaiting -= removeIdx - metrics.ChangeRequestsInQueues(qs.qCfg.Name, fsName, -removeIdx) } } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 13579179947..984917edb7a 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -56,6 +56,7 @@ type uniformClient struct { func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, sc uniformScenario, evalDuration time.Duration, expectPass, expectedAllRequests, expectInqueueMetrics, expectExecutingMetrics bool, + rejectReason string, clk *clock.FakeEventClock, counter counter.GoRoutineCounter) { now := time.Now() @@ -68,6 +69,7 @@ func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, metrics.Reset() } executions := make([]int32, len(sc)) + rejects := make([]int32, len(sc)) for i, uc := range sc { integrators[i] = test.NewIntegrator(clk) fsName := fmt.Sprintf("client%d", i) @@ -81,6 +83,7 @@ func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, t.Logf("%s: %d, %d, %d got req=%p, idle=%v", clk.Now().Format(nsTimeFmt), i, j, k, req, idle) if req == nil { atomic.AddUint64(&failedCount, 1) + atomic.AddInt32(&rejects[i], 1) break } if idle { @@ -98,6 +101,7 @@ func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", clk.Now().Format(nsTimeFmt), i, j, k, executed, idle2) if !executed { atomic.AddUint64(&failedCount, 1) + atomic.AddInt32(&rejects[i], 1) } } counter.Add(-1) @@ -137,29 +141,49 @@ func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, t.Errorf("Expected failed requests but all requests succeeded") } if expectInqueueMetrics { - err := metrics.GatherAndCompare(` + e := ` # HELP apiserver_flowcontrol_current_inqueue_requests [ALPHA] Number of requests currently pending in queues of the API Priority and Fairness system # TYPE apiserver_flowcontrol_current_inqueue_requests gauge -`+expectedInqueue, - "apiserver_flowcontrol_current_inqueue_requests") +` + expectedInqueue + err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_inqueue_requests") if err != nil { - t.Fatal(err) + t.Error(err) + } else { + t.Log("Success with" + e) } } + expectedRejects := "" for i := range sc { fsName := fmt.Sprintf("client%d", i) if atomic.AddInt32(&executions[i], 0) > 0 { expectedExecuting = expectedExecuting + fmt.Sprintf(` apiserver_flowcontrol_current_executing_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, name, "\n") } + if atomic.AddInt32(&rejects[i], 0) > 0 { + expectedRejects = expectedRejects + fmt.Sprintf(` apiserver_flowcontrol_rejected_requests_total{flowSchema=%q,priorityLevel=%q,reason=%q} %d%s`, fsName, name, rejectReason, rejects[i], "\n") + } } if expectExecutingMetrics && len(expectedExecuting) > 0 { - err := metrics.GatherAndCompare(` + e := ` # HELP apiserver_flowcontrol_current_executing_requests [ALPHA] Number of requests currently executing in the API Priority and Fairness system # TYPE apiserver_flowcontrol_current_executing_requests gauge -`+expectedExecuting, - "apiserver_flowcontrol_current_executing_requests") +` + expectedExecuting + err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_executing_requests") if err != nil { - t.Fatal(err) + t.Error(err) + } else { + t.Log("Success with" + e) + } + } + if expectExecutingMetrics && len(expectedRejects) > 0 { + e := ` + # HELP apiserver_flowcontrol_rejected_requests_total [ALPHA] Number of requests rejected by API Priority and Fairness system + # TYPE apiserver_flowcontrol_rejected_requests_total counter +` + expectedRejects + err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_rejected_requests_total") + if err != nil { + t.Error(err) + } else { + t.Log("Success with" + e) } } } @@ -193,7 +217,7 @@ func TestNoRestraint(t *testing.T) { exerciseQueueSetUniformScenario(t, "NoRestraint", nr, []uniformClient{ {1001001001, 5, 10, time.Second, time.Second}, {2002002002, 2, 10, time.Second, time.Second / 2}, - }, time.Second*10, false, true, false, false, clk, counter) + }, time.Second*10, false, true, false, false, "", clk, counter) } func TestUniformFlows(t *testing.T) { @@ -218,7 +242,7 @@ func TestUniformFlows(t *testing.T) { exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{ {1001001001, 5, 10, time.Second, time.Second}, {2002002002, 5, 10, time.Second, time.Second}, - }, time.Second*20, true, true, true, true, clk, counter) + }, time.Second*20, true, true, true, true, "", clk, counter) } func TestDifferentFlows(t *testing.T) { @@ -243,7 +267,7 @@ func TestDifferentFlows(t *testing.T) { exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{ {1001001001, 6, 10, time.Second, time.Second}, {2002002002, 5, 15, time.Second, time.Second / 2}, - }, time.Second*20, true, true, true, true, clk, counter) + }, time.Second*20, true, true, true, true, "", clk, counter) } func TestDifferentFlowsWithoutQueuing(t *testing.T) { @@ -265,7 +289,7 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) { exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{ {1001001001, 6, 10, time.Second, 57 * time.Millisecond}, {2002002002, 4, 15, time.Second, 750 * time.Millisecond}, - }, time.Second*13, false, false, false, true, clk, counter) + }, time.Second*13, false, false, false, true, "concurrency-limit", clk, counter) err = metrics.GatherAndCompare(` # HELP apiserver_flowcontrol_rejected_requests_total [ALPHA] Number of requests rejected by API Priority and Fairness system # TYPE apiserver_flowcontrol_rejected_requests_total counter @@ -299,7 +323,7 @@ func TestTimeout(t *testing.T) { exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{ {1001001001, 5, 100, time.Second, time.Second}, - }, time.Second*10, true, false, true, true, clk, counter) + }, time.Second*10, true, false, true, true, "time-out", clk, counter) } func TestContextCancel(t *testing.T) { From c7b098ac6c276d65a79db6cfeb04f5f0f86eb315 Mon Sep 17 00:00:00 2001 From: Mike Spreitzer Date: Thu, 5 Mar 2020 15:17:33 -0500 Subject: [PATCH 3/3] Renaming: "Change" -> "Add" for consistency with underlying method --- .../flowcontrol/fairqueuing/queueset/queueset.go | 14 +++++++------- .../pkg/util/flowcontrol/metrics/metrics.go | 8 ++++---- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 3c64bdcb5c8..8a7e72b5f85 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -460,7 +460,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s req.decision.SetLocked(decisionReject) // get index for timed out requests timeoutIdx = i - metrics.ChangeRequestsInQueues(qs.qCfg.Name, req.fsName, -1) + metrics.AddRequestsInQueues(qs.qCfg.Name, req.fsName, -1) } else { break } @@ -505,7 +505,7 @@ func (qs *queueSet) enqueueLocked(request *request) { } queue.Enqueue(request) qs.totRequestsWaiting++ - metrics.ChangeRequestsInQueues(qs.qCfg.Name, request.fsName, 1) + metrics.AddRequestsInQueues(qs.qCfg.Name, request.fsName, 1) } // dispatchAsMuchAsPossibleLocked runs a loop, as long as there @@ -537,7 +537,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, fsName string, } req.decision.SetLocked(decisionExecute) qs.totRequestsExecuting++ - metrics.ChangeRequestsExecuting(qs.qCfg.Name, fsName, 1) + metrics.AddRequestsExecuting(qs.qCfg.Name, fsName, 1) if klog.V(5) { klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting) } @@ -566,8 +566,8 @@ func (qs *queueSet) dispatchLocked() bool { qs.totRequestsWaiting-- qs.totRequestsExecuting++ queue.requestsExecuting++ - metrics.ChangeRequestsInQueues(qs.qCfg.Name, request.fsName, -1) - metrics.ChangeRequestsExecuting(qs.qCfg.Name, request.fsName, 1) + metrics.AddRequestsInQueues(qs.qCfg.Name, request.fsName, -1) + metrics.AddRequestsExecuting(qs.qCfg.Name, request.fsName, 1) if klog.V(6) { klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting) } @@ -595,7 +595,7 @@ func (qs *queueSet) cancelWait(req *request) { // remove the request queue.requests = append(queue.requests[:i], queue.requests[i+1:]...) qs.totRequestsWaiting-- - metrics.ChangeRequestsInQueues(qs.qCfg.Name, req.fsName, -1) + metrics.AddRequestsInQueues(qs.qCfg.Name, req.fsName, -1) break } } @@ -648,7 +648,7 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool // callback updates important state in the queueSet func (qs *queueSet) finishRequestLocked(r *request) { qs.totRequestsExecuting-- - metrics.ChangeRequestsExecuting(qs.qCfg.Name, r.fsName, -1) + metrics.AddRequestsExecuting(qs.qCfg.Name, r.fsName, -1) if r.queue == nil { if klog.V(6) { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go index 0c585003640..b2b6dab845b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go @@ -157,13 +157,13 @@ var ( } ) -// ChangeRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel -func ChangeRequestsInQueues(priorityLevel, flowSchema string, delta int) { +// AddRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel +func AddRequestsInQueues(priorityLevel, flowSchema string, delta int) { apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta)) } -// ChangeRequestsExecuting adds the given delta to the gauge of executing requests of the given flowSchema and priorityLevel -func ChangeRequestsExecuting(priorityLevel, flowSchema string, delta int) { +// AddRequestsExecuting adds the given delta to the gauge of executing requests of the given flowSchema and priorityLevel +func AddRequestsExecuting(priorityLevel, flowSchema string, delta int) { apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta)) }