From fdd921cad0cd9308ec62c1b86c9c1cc5d12e5d21 Mon Sep 17 00:00:00 2001
From: Mike Spreitzer
Date: Sun, 22 May 2022 23:39:49 -0400
Subject: [PATCH] Fix APF metric denominator problems

Co-authored-by: JUN YANG
---
 .../pkg/server/filters/maxinflight.go         | 42 +++++++++-------
 .../pkg/server/filters/maxinflight_test.go    | 10 ++--
 .../server/filters/priority-and-fairness.go   | 14 ++++--
 .../pkg/util/flowcontrol/apf_controller.go    | 20 ++++++--
 .../fairqueuing/queueset/queueset.go          |  3 ++
 .../pkg/util/flowcontrol/metrics/metrics.go   | 48 +++++++++++++++----
 .../metrics/timing_ratio_histogram.go         |  2 +
 7 files changed, 105 insertions(+), 34 deletions(-)

diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go
index acfcf78c3f0..5d7b00ec337 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go
@@ -41,10 +41,6 @@ const (
 	// the metrics tracks maximal value over period making this
 	// longer will increase the metric value.
 	inflightUsageMetricUpdatePeriod = time.Second
-
-	// How often to run maintenance on observations to ensure
-	// that they do not fall too far behind.
-	observationMaintenancePeriod = 10 * time.Second
 )
 
 var (
@@ -90,9 +86,7 @@ func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
 
 // watermark tracks requests being executed (not waiting in a queue)
 var watermark = &requestWatermark{
-	phase:            metrics.ExecutingPhase,
-	readOnlyObserver: fcmetrics.ReadWriteConcurrencyGaugeVec.NewForLabelValuesSafe(0, 1, []string{fcmetrics.LabelValueExecuting, metrics.ReadOnlyKind}),
-	mutatingObserver: fcmetrics.ReadWriteConcurrencyGaugeVec.NewForLabelValuesSafe(0, 1, []string{fcmetrics.LabelValueExecuting, metrics.MutatingKind}),
+	phase: metrics.ExecutingPhase,
 }
 
 // startWatermarkMaintenance starts the goroutines to observe and maintain the specified watermark.
@@ -108,14 +102,25 @@ func startWatermarkMaintenance(watermark *requestWatermark, stopCh <-chan struct
 		metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark)
 	}, inflightUsageMetricUpdatePeriod, stopCh)
+}
 
-	// Periodically observe the watermarks. This is done to ensure that they do not fall too far behind. When they do
-	// fall too far behind, then there is a long delay in responding to the next request received while the observer
-	// catches back up.
-	go wait.Until(func() {
-		watermark.readOnlyObserver.Add(0)
-		watermark.mutatingObserver.Add(0)
-	}, observationMaintenancePeriod, stopCh)
+var initMaxInFlightOnce sync.Once
+
+func initMaxInFlight(nonMutatingLimit, mutatingLimit int) {
+	initMaxInFlightOnce.Do(func() {
+		// Fetching these gauges is delayed until after their underlying metric has been registered
+		// so that this latches onto the efficient implementation.
+		watermark.readOnlyObserver = fcmetrics.GetExecutingReadonlyConcurrency()
+		watermark.mutatingObserver = fcmetrics.GetExecutingMutatingConcurrency()
+		if nonMutatingLimit != 0 {
+			watermark.readOnlyObserver.SetDenominator(float64(nonMutatingLimit))
+			klog.V(2).InfoS("Set denominator for readonly requests", "limit", nonMutatingLimit)
+		}
+		if mutatingLimit != 0 {
+			watermark.mutatingObserver.SetDenominator(float64(mutatingLimit))
+			klog.V(2).InfoS("Set denominator for mutating requests", "limit", mutatingLimit)
+		}
+	})
 }
 
 // WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel.
@@ -132,12 +137,17 @@ func WithMaxInFlightLimit(
 	var mutatingChan chan bool
 	if nonMutatingLimit != 0 {
 		nonMutatingChan = make(chan bool, nonMutatingLimit)
-		watermark.readOnlyObserver.SetDenominator(float64(nonMutatingLimit))
+		klog.V(2).InfoS("Initialized nonMutatingChan", "len", nonMutatingLimit)
+	} else {
+		klog.V(2).InfoS("Running with nil nonMutatingChan")
 	}
 	if mutatingLimit != 0 {
 		mutatingChan = make(chan bool, mutatingLimit)
-		watermark.mutatingObserver.SetDenominator(float64(mutatingLimit))
+		klog.V(2).InfoS("Initialized mutatingChan", "len", mutatingLimit)
+	} else {
+		klog.V(2).InfoS("Running with nil mutatingChan")
 	}
+	initMaxInFlight(nonMutatingLimit, mutatingLimit)
 
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		ctx := r.Context()
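The initMaxInFlight latch above is the heart of this file's change: gauge lookup is deferred until after metric registration, and the denominator is set from the configured limit exactly once, no matter how many filters are constructed. A minimal, self-contained sketch of that pattern follows; the types, names, and limits are stand-ins for illustration, not the real apiserver code.

    package main

    import (
    	"fmt"
    	"sync"
    )

    // RatioedGauge is a stand-in for the real interface in
    // k8s.io/apiserver/pkg/util/flowcontrol/metrics.
    type RatioedGauge interface {
    	Set(float64)
    	SetDenominator(float64)
    }

    // printGauge is a toy implementation that just prints the ratio.
    type printGauge struct{ num, den float64 }

    func (g *printGauge) Set(v float64)            { g.num = v; fmt.Println("ratio =", g.num/g.den) }
    func (g *printGauge) SetDenominator(v float64) { g.den = v }

    var (
    	initOnce sync.Once
    	observer RatioedGauge
    )

    // initObserver mirrors initMaxInFlight: the gauge is fetched and its
    // denominator set exactly once, on first call; later calls are no-ops.
    func initObserver(limit int) {
    	initOnce.Do(func() {
    		observer = &printGauge{den: 1} // denominator defaults to 1, as in NewForLabelValuesSafe(0, 1, ...)
    		if limit != 0 {
    			observer.SetDenominator(float64(limit))
    		}
    	})
    }

    func main() {
    	initObserver(400) // first call wins, e.g. a --max-requests-inflight value
    	initObserver(800) // no-op: the latch is already set
    	observer.Set(100) // prints ratio = 0.25
    }

The first caller wins, which is presumably why WithPriorityAndFairness can safely call initMaxInFlight(0, 0) further down: if the max-in-flight filter already latched real limits, the zeros do not clobber them.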
diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight_test.go
index db5a0da17a4..ad6f0f0d541 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight_test.go
@@ -32,7 +32,7 @@ import (
 	fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
 )
 
-func createMaxInflightServer(callsWg, blockWg *sync.WaitGroup, disableCallsWg *bool, disableCallsWgMutex *sync.Mutex, nonMutating, mutating int) *httptest.Server {
+func createMaxInflightServer(t *testing.T, callsWg, blockWg *sync.WaitGroup, disableCallsWg *bool, disableCallsWgMutex *sync.Mutex, nonMutating, mutating int) *httptest.Server {
 	fcmetrics.Register()
 
 	longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
@@ -49,7 +49,9 @@ func createMaxInflightServer(callsWg, blockWg *sync.WaitGroup, disableCallsWg *b
 			if waitForCalls {
 				callsWg.Done()
 			}
+			t.Logf("About to blockWg.Wait(), requestURI=%v, remoteAddr=%v", r.RequestURI, r.RemoteAddr)
 			blockWg.Wait()
+			t.Logf("Returned from blockWg.Wait(), requestURI=%v, remoteAddr=%v", r.RequestURI, r.RemoteAddr)
 		}),
 		nonMutating,
 		mutating,
@@ -103,7 +105,7 @@ func TestMaxInFlightNonMutating(t *testing.T) {
 	waitForCalls := true
 	waitForCallsMutex := sync.Mutex{}
 
-	server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, AllowedNonMutatingInflightRequestsNo, 1)
+	server := createMaxInflightServer(t, calls, block, &waitForCalls, &waitForCallsMutex, AllowedNonMutatingInflightRequestsNo, 1)
 	defer server.Close()
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -187,7 +189,7 @@ func TestMaxInFlightMutating(t *testing.T) {
 	waitForCalls := true
 	waitForCallsMutex := sync.Mutex{}
 
-	server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
+	server := createMaxInflightServer(t, calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
 	defer server.Close()
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -283,7 +285,7 @@ func TestMaxInFlightSkipsMasters(t *testing.T) {
 	waitForCalls := true
 	waitForCallsMutex := sync.Mutex{}
 
-	server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
+	server := createMaxInflightServer(t, calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
 	defer server.Close()
 
 	ctx, cancel := context.WithCancel(context.Background())
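Threading t *testing.T into createMaxInflightServer exists only so the handler can log around the blocking wait; when a test hangs, the log shows which requests entered the handler and which never came back. A stripped-down sketch of that pattern, with hypothetical names rather than the real test fixture:

    package blocking

    import (
    	"net/http"
    	"net/http/httptest"
    	"sync"
    	"testing"
    )

    // newBlockingServer passes t into the handler purely for diagnostics,
    // mirroring the change to createMaxInflightServer above.
    func newBlockingServer(t *testing.T, block *sync.WaitGroup) *httptest.Server {
    	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    		t.Logf("About to block.Wait(), requestURI=%v", r.RequestURI)
    		block.Wait()
    		t.Logf("Returned from block.Wait(), requestURI=%v", r.RequestURI)
    	}))
    }

    func TestBlockingServer(t *testing.T) {
    	var block sync.WaitGroup
    	block.Add(1)
    	srv := newBlockingServer(t, &block)
    	defer srv.Close()

    	done := make(chan struct{})
    	go func() {
    		defer close(done)
    		resp, err := http.Get(srv.URL)
    		if err == nil {
    			resp.Body.Close()
    		}
    	}()
    	block.Done() // release the handler
    	<-done       // wait before returning, so t.Logf stays legal
    }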
diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
index f117a587e79..f62b809ce61 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"net/http"
 	"runtime"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -46,9 +47,7 @@ type PriorityAndFairnessClassification struct {
 
 // waitingMark tracks requests waiting rather than being executed
var waitingMark = &requestWatermark{
-	phase:            epmetrics.WaitingPhase,
-	readOnlyObserver: fcmetrics.ReadWriteConcurrencyGaugeVec.NewForLabelValuesSafe(0, 1, []string{fcmetrics.LabelValueWaiting, epmetrics.ReadOnlyKind}),
-	mutatingObserver: fcmetrics.ReadWriteConcurrencyGaugeVec.NewForLabelValuesSafe(0, 1, []string{fcmetrics.LabelValueWaiting, epmetrics.MutatingKind}),
+	phase: epmetrics.WaitingPhase,
 }
 
 var atomicMutatingExecuting, atomicReadOnlyExecuting int32
@@ -66,6 +65,8 @@ func truncateLogField(s string) string {
 	return s
 }
 
+var initAPFOnce sync.Once
+
 // WithPriorityAndFairness limits the number of in-flight
 // requests in a fine-grained way.
 func WithPriorityAndFairness(
@@ -78,6 +79,13 @@ func WithPriorityAndFairness(
 		klog.Warningf("priority and fairness support not found, skipping")
 		return handler
 	}
+	initAPFOnce.Do(func() {
+		initMaxInFlight(0, 0)
+		// Fetching these gauges is delayed until after their underlying metric has been registered
+		// so that this latches onto the efficient implementation.
+		waitingMark.readOnlyObserver = fcmetrics.GetWaitingReadonlyConcurrency()
+		waitingMark.mutatingObserver = fcmetrics.GetWaitingMutatingConcurrency()
+	})
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		ctx := r.Context()
 		requestInfo, ok := apirequest.RequestInfoFrom(ctx)
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go
index fa18875e9de..c655ff37b44 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go
@@ -143,8 +143,8 @@ type configController struct {
 	// This may only be accessed from the one and only worker goroutine.
 	mostRecentUpdates []updateAttempt
 
-	// This must be locked while accessing flowSchemas or
-	// priorityLevelStates. A lock for writing is needed
+	// This must be locked while accessing the later fields.
+	// A lock for writing is needed
 	// for writing to any of the following:
 	// - the flowSchemas field
 	// - the slice held in the flowSchemas field
@@ -404,6 +404,8 @@ type cfgMeal struct {
 	// provoking a call into this controller while the lock held
 	// waiting on that request to complete.
 	fsStatusUpdates []fsStatusUpdate
+
+	maxWaitingRequests, maxExecutingRequests int
 }
 
 // A buffered set of status updates for FlowSchemas
@@ -528,7 +530,13 @@ func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*flowcontro
 
 	// The new config has been constructed
 	cfgCtlr.priorityLevelStates = meal.newPLStates
-	klog.V(5).Infof("Switched to new API Priority and Fairness configuration")
+	klog.V(5).InfoS("Switched to new API Priority and Fairness configuration", "maxWaitingRequests", meal.maxWaitingRequests, "maxExecutingRequests", meal.maxExecutingRequests)
+
+	metrics.GetWaitingReadonlyConcurrency().SetDenominator(float64(meal.maxWaitingRequests))
+	metrics.GetWaitingMutatingConcurrency().SetDenominator(float64(meal.maxWaitingRequests))
+	metrics.GetExecutingReadonlyConcurrency().SetDenominator(float64(meal.maxExecutingRequests))
+	metrics.GetExecutingMutatingConcurrency().SetDenominator(float64(meal.maxExecutingRequests))
+
 	return meal.fsStatusUpdates
 }
 
@@ -680,6 +688,12 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
 		// difference will be negligible.
 		concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(plState.pl.Spec.Limited.AssuredConcurrencyShares) / meal.shareSum))
 		metrics.UpdateSharedConcurrencyLimit(plName, concurrencyLimit)
+		meal.maxExecutingRequests += concurrencyLimit
+		var waitLimit int
+		if qCfg := plState.pl.Spec.Limited.LimitResponse.Queuing; qCfg != nil {
+			waitLimit = int(qCfg.Queues * qCfg.QueueLengthLimit)
+		}
+		meal.maxWaitingRequests += waitLimit
 		if plState.queues == nil {
 			klog.V(5).Infof("Introducing queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.pl.Spec.Limited.AssuredConcurrencyShares, meal.shareSum)
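The two new meal fields make the denominators concrete: maxExecutingRequests sums each limited priority level's concurrency limit, ceil(serverConcurrencyLimit × AssuredConcurrencyShares ÷ shareSum), and maxWaitingRequests sums queues × queueLengthLimit over the levels that queue. A worked example of that arithmetic; the shares, queue counts, and server limit below are made-up illustrations, not the shipped defaults:

    package main

    import (
    	"fmt"
    	"math"
    )

    type plConfig struct {
    	shares           int // AssuredConcurrencyShares
    	queues           int // zero means the level rejects instead of queuing
    	queueLengthLimit int
    }

    func main() {
    	serverConcurrencyLimit := 600 // e.g. max-requests-inflight + max-mutating-requests-inflight
    	levels := []plConfig{
    		{shares: 30, queues: 64, queueLengthLimit: 50},
    		{shares: 100, queues: 128, queueLengthLimit: 50},
    		{shares: 5}, // a non-queuing level contributes nothing to waiting capacity
    	}

    	shareSum := 0
    	for _, pl := range levels {
    		shareSum += pl.shares
    	}

    	maxExecuting, maxWaiting := 0, 0
    	for _, pl := range levels {
    		// Same rounding as finishQueueSetReconfigsLocked: ceil(SCL * ACS / shareSum).
    		maxExecuting += int(math.Ceil(float64(serverConcurrencyLimit) * float64(pl.shares) / float64(shareSum)))
    		maxWaiting += pl.queues * pl.queueLengthLimit
    	}
    	fmt.Println("maxExecutingRequests =", maxExecuting) // 134 + 445 + 23 = 602
    	fmt.Println("maxWaitingRequests =", maxWaiting)     // 3200 + 6400 = 9600
    }

Note that the ceiling rounding can push maxExecutingRequests slightly above serverConcurrencyLimit (602 vs 600 here), which is the "difference will be negligible" the code comment refers to.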
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
index 171382a7c4a..25c255ea74e 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
@@ -243,6 +243,9 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig,
 	if qll < 1 {
 		qll = 1
 	}
+	if qCfg.DesiredNumQueues > 0 {
+		qll *= qCfg.DesiredNumQueues
+	}
 	qs.reqsGaugePair.RequestsWaiting.SetDenominator(float64(qll))
 	qs.reqsGaugePair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyLimit))
 	qs.execSeatsGauge.SetDenominator(float64(dCfg.ConcurrencyLimit))
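This three-line change is the direct fix for Issue #110160: the RequestsWaiting denominator used to be a single queue's length limit, so the observed ratio could exceed 1; scaling by DesiredNumQueues makes the denominator the queue set's total waiting capacity. A tiny sketch of the fixed computation, with illustrative numbers:

    package main

    import "fmt"

    // waitingDenominator reproduces the qll computation from
    // queueset.setConfiguration, outside the real types.
    func waitingDenominator(queueLengthLimit, desiredNumQueues int) int {
    	qll := queueLengthLimit
    	if qll < 1 {
    		qll = 1 // same floor as the code above
    	}
    	if desiredNumQueues > 0 {
    		qll *= desiredNumQueues // the change in this patch
    	}
    	return qll
    }

    func main() {
    	// Before the fix, 75 waiting requests against a per-queue limit of 50
    	// read as a ratio of 1.5; against 64*50 = 3200 it is ~0.023.
    	fmt.Println(waitingDenominator(50, 64)) // 3200
    }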
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go
index e8ee8652348..3ef66292eb3 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go
@@ -23,6 +23,7 @@ import (
 	"sync"
 	"time"
 
+	epmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
 	apirequest "k8s.io/apiserver/pkg/endpoints/request"
 	compbasemetrics "k8s.io/component-base/metrics"
 	"k8s.io/component-base/metrics/legacyregistry"
@@ -128,21 +129,22 @@ var (
 			Name:      "priority_level_request_utilization",
 			Help:      "Observations, at the end of every nanosecond, of number of requests (as a fraction of the relevant limit) waiting or in any stage of execution (but only initial stage for WATCHes)",
 			// For executing: the denominator will be seats, so this metric will skew low.
-			// FOr waiting: the denominiator is individual queue length limit, so this metric can go over 1. Issue #110160
-			Buckets:        []float64{0, 0.001, 0.0025, 0.005, 0.1, 0.25, 0.5, 0.75, 1, 10, 100},
+			// For waiting: total queue capacity is generally quite generous, so this metric will skew low.
+			Buckets:        []float64{0, 0.001, 0.003, 0.01, 0.03, 0.1, 0.25, 0.5, 0.75, 1},
 			StabilityLevel: compbasemetrics.ALPHA,
 		},
 		LabelNamePhase, priorityLevel,
 	)
-	// ReadWriteConcurrencyPairVec creates gauges of number of requests broken down by phase and mutating vs readonly
-	ReadWriteConcurrencyGaugeVec = NewTimingRatioHistogramVec(
+	// readWriteConcurrencyGaugeVec creates ratioed gauges of requests/limit broken down by phase and mutating vs readonly
+	readWriteConcurrencyGaugeVec = NewTimingRatioHistogramVec(
 		&compbasemetrics.TimingHistogramOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
 			Name:      "read_vs_write_current_requests",
-			Help:      "Observations, at the end of every nanosecond, of the number of requests (as a fraction of the relevant limit, if max-in-flight filter is being used) waiting or in regular stage of execution",
-			Buckets:   []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99, 1, 3, 10, 30, 100, 300, 1000, 3000},
-			// TODO: something about the utilization vs count irregularity. Issue #109846
+			Help:      "Observations, at the end of every nanosecond, of the number of requests (as a fraction of the relevant limit) waiting or in regular stage of execution",
+			// This metric will skew low for the same reason as the priority level metrics
+			// and also because APF has a combined limit for mutating and readonly.
+			Buckets:        []float64{0, 0.001, 0.01, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99, 1},
 			StabilityLevel: compbasemetrics.ALPHA,
 		},
 		LabelNamePhase, requestKind,
@@ -337,9 +339,39 @@ var (
 		}.
 		Append(PriorityLevelExecutionSeatsGaugeVec.metrics()...).
 		Append(PriorityLevelConcurrencyGaugeVec.metrics()...).
-		Append(ReadWriteConcurrencyGaugeVec.metrics()...)
+		Append(readWriteConcurrencyGaugeVec.metrics()...)
 )
 
+type indexOnce struct {
+	labelValues []string
+	once        sync.Once
+	gauge       RatioedGauge
+}
+
+func (io *indexOnce) getGauge() RatioedGauge {
+	io.once.Do(func() {
+		io.gauge = readWriteConcurrencyGaugeVec.NewForLabelValuesSafe(0, 1, io.labelValues)
+	})
+	return io.gauge
+}
+
+var waitingReadonly = indexOnce{labelValues: []string{LabelValueWaiting, epmetrics.ReadOnlyKind}}
+var executingReadonly = indexOnce{labelValues: []string{LabelValueExecuting, epmetrics.ReadOnlyKind}}
+var waitingMutating = indexOnce{labelValues: []string{LabelValueWaiting, epmetrics.MutatingKind}}
+var executingMutating = indexOnce{labelValues: []string{LabelValueExecuting, epmetrics.MutatingKind}}
+
+// GetWaitingReadonlyConcurrency returns the gauge of number of readonly requests waiting / limit on those.
+var GetWaitingReadonlyConcurrency = waitingReadonly.getGauge
+
+// GetExecutingReadonlyConcurrency returns the gauge of number of executing readonly requests / limit on those.
+var GetExecutingReadonlyConcurrency = executingReadonly.getGauge
+
+// GetWaitingMutatingConcurrency returns the gauge of number of mutating requests waiting / limit on those.
+var GetWaitingMutatingConcurrency = waitingMutating.getGauge
+
+// GetExecutingMutatingConcurrency returns the gauge of number of executing mutating requests / limit on those.
+var GetExecutingMutatingConcurrency = executingMutating.getGauge
+
 // AddRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel
 func AddRequestsInQueues(ctx context.Context, priorityLevel, flowSchema string, delta int) {
 	apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
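The indexOnce cells plus the four exported getters give every caller the same latched RatioedGauge, so the max-in-flight filter and the APF config controller can both set denominators without racing metric registration. A hedged usage sketch; it assumes the apiserver module is importable and that Add with a negative delta behaves as it does for ordinary gauges:

    package main

    import (
    	fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
    )

    func main() {
    	// Register first, as the tests above do, so the getters latch the
    	// efficient registered implementation rather than the fallback path.
    	fcmetrics.Register()

    	waiting := fcmetrics.GetWaitingReadonlyConcurrency()
    	executing := fcmetrics.GetExecutingReadonlyConcurrency()

    	// Both call sites share one gauge per (phase, kind) pair, thanks to
    	// the sync.Once inside indexOnce.
    	waiting.SetDenominator(9600) // e.g. total queue capacity across levels
    	executing.SetDenominator(602)

    	waiting.Add(1)  // a readonly request starts waiting...
    	waiting.Add(-1) // ...and stops
    }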
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/timing_ratio_histogram.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/timing_ratio_histogram.go
index 09baf434059..cd32782a49c 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/timing_ratio_histogram.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/timing_ratio_histogram.go
@@ -192,12 +192,14 @@ func (v *TimingRatioHistogramVec) NewForLabelValuesChecked(initialNumerator, ini
 func (v *TimingRatioHistogramVec) NewForLabelValuesSafe(initialNumerator, initialDenominator float64, labelValues []string) RatioedGauge {
 	tro, err := v.NewForLabelValuesChecked(initialNumerator, initialDenominator, labelValues)
 	if err == nil {
+		klog.V(3).InfoS("TimingRatioHistogramVec.NewForLabelValuesSafe hit the efficient case", "fqName", v.FQName(), "labelValues", labelValues)
 		return tro
 	}
 	if !compbasemetrics.ErrIsNotRegistered(err) {
 		klog.ErrorS(err, "Failed to extract TimingRatioHistogramVec member, using noop instead", "vectorname", v.FQName(), "labelValues", labelValues)
 		return tro
 	}
+	klog.V(3).InfoS("TimingRatioHistogramVec.NewForLabelValuesSafe hit the inefficient case", "fqName", v.FQName(), "labelValues", labelValues)
 	// At this point we know v.NewForLabelValuesChecked(..) returns a permanent noop,
 	// which we precisely want to avoid using. Instead, make our own gauge that
 	// fetches the element on every Set.
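For context on why hitting the efficient case matters: a timing ratio histogram accumulates time per bucket, so on the inefficient path every Set re-fetches the vector member on a hot path, as the code comment above says. A simplified toy of the time-weighted idea; the real TimingHistogram in component-base also handles values beyond the last bound, concurrency, and the numerator/denominator split:

    package main

    import (
    	"fmt"
    	"time"
    )

    // timingHistogram: each bucket accumulates the *time* the tracked value
    // spent at levels within that bucket, not a count of observations.
    type timingHistogram struct {
    	buckets    []float64       // upper bounds, e.g. the new {0, 0.001, ..., 1}
    	weights    []time.Duration // time accumulated per bucket
    	lastValue  float64
    	lastChange time.Time
    }

    // Set credits the time since the last change to the bucket of the old
    // value, then starts timing the new value.
    func (h *timingHistogram) Set(now time.Time, v float64) {
    	held := now.Sub(h.lastChange)
    	for i, ub := range h.buckets {
    		if h.lastValue <= ub {
    			h.weights[i] += held
    			break
    		}
    	}
    	h.lastValue, h.lastChange = v, now
    }

    func main() {
    	start := time.Now()
    	h := &timingHistogram{
    		buckets:    []float64{0, 0.001, 0.01, 0.1, 0.5, 1},
    		weights:    make([]time.Duration, 6),
    		lastChange: start,
    	}
    	h.Set(start.Add(3*time.Second), 0.25) // 3s spent at ratio 0
    	h.Set(start.Add(5*time.Second), 0)    // 2s spent at ratio 0.25
    	fmt.Println(h.weights)                // [3s 0s 0s 0s 2s 0s]
    }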