diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go index a6afb2b24ca..57e601e62b9 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go @@ -123,7 +123,7 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque noteFn(fs, pl) if req == nil { if queued { - metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) + metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) } klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, reject", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt) return @@ -140,18 +140,18 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque }() idle = req.Finish(func() { if queued { - metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) + metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) } - metrics.AddDispatch(pl.Name, fs.Name) + metrics.AddDispatch(ctx, pl.Name, fs.Name) executed = true startExecutionTime := time.Now() defer func() { - metrics.ObserveExecutionDuration(pl.Name, fs.Name, time.Since(startExecutionTime)) + metrics.ObserveExecutionDuration(ctx, pl.Name, fs.Name, time.Since(startExecutionTime)) }() execFn() }) if queued && !executed { - metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) + metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) } panicking = false } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index acdec4bba1f..6a85bb58990 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -243,7 +243,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist if qs.qCfg.DesiredNumQueues < 1 { if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit { klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d are executing and the limit is %d", qs.qCfg.Name, fsName, descr1, descr2, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) - metrics.AddReject(qs.qCfg.Name, fsName, "concurrency-limit") + metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit") return nil, qs.isIdleLocked() } req = qs.dispatchSansQueueLocked(ctx, flowDistinguisher, fsName, descr1, descr2) @@ -262,7 +262,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist // concurrency shares and at max queue length already if req == nil { klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v due to queue full", qs.qCfg.Name, fsName, descr1, descr2) - metrics.AddReject(qs.qCfg.Name, fsName, "queue-full") + metrics.AddReject(ctx, qs.qCfg.Name, fsName, "queue-full") return nil, qs.isIdleLocked() } @@ -351,7 +351,7 @@ func (req *request) wait() (bool, bool) { switch decision { case decisionReject: klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2) - metrics.AddReject(qs.qCfg.Name, req.fsName, "time-out") + metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "time-out") return false, qs.isIdleLocked() case decisionCancel: // TODO(aaron-prindle) add metrics for this case @@ -448,7 +448,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte if ok := qs.rejectOrEnqueueLocked(req); !ok { return nil } - metrics.ObserveQueueLength(qs.qCfg.Name, fsName, len(queue.requests)) + metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, len(queue.requests)) return req } @@ -486,7 +486,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s req.decision.SetLocked(decisionReject) // get index for timed out requests timeoutIdx = i - metrics.AddRequestsInQueues(qs.qCfg.Name, req.fsName, -1) + metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) req.NoteQueued(false) } else { break @@ -534,7 +534,7 @@ func (qs *queueSet) enqueueLocked(request *request) { } queue.Enqueue(request) qs.totRequestsWaiting++ - metrics.AddRequestsInQueues(qs.qCfg.Name, request.fsName, 1) + metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1) request.NoteQueued(true) qs.obsPair.RequestsWaiting.Add(1) } @@ -569,7 +569,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, flowDistinguish } req.decision.SetLocked(decisionExecute) qs.totRequestsExecuting++ - metrics.AddRequestsExecuting(qs.qCfg.Name, fsName, 1) + metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1) qs.obsPair.RequestsExecuting.Add(1) if klog.V(5).Enabled() { klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting) @@ -599,9 +599,9 @@ func (qs *queueSet) dispatchLocked() bool { qs.totRequestsWaiting-- qs.totRequestsExecuting++ queue.requestsExecuting++ - metrics.AddRequestsInQueues(qs.qCfg.Name, request.fsName, -1) + metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1) request.NoteQueued(false) - metrics.AddRequestsExecuting(qs.qCfg.Name, request.fsName, 1) + metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1) qs.obsPair.RequestsWaiting.Add(-1) qs.obsPair.RequestsExecuting.Add(1) if klog.V(6).Enabled() { @@ -631,7 +631,7 @@ func (qs *queueSet) cancelWait(req *request) { // remove the request queue.requests = append(queue.requests[:i], queue.requests[i+1:]...) qs.totRequestsWaiting-- - metrics.AddRequestsInQueues(qs.qCfg.Name, req.fsName, -1) + metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) req.NoteQueued(false) qs.obsPair.RequestsWaiting.Add(-1) break @@ -704,7 +704,7 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool func (qs *queueSet) finishRequestLocked(r *request) { now := qs.clock.Now() qs.totRequestsExecuting-- - metrics.AddRequestsExecuting(qs.qCfg.Name, r.fsName, -1) + metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1) qs.obsPair.RequestsExecuting.Add(-1) if r.queue == nil { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go index bdbaa94601f..4ebe85577ba 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go @@ -17,6 +17,7 @@ limitations under the License. package metrics import ( + "context" "strings" "sync" "time" @@ -221,12 +222,12 @@ var ( ) // AddRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel -func AddRequestsInQueues(priorityLevel, flowSchema string, delta int) { +func AddRequestsInQueues(ctx context.Context, priorityLevel, flowSchema string, delta int) { apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta)) } // AddRequestsExecuting adds the given delta to the gauge of executing requests of the given flowSchema and priorityLevel -func AddRequestsExecuting(priorityLevel, flowSchema string, delta int) { +func AddRequestsExecuting(ctx context.Context, priorityLevel, flowSchema string, delta int) { apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta)) } @@ -236,26 +237,26 @@ func UpdateSharedConcurrencyLimit(priorityLevel string, limit int) { } // AddReject increments the # of rejected requests for flow control -func AddReject(priorityLevel, flowSchema, reason string) { - apiserverRejectedRequestsTotal.WithLabelValues(priorityLevel, flowSchema, reason).Add(1) +func AddReject(ctx context.Context, priorityLevel, flowSchema, reason string) { + apiserverRejectedRequestsTotal.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema, reason).Add(1) } // AddDispatch increments the # of dispatched requests for flow control -func AddDispatch(priorityLevel, flowSchema string) { - apiserverDispatchedRequestsTotal.WithLabelValues(priorityLevel, flowSchema).Add(1) +func AddDispatch(ctx context.Context, priorityLevel, flowSchema string) { + apiserverDispatchedRequestsTotal.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Add(1) } // ObserveQueueLength observes the queue length for flow control -func ObserveQueueLength(priorityLevel, flowSchema string, length int) { - apiserverRequestQueueLength.WithLabelValues(priorityLevel, flowSchema).Observe(float64(length)) +func ObserveQueueLength(ctx context.Context, priorityLevel, flowSchema string, length int) { + apiserverRequestQueueLength.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Observe(float64(length)) } // ObserveWaitingDuration observes the queue length for flow control -func ObserveWaitingDuration(priorityLevel, flowSchema, execute string, waitTime time.Duration) { - apiserverRequestWaitingSeconds.WithLabelValues(priorityLevel, flowSchema, execute).Observe(waitTime.Seconds()) +func ObserveWaitingDuration(ctx context.Context, priorityLevel, flowSchema, execute string, waitTime time.Duration) { + apiserverRequestWaitingSeconds.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema, execute).Observe(waitTime.Seconds()) } // ObserveExecutionDuration observes the execution duration for flow control -func ObserveExecutionDuration(priorityLevel, flowSchema string, executionTime time.Duration) { - apiserverRequestExecutionSeconds.WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds()) +func ObserveExecutionDuration(ctx context.Context, priorityLevel, flowSchema string, executionTime time.Duration) { + apiserverRequestExecutionSeconds.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds()) }