diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 2528e65e35c..69562d72815 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -406,6 +406,7 @@ func (qs *queueSet) syncTimeLocked() { timeSinceLast := realNow.Sub(qs.lastRealTime).Seconds() qs.lastRealTime = realNow qs.virtualTime += timeSinceLast * qs.getVirtualTimeRatioLocked() + metrics.SetCurrentR(qs.qCfg.Name, qs.virtualTime) } // getVirtualTimeRatio calculates the rate at which virtual time has @@ -574,6 +575,7 @@ func (qs *queueSet) dispatchAsMuchAsPossibleLocked() { } func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width uint, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request { + // does not call metrics.SetDispatchMetrics because there is no queuing and thus no interesting virtual world now := qs.clock.Now() req := &request{ qs: qs, @@ -692,6 +694,10 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool { // the oldest waiting request is minimal. func (qs *queueSet) selectQueueLocked() *queue { minVirtualFinish := math.Inf(1) + sMin := math.Inf(1) + dsMin := math.Inf(1) + sMax := math.Inf(-1) + dsMax := math.Inf(-1) var minQueue *queue var minIndex int nq := len(qs.queues) @@ -699,6 +705,11 @@ func (qs *queueSet) selectQueueLocked() *queue { qs.robinIndex = (qs.robinIndex + 1) % nq queue := qs.queues[qs.robinIndex] if queue.requests.Length() != 0 { + sMin = math.Min(sMin, queue.virtualStart) + sMax = math.Max(sMax, queue.virtualStart) + estimatedWorkInProgress := qs.estimatedServiceTime * float64(queue.seatsInUse) + dsMin = math.Min(dsMin, queue.virtualStart-estimatedWorkInProgress) + dsMax = math.Max(dsMax, queue.virtualStart-estimatedWorkInProgress) // the virtual finish time of the oldest request is: // virtual start time + G // we are not taking the width of the request into account when @@ -758,6 +769,7 @@ func (qs *queueSet) selectQueueLocked() *queue { // per-queue virtual time should not fall behind the global minQueue.virtualStart = qs.virtualTime + previouslyEstimatedServiceTime } + metrics.SetDispatchMetrics(qs.qCfg.Name, qs.virtualTime, minQueue.virtualStart, sMin, sMax, dsMin, dsMax) return minQueue } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go index 88b2f13c7c4..a0d22852318 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go @@ -144,6 +144,61 @@ var ( }, []string{requestKind}) + apiserverCurrentR = compbasemetrics.NewGaugeVec( + &compbasemetrics.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "current_r", + Help: "R(time of last change)", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{priorityLevel}, + ) + + apiserverDispatchR = compbasemetrics.NewGaugeVec( + &compbasemetrics.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "dispatch_r", + Help: "R(time of last dispatch)", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{priorityLevel}, + ) + + apiserverLatestS = compbasemetrics.NewGaugeVec( + &compbasemetrics.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "latest_s", + Help: "S(most recently dispatched request)", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{priorityLevel}, + ) + + apiserverNextSBounds = compbasemetrics.NewGaugeVec( + &compbasemetrics.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "next_s_bounds", + Help: "min and max, over queues, of S(oldest waiting request in queue)", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{priorityLevel, "bound"}, + ) + + apiserverNextDiscountedSBounds = compbasemetrics.NewGaugeVec( + &compbasemetrics.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "next_discounted_s_bounds", + Help: "min and max, over queues, of S(oldest waiting request in queue) - estimated work in progress", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{priorityLevel, "bound"}, + ) + apiserverCurrentInqueueRequests = compbasemetrics.NewGaugeVec( &compbasemetrics.GaugeOpts{ Namespace: namespace, @@ -220,6 +275,11 @@ var ( metrics = Registerables{ apiserverRejectedRequestsTotal, apiserverDispatchedRequestsTotal, + apiserverCurrentR, + apiserverDispatchR, + apiserverLatestS, + apiserverNextSBounds, + apiserverNextDiscountedSBounds, apiserverCurrentInqueueRequests, apiserverRequestQueueLength, apiserverRequestConcurrencyLimit, @@ -242,6 +302,21 @@ func AddRequestsExecuting(ctx context.Context, priorityLevel, flowSchema string, apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta)) } +// SetCurrentR sets the current-R (virtualTime) gauge for the given priority level +func SetCurrentR(priorityLevel string, r float64) { + apiserverCurrentR.WithLabelValues(priorityLevel).Set(r) +} + +// SetLatestS sets the latest-S (virtual time of dispatched request) gauge for the given priority level +func SetDispatchMetrics(priorityLevel string, r, s, sMin, sMax, discountedSMin, discountedSMax float64) { + apiserverDispatchR.WithLabelValues(priorityLevel).Set(r) + apiserverLatestS.WithLabelValues(priorityLevel).Set(s) + apiserverNextSBounds.WithLabelValues(priorityLevel, "min").Set(sMin) + apiserverNextSBounds.WithLabelValues(priorityLevel, "max").Set(sMax) + apiserverNextDiscountedSBounds.WithLabelValues(priorityLevel, "min").Set(discountedSMin) + apiserverNextDiscountedSBounds.WithLabelValues(priorityLevel, "max").Set(discountedSMax) +} + // AddRequestConcurrencyInUse adds the given delta to the gauge of concurrency in use by // the currently executing requests of the given flowSchema and priorityLevel func AddRequestConcurrencyInUse(priorityLevel, flowSchema string, delta int) {