Merge pull request #110164 from MikeSpreitzer/supply-denominators

Supply denominators
Kubernetes Prow Robot 2022-07-25 13:32:34 -07:00 committed by GitHub
commit d48c067771
7 changed files with 105 additions and 34 deletions


@@ -41,10 +41,6 @@ const (
 	// the metrics tracks maximal value over period making this
 	// longer will increase the metric value.
 	inflightUsageMetricUpdatePeriod = time.Second
-
-	// How often to run maintenance on observations to ensure
-	// that they do not fall too far behind.
-	observationMaintenancePeriod = 10 * time.Second
 )

 var (
@@ -91,8 +87,6 @@ func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
 // watermark tracks requests being executed (not waiting in a queue)
 var watermark = &requestWatermark{
 	phase: metrics.ExecutingPhase,
-	readOnlyObserver: fcmetrics.ReadWriteConcurrencyGaugeVec.NewForLabelValuesSafe(0, 1, []string{fcmetrics.LabelValueExecuting, metrics.ReadOnlyKind}),
-	mutatingObserver: fcmetrics.ReadWriteConcurrencyGaugeVec.NewForLabelValuesSafe(0, 1, []string{fcmetrics.LabelValueExecuting, metrics.MutatingKind}),
 }

 // startWatermarkMaintenance starts the goroutines to observe and maintain the specified watermark.
@@ -108,14 +102,25 @@ func startWatermarkMaintenance(watermark *requestWatermark, stopCh <-chan struct
 		metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark)
 	}, inflightUsageMetricUpdatePeriod, stopCh)
-
-	// Periodically observe the watermarks. This is done to ensure that they do not fall too far behind. When they do
-	// fall too far behind, then there is a long delay in responding to the next request received while the observer
-	// catches back up.
-	go wait.Until(func() {
-		watermark.readOnlyObserver.Add(0)
-		watermark.mutatingObserver.Add(0)
-	}, observationMaintenancePeriod, stopCh)
 }

+var initMaxInFlightOnce sync.Once
+
+func initMaxInFlight(nonMutatingLimit, mutatingLimit int) {
+	initMaxInFlightOnce.Do(func() {
+		// Fetching these gauges is delayed until after their underlying metric has been registered
+		// so that this latches onto the efficient implementation.
+		watermark.readOnlyObserver = fcmetrics.GetExecutingReadonlyConcurrency()
+		watermark.mutatingObserver = fcmetrics.GetExecutingMutatingConcurrency()
+		if nonMutatingLimit != 0 {
+			watermark.readOnlyObserver.SetDenominator(float64(nonMutatingLimit))
+			klog.V(2).InfoS("Set denominator for readonly requests", "limit", nonMutatingLimit)
+		}
+		if mutatingLimit != 0 {
+			watermark.mutatingObserver.SetDenominator(float64(mutatingLimit))
+			klog.V(2).InfoS("Set denominator for mutating requests", "limit", mutatingLimit)
+		}
+	})
+}
+
 // WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel.
@@ -132,12 +137,17 @@ func WithMaxInFlightLimit(
 	var mutatingChan chan bool
 	if nonMutatingLimit != 0 {
 		nonMutatingChan = make(chan bool, nonMutatingLimit)
-		watermark.readOnlyObserver.SetDenominator(float64(nonMutatingLimit))
+		klog.V(2).InfoS("Initialized nonMutatingChan", "len", nonMutatingLimit)
+	} else {
+		klog.V(2).InfoS("Running with nil nonMutatingChan")
 	}
 	if mutatingLimit != 0 {
 		mutatingChan = make(chan bool, mutatingLimit)
-		watermark.mutatingObserver.SetDenominator(float64(mutatingLimit))
+		klog.V(2).InfoS("Initialized mutatingChan", "len", mutatingLimit)
+	} else {
+		klog.V(2).InfoS("Running with nil mutatingChan")
 	}
+	initMaxInFlight(nonMutatingLimit, mutatingLimit)
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		ctx := r.Context()
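
For context on the pattern above: initMaxInFlight is latched with a sync.Once so that however many filter constructions call it, the executing-phase gauges are fetched and their denominators set at most once, and only after the metrics have been registered. A minimal, self-contained sketch of that latch (the gauge type and names here are illustrative, not part of this change):

package main

import (
	"fmt"
	"sync"
)

// gauge stands in for the RatioedGauge used by the filter; only the
// denominator matters for this sketch.
type gauge struct{ denominator float64 }

func (g *gauge) SetDenominator(d float64) { g.denominator = d }

var (
	initOnce         sync.Once
	readOnlyObserver = &gauge{}
)

// initLimits mirrors the shape of initMaxInFlight: no matter how many
// callers race to initialize, the denominator is latched exactly once.
func initLimits(nonMutatingLimit int) {
	initOnce.Do(func() {
		if nonMutatingLimit != 0 {
			readOnlyObserver.SetDenominator(float64(nonMutatingLimit))
		}
	})
}

func main() {
	initLimits(400)                           // first caller wins
	initLimits(999)                           // no effect; the Once has already fired
	fmt.Println(readOnlyObserver.denominator) // prints 400
}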


@@ -32,7 +32,7 @@ import (
 	fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
 )

-func createMaxInflightServer(callsWg, blockWg *sync.WaitGroup, disableCallsWg *bool, disableCallsWgMutex *sync.Mutex, nonMutating, mutating int) *httptest.Server {
+func createMaxInflightServer(t *testing.T, callsWg, blockWg *sync.WaitGroup, disableCallsWg *bool, disableCallsWgMutex *sync.Mutex, nonMutating, mutating int) *httptest.Server {
 	fcmetrics.Register()
 	longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
@@ -49,7 +49,9 @@ func createMaxInflightServer(callsWg, blockWg *sync.WaitGroup, disableCallsWg *b
 			if waitForCalls {
 				callsWg.Done()
 			}
+			t.Logf("About to blockWg.Wait(), requestURI=%v, remoteAddr=%v", r.RequestURI, r.RemoteAddr)
 			blockWg.Wait()
+			t.Logf("Returned from blockWg.Wait(), requestURI=%v, remoteAddr=%v", r.RequestURI, r.RemoteAddr)
 		}),
 		nonMutating,
 		mutating,
@@ -103,7 +105,7 @@ func TestMaxInFlightNonMutating(t *testing.T) {
 	waitForCalls := true
 	waitForCallsMutex := sync.Mutex{}

-	server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, AllowedNonMutatingInflightRequestsNo, 1)
+	server := createMaxInflightServer(t, calls, block, &waitForCalls, &waitForCallsMutex, AllowedNonMutatingInflightRequestsNo, 1)
 	defer server.Close()

 	ctx, cancel := context.WithCancel(context.Background())
@@ -187,7 +189,7 @@ func TestMaxInFlightMutating(t *testing.T) {
 	waitForCalls := true
 	waitForCallsMutex := sync.Mutex{}

-	server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
+	server := createMaxInflightServer(t, calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
 	defer server.Close()

 	ctx, cancel := context.WithCancel(context.Background())
@@ -283,7 +285,7 @@ func TestMaxInFlightSkipsMasters(t *testing.T) {
 	waitForCalls := true
 	waitForCallsMutex := sync.Mutex{}

-	server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
+	server := createMaxInflightServer(t, calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
 	defer server.Close()

 	ctx, cancel := context.WithCancel(context.Background())


@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"net/http"
 	"runtime"
+	"sync"
 	"sync/atomic"
 	"time"
@@ -47,8 +48,6 @@ type PriorityAndFairnessClassification struct {

 // waitingMark tracks requests waiting rather than being executed
 var waitingMark = &requestWatermark{
 	phase: epmetrics.WaitingPhase,
-	readOnlyObserver: fcmetrics.ReadWriteConcurrencyGaugeVec.NewForLabelValuesSafe(0, 1, []string{fcmetrics.LabelValueWaiting, epmetrics.ReadOnlyKind}),
-	mutatingObserver: fcmetrics.ReadWriteConcurrencyGaugeVec.NewForLabelValuesSafe(0, 1, []string{fcmetrics.LabelValueWaiting, epmetrics.MutatingKind}),
 }

 var atomicMutatingExecuting, atomicReadOnlyExecuting int32
@@ -66,6 +65,8 @@ func truncateLogField(s string) string {
 	return s
 }

+var initAPFOnce sync.Once
+
 // WithPriorityAndFairness limits the number of in-flight
 // requests in a fine-grained way.
 func WithPriorityAndFairness(
@@ -78,6 +79,13 @@ func WithPriorityAndFairness(
 		klog.Warningf("priority and fairness support not found, skipping")
 		return handler
 	}
+	initAPFOnce.Do(func() {
+		initMaxInFlight(0, 0)
+		// Fetching these gauges is delayed until after their underlying metric has been registered
+		// so that this latches onto the efficient implementation.
+		waitingMark.readOnlyObserver = fcmetrics.GetWaitingReadonlyConcurrency()
+		waitingMark.mutatingObserver = fcmetrics.GetWaitingMutatingConcurrency()
+	})
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		ctx := r.Context()
 		requestInfo, ok := apirequest.RequestInfoFrom(ctx)
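
A note on the call above: initMaxInFlight(0, 0) latches the executing-phase observers in the shared watermark onto the registered gauges without supplying denominators there, since with zero limits the SetDenominator branches are skipped; when this filter is in use, the denominators for both the waiting and executing gauges are instead supplied by the API Priority and Fairness config controller (see the lockAndDigestConfigObjects hunk below) once the priority-level configuration has been digested.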


@@ -143,8 +143,8 @@ type configController struct {
 	// This may only be accessed from the one and only worker goroutine.
 	mostRecentUpdates []updateAttempt

-	// This must be locked while accessing flowSchemas or
-	// priorityLevelStates. A lock for writing is needed
+	// This must be locked while accessing the later fields.
+	// A lock for writing is needed
 	// for writing to any of the following:
 	// - the flowSchemas field
 	// - the slice held in the flowSchemas field
@@ -387,6 +387,8 @@ type cfgMeal struct {
 	// provoking a call into this controller while the lock held
 	// waiting on that request to complete.
 	fsStatusUpdates []fsStatusUpdate
+
+	maxWaitingRequests, maxExecutingRequests int
 }

 // A buffered set of status updates for FlowSchemas
@@ -511,7 +513,13 @@ func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*flowcontro
 	// The new config has been constructed
 	cfgCtlr.priorityLevelStates = meal.newPLStates

-	klog.V(5).Infof("Switched to new API Priority and Fairness configuration")
+	klog.V(5).InfoS("Switched to new API Priority and Fairness configuration", "maxWaitingRequests", meal.maxWaitingRequests, "maxExecutingRequests", meal.maxExecutingRequests)
+
+	metrics.GetWaitingReadonlyConcurrency().SetDenominator(float64(meal.maxWaitingRequests))
+	metrics.GetWaitingMutatingConcurrency().SetDenominator(float64(meal.maxWaitingRequests))
+	metrics.GetExecutingReadonlyConcurrency().SetDenominator(float64(meal.maxExecutingRequests))
+	metrics.GetExecutingMutatingConcurrency().SetDenominator(float64(meal.maxExecutingRequests))

 	return meal.fsStatusUpdates
 }
@@ -663,6 +671,12 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
 		// difference will be negligible.
 		concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(plState.pl.Spec.Limited.AssuredConcurrencyShares) / meal.shareSum))
 		metrics.UpdateSharedConcurrencyLimit(plName, concurrencyLimit)
+		meal.maxExecutingRequests += concurrencyLimit
+		var waitLimit int
+		if qCfg := plState.pl.Spec.Limited.LimitResponse.Queuing; qCfg != nil {
+			waitLimit = int(qCfg.Queues * qCfg.QueueLengthLimit)
+		}
+		meal.maxWaitingRequests += waitLimit

 		if plState.queues == nil {
 			klog.V(5).Infof("Introducing queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.pl.Spec.Limited.AssuredConcurrencyShares, meal.shareSum)


@@ -243,6 +243,9 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig,
 	if qll < 1 {
 		qll = 1
 	}
+	if qCfg.DesiredNumQueues > 0 {
+		qll *= qCfg.DesiredNumQueues
+	}
 	qs.reqsGaugePair.RequestsWaiting.SetDenominator(float64(qll))
 	qs.reqsGaugePair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyLimit))
 	qs.execSeatsGauge.SetDenominator(float64(dCfg.ConcurrencyLimit))
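
The guard above changes the waiting-request denominator from a single queue's length limit to the total queue capacity of the queue set. For example (illustrative values, not from this PR), with QueueLengthLimit = 50 and DesiredNumQueues = 64 the denominator becomes 50 * 64 = 3200, so the priority_level_request_utilization waiting ratio can no longer exceed 1 merely because more than one queue holds requests; a queue-less configuration leaves DesiredNumQueues at 0 and keeps qll unchanged.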


@@ -23,6 +23,7 @@ import (
 	"sync"
 	"time"

+	epmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
 	apirequest "k8s.io/apiserver/pkg/endpoints/request"
 	compbasemetrics "k8s.io/component-base/metrics"
 	"k8s.io/component-base/metrics/legacyregistry"
@@ -128,21 +129,22 @@
 			Name: "priority_level_request_utilization",
 			Help: "Observations, at the end of every nanosecond, of number of requests (as a fraction of the relevant limit) waiting or in any stage of execution (but only initial stage for WATCHes)",
 			// For executing: the denominator will be seats, so this metric will skew low.
-			// FOr waiting: the denominiator is individual queue length limit, so this metric can go over 1. Issue #110160
-			Buckets: []float64{0, 0.001, 0.0025, 0.005, 0.1, 0.25, 0.5, 0.75, 1, 10, 100},
+			// For waiting: total queue capacity is generally quite generous, so this metric will skew low.
+			Buckets: []float64{0, 0.001, 0.003, 0.01, 0.03, 0.1, 0.25, 0.5, 0.75, 1},
 			StabilityLevel: compbasemetrics.ALPHA,
 		},
 		LabelNamePhase, priorityLevel,
 	)

-	// ReadWriteConcurrencyPairVec creates gauges of number of requests broken down by phase and mutating vs readonly
-	ReadWriteConcurrencyGaugeVec = NewTimingRatioHistogramVec(
+	// readWriteConcurrencyGaugeVec creates ratioed gauges of requests/limit broken down by phase and mutating vs readonly
+	readWriteConcurrencyGaugeVec = NewTimingRatioHistogramVec(
 		&compbasemetrics.TimingHistogramOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
 			Name: "read_vs_write_current_requests",
-			Help: "Observations, at the end of every nanosecond, of the number of requests (as a fraction of the relevant limit, if max-in-flight filter is being used) waiting or in regular stage of execution",
-			Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99, 1, 3, 10, 30, 100, 300, 1000, 3000},
-			// TODO: something about the utilization vs count irregularity. Issue #109846
+			Help: "Observations, at the end of every nanosecond, of the number of requests (as a fraction of the relevant limit) waiting or in regular stage of execution",
+			// This metric will skew low for the same reason as the priority level metrics
+			// and also because APF has a combined limit for mutating and readonly.
+			Buckets: []float64{0, 0.001, 0.01, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99, 1},
 			StabilityLevel: compbasemetrics.ALPHA,
 		},
 		LabelNamePhase, requestKind,
@@ -337,9 +339,39 @@
 		}.
 		Append(PriorityLevelExecutionSeatsGaugeVec.metrics()...).
 		Append(PriorityLevelConcurrencyGaugeVec.metrics()...).
-		Append(ReadWriteConcurrencyGaugeVec.metrics()...)
+		Append(readWriteConcurrencyGaugeVec.metrics()...)
 )

+type indexOnce struct {
+	labelValues []string
+	once        sync.Once
+	gauge       RatioedGauge
+}
+
+func (io *indexOnce) getGauge() RatioedGauge {
+	io.once.Do(func() {
+		io.gauge = readWriteConcurrencyGaugeVec.NewForLabelValuesSafe(0, 1, io.labelValues)
+	})
+	return io.gauge
+}
+
+var waitingReadonly = indexOnce{labelValues: []string{LabelValueWaiting, epmetrics.ReadOnlyKind}}
+var executingReadonly = indexOnce{labelValues: []string{LabelValueExecuting, epmetrics.ReadOnlyKind}}
+var waitingMutating = indexOnce{labelValues: []string{LabelValueWaiting, epmetrics.MutatingKind}}
+var executingMutating = indexOnce{labelValues: []string{LabelValueExecuting, epmetrics.MutatingKind}}
+
+// GetWaitingReadonlyConcurrency returns the gauge of number of readonly requests waiting / limit on those.
+var GetWaitingReadonlyConcurrency = waitingReadonly.getGauge
+
+// GetExecutingReadonlyConcurrency returns the gauge of number of executing readonly requests / limit on those.
+var GetExecutingReadonlyConcurrency = executingReadonly.getGauge
+
+// GetWaitingMutatingConcurrency returns the gauge of number of mutating requests waiting / limit on those.
+var GetWaitingMutatingConcurrency = waitingMutating.getGauge
+
+// GetExecutingMutatingConcurrency returns the gauge of number of executing mutating requests / limit on those.
+var GetExecutingMutatingConcurrency = executingMutating.getGauge
+
 // AddRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel
 func AddRequestsInQueues(ctx context.Context, priorityLevel, flowSchema string, delta int) {
 	apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
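
The exported Get* accessors above are function values bound to getGauge, so the vector member is looked up lazily on first call rather than at package init, which is what lets callers run after Register() and hit the efficient case. A small sketch of the intended call pattern, mirroring the config-controller change above (the denominator value is illustrative):

package example

import (
	fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
)

// supplyWaitingDenominator shows the intended call pattern: fetch the
// ratioed gauge lazily (after fcmetrics.Register() has run) and then
// supply its denominator.
func supplyWaitingDenominator(totalQueueCapacity int) {
	waiting := fcmetrics.GetWaitingReadonlyConcurrency()
	waiting.SetDenominator(float64(totalQueueCapacity))
}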


@@ -192,12 +192,14 @@ func (v *TimingRatioHistogramVec) NewForLabelValuesChecked(initialNumerator, ini
 func (v *TimingRatioHistogramVec) NewForLabelValuesSafe(initialNumerator, initialDenominator float64, labelValues []string) RatioedGauge {
 	tro, err := v.NewForLabelValuesChecked(initialNumerator, initialDenominator, labelValues)
 	if err == nil {
+		klog.V(3).InfoS("TimingRatioHistogramVec.NewForLabelValuesSafe hit the efficient case", "fqName", v.FQName(), "labelValues", labelValues)
 		return tro
 	}
 	if !compbasemetrics.ErrIsNotRegistered(err) {
 		klog.ErrorS(err, "Failed to extract TimingRatioHistogramVec member, using noop instead", "vectorname", v.FQName(), "labelValues", labelValues)
 		return tro
 	}
+	klog.V(3).InfoS("TimingRatioHistogramVec.NewForLabelValuesSafe hit the inefficient case", "fqName", v.FQName(), "labelValues", labelValues)
 	// At this point we know v.NewForLabelValuesChecked(..) returns a permanent noop,
 	// which we precisely want to avoid using. Instead, make our own gauge that
 	// fetches the element on every Set.