add context to metrics in util/flowcontrol.

This commit is contained in:
yoyinzyc 2020-12-16 17:08:43 -08:00
parent 266d67bd51
commit 57d0bc301a
3 changed files with 29 additions and 28 deletions

View File

@ -123,7 +123,7 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque
noteFn(fs, pl) noteFn(fs, pl)
if req == nil { if req == nil {
if queued { if queued {
metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
} }
klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, reject", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt) klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, reject", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt)
return return
@ -140,18 +140,18 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque
}() }()
idle = req.Finish(func() { idle = req.Finish(func() {
if queued { if queued {
metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
} }
metrics.AddDispatch(pl.Name, fs.Name) metrics.AddDispatch(ctx, pl.Name, fs.Name)
executed = true executed = true
startExecutionTime := time.Now() startExecutionTime := time.Now()
defer func() { defer func() {
metrics.ObserveExecutionDuration(pl.Name, fs.Name, time.Since(startExecutionTime)) metrics.ObserveExecutionDuration(ctx, pl.Name, fs.Name, time.Since(startExecutionTime))
}() }()
execFn() execFn()
}) })
if queued && !executed { if queued && !executed {
metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
} }
panicking = false panicking = false
} }

View File

@ -243,7 +243,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist
if qs.qCfg.DesiredNumQueues < 1 { if qs.qCfg.DesiredNumQueues < 1 {
if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit { if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit {
klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d are executing and the limit is %d", qs.qCfg.Name, fsName, descr1, descr2, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d are executing and the limit is %d", qs.qCfg.Name, fsName, descr1, descr2, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
metrics.AddReject(qs.qCfg.Name, fsName, "concurrency-limit") metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit")
return nil, qs.isIdleLocked() return nil, qs.isIdleLocked()
} }
req = qs.dispatchSansQueueLocked(ctx, flowDistinguisher, fsName, descr1, descr2) req = qs.dispatchSansQueueLocked(ctx, flowDistinguisher, fsName, descr1, descr2)
@ -262,7 +262,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist
// concurrency shares and at max queue length already // concurrency shares and at max queue length already
if req == nil { if req == nil {
klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v due to queue full", qs.qCfg.Name, fsName, descr1, descr2) klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v due to queue full", qs.qCfg.Name, fsName, descr1, descr2)
metrics.AddReject(qs.qCfg.Name, fsName, "queue-full") metrics.AddReject(ctx, qs.qCfg.Name, fsName, "queue-full")
return nil, qs.isIdleLocked() return nil, qs.isIdleLocked()
} }
@ -351,7 +351,7 @@ func (req *request) wait() (bool, bool) {
switch decision { switch decision {
case decisionReject: case decisionReject:
klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2) klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2)
metrics.AddReject(qs.qCfg.Name, req.fsName, "time-out") metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "time-out")
return false, qs.isIdleLocked() return false, qs.isIdleLocked()
case decisionCancel: case decisionCancel:
// TODO(aaron-prindle) add metrics for this case // TODO(aaron-prindle) add metrics for this case
@ -448,7 +448,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
if ok := qs.rejectOrEnqueueLocked(req); !ok { if ok := qs.rejectOrEnqueueLocked(req); !ok {
return nil return nil
} }
metrics.ObserveQueueLength(qs.qCfg.Name, fsName, len(queue.requests)) metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, len(queue.requests))
return req return req
} }
@ -486,7 +486,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
req.decision.SetLocked(decisionReject) req.decision.SetLocked(decisionReject)
// get index for timed out requests // get index for timed out requests
timeoutIdx = i timeoutIdx = i
metrics.AddRequestsInQueues(qs.qCfg.Name, req.fsName, -1) metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
req.NoteQueued(false) req.NoteQueued(false)
} else { } else {
break break
@ -534,7 +534,7 @@ func (qs *queueSet) enqueueLocked(request *request) {
} }
queue.Enqueue(request) queue.Enqueue(request)
qs.totRequestsWaiting++ qs.totRequestsWaiting++
metrics.AddRequestsInQueues(qs.qCfg.Name, request.fsName, 1) metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1)
request.NoteQueued(true) request.NoteQueued(true)
qs.obsPair.RequestsWaiting.Add(1) qs.obsPair.RequestsWaiting.Add(1)
} }
@ -569,7 +569,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, flowDistinguish
} }
req.decision.SetLocked(decisionExecute) req.decision.SetLocked(decisionExecute)
qs.totRequestsExecuting++ qs.totRequestsExecuting++
metrics.AddRequestsExecuting(qs.qCfg.Name, fsName, 1) metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1)
qs.obsPair.RequestsExecuting.Add(1) qs.obsPair.RequestsExecuting.Add(1)
if klog.V(5).Enabled() { if klog.V(5).Enabled() {
klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting) klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting)
@ -599,9 +599,9 @@ func (qs *queueSet) dispatchLocked() bool {
qs.totRequestsWaiting-- qs.totRequestsWaiting--
qs.totRequestsExecuting++ qs.totRequestsExecuting++
queue.requestsExecuting++ queue.requestsExecuting++
metrics.AddRequestsInQueues(qs.qCfg.Name, request.fsName, -1) metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
request.NoteQueued(false) request.NoteQueued(false)
metrics.AddRequestsExecuting(qs.qCfg.Name, request.fsName, 1) metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1)
qs.obsPair.RequestsWaiting.Add(-1) qs.obsPair.RequestsWaiting.Add(-1)
qs.obsPair.RequestsExecuting.Add(1) qs.obsPair.RequestsExecuting.Add(1)
if klog.V(6).Enabled() { if klog.V(6).Enabled() {
@ -631,7 +631,7 @@ func (qs *queueSet) cancelWait(req *request) {
// remove the request // remove the request
queue.requests = append(queue.requests[:i], queue.requests[i+1:]...) queue.requests = append(queue.requests[:i], queue.requests[i+1:]...)
qs.totRequestsWaiting-- qs.totRequestsWaiting--
metrics.AddRequestsInQueues(qs.qCfg.Name, req.fsName, -1) metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
req.NoteQueued(false) req.NoteQueued(false)
qs.obsPair.RequestsWaiting.Add(-1) qs.obsPair.RequestsWaiting.Add(-1)
break break
@ -704,7 +704,7 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool
func (qs *queueSet) finishRequestLocked(r *request) { func (qs *queueSet) finishRequestLocked(r *request) {
now := qs.clock.Now() now := qs.clock.Now()
qs.totRequestsExecuting-- qs.totRequestsExecuting--
metrics.AddRequestsExecuting(qs.qCfg.Name, r.fsName, -1) metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1)
qs.obsPair.RequestsExecuting.Add(-1) qs.obsPair.RequestsExecuting.Add(-1)
if r.queue == nil { if r.queue == nil {

View File

@ -17,6 +17,7 @@ limitations under the License.
package metrics package metrics
import ( import (
"context"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -221,12 +222,12 @@ var (
) )
// AddRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel // AddRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel
func AddRequestsInQueues(priorityLevel, flowSchema string, delta int) { func AddRequestsInQueues(ctx context.Context, priorityLevel, flowSchema string, delta int) {
apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta)) apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
} }
// AddRequestsExecuting adds the given delta to the gauge of executing requests of the given flowSchema and priorityLevel // AddRequestsExecuting adds the given delta to the gauge of executing requests of the given flowSchema and priorityLevel
func AddRequestsExecuting(priorityLevel, flowSchema string, delta int) { func AddRequestsExecuting(ctx context.Context, priorityLevel, flowSchema string, delta int) {
apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta)) apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
} }
@ -236,26 +237,26 @@ func UpdateSharedConcurrencyLimit(priorityLevel string, limit int) {
} }
// AddReject increments the # of rejected requests for flow control // AddReject increments the # of rejected requests for flow control
func AddReject(priorityLevel, flowSchema, reason string) { func AddReject(ctx context.Context, priorityLevel, flowSchema, reason string) {
apiserverRejectedRequestsTotal.WithLabelValues(priorityLevel, flowSchema, reason).Add(1) apiserverRejectedRequestsTotal.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema, reason).Add(1)
} }
// AddDispatch increments the # of dispatched requests for flow control // AddDispatch increments the # of dispatched requests for flow control
func AddDispatch(priorityLevel, flowSchema string) { func AddDispatch(ctx context.Context, priorityLevel, flowSchema string) {
apiserverDispatchedRequestsTotal.WithLabelValues(priorityLevel, flowSchema).Add(1) apiserverDispatchedRequestsTotal.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Add(1)
} }
// ObserveQueueLength observes the queue length for flow control // ObserveQueueLength observes the queue length for flow control
func ObserveQueueLength(priorityLevel, flowSchema string, length int) { func ObserveQueueLength(ctx context.Context, priorityLevel, flowSchema string, length int) {
apiserverRequestQueueLength.WithLabelValues(priorityLevel, flowSchema).Observe(float64(length)) apiserverRequestQueueLength.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Observe(float64(length))
} }
// ObserveWaitingDuration observes the queue length for flow control // ObserveWaitingDuration observes the queue length for flow control
func ObserveWaitingDuration(priorityLevel, flowSchema, execute string, waitTime time.Duration) { func ObserveWaitingDuration(ctx context.Context, priorityLevel, flowSchema, execute string, waitTime time.Duration) {
apiserverRequestWaitingSeconds.WithLabelValues(priorityLevel, flowSchema, execute).Observe(waitTime.Seconds()) apiserverRequestWaitingSeconds.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema, execute).Observe(waitTime.Seconds())
} }
// ObserveExecutionDuration observes the execution duration for flow control // ObserveExecutionDuration observes the execution duration for flow control
func ObserveExecutionDuration(priorityLevel, flowSchema string, executionTime time.Duration) { func ObserveExecutionDuration(ctx context.Context, priorityLevel, flowSchema string, executionTime time.Duration) {
apiserverRequestExecutionSeconds.WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds()) apiserverRequestExecutionSeconds.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds())
} }