From a797fbd96de8c67aaed58aef54fbe9f0eb94a2c2 Mon Sep 17 00:00:00 2001 From: Mike Spreitzer Date: Fri, 1 Oct 2021 22:04:05 -0700 Subject: [PATCH] Keep the progress meter R from overflowing Also add test for that situation. --- .../fairqueuing/queueset/queueset.go | 73 +++++++++++++++---- .../fairqueuing/queueset/queueset_test.go | 18 ++++- .../fairqueuing/queueset/seatsecs_test.go | 28 ++++--- .../flowcontrol/fairqueuing/queueset/types.go | 1 + .../pkg/util/flowcontrol/metrics/metrics.go | 31 +++++--- 5 files changed, 114 insertions(+), 37 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 1a41a833e6e..da482688443 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -182,7 +182,7 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet { } qs.promiseFactory = qsc.factory.promiseFactoryFactory(qs) } - qs.setConfiguration(qsc.qCfg, qsc.dealer, dCfg) + qs.setConfiguration(context.Background(), qsc.qCfg, qsc.dealer, dCfg) return qs } @@ -210,8 +210,8 @@ func (qs *queueSet) BeginConfigChange(qCfg fq.QueuingConfig) (fq.QueueSetComplet // Update handling for when fields are updated is handled here as well - // eg: if DesiredNum is increased, SetConfiguration reconciles by // adding more queues. -func (qs *queueSet) setConfiguration(qCfg fq.QueuingConfig, dealer *shufflesharding.Dealer, dCfg fq.DispatchingConfig) { - qs.lockAndSyncTime() +func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig, dealer *shufflesharding.Dealer, dCfg fq.DispatchingConfig) { + qs.lockAndSyncTime(ctx) defer qs.lock.Unlock() if qCfg.DesiredNumQueues > 0 { @@ -260,7 +260,7 @@ const ( // The queueSet's promiseFactory is invoked once if the returns Request is non-nil, // not invoked if the Request is nil. func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) { - qs.lockAndSyncTime() + qs.lockAndSyncTime(ctx) defer qs.lock.Unlock() var req *request @@ -350,7 +350,7 @@ func (req *request) wait() (bool, bool) { // The final step is to wait on a decision from // somewhere and then act on it. decisionAny := req.decision.Get() - qs.lockAndSyncTime() + qs.lockAndSyncTime(req.ctx) defer qs.lock.Unlock() if req.waitStarted { // This can not happen, because the client is forbidden to @@ -396,27 +396,74 @@ func (qs *queueSet) isIdleLocked() bool { // lockAndSyncTime acquires the lock and updates the virtual time. // Doing them together avoids the mistake of modify some queue state // before calling syncTimeLocked. -func (qs *queueSet) lockAndSyncTime() { +func (qs *queueSet) lockAndSyncTime(ctx context.Context) { qs.lock.Lock() - qs.syncTimeLocked() + qs.syncTimeLocked(ctx) } // syncTimeLocked updates the virtual time based on the assumption // that the current state of the queues has been in effect since // `qs.lastRealTime`. Thus, it should be invoked after acquiring the // lock and before modifying the state of any queue. 
-func (qs *queueSet) syncTimeLocked() {
+func (qs *queueSet) syncTimeLocked(ctx context.Context) {
 	realNow := qs.clock.Now()
 	timeSinceLast := realNow.Sub(qs.lastRealTime)
 	qs.lastRealTime = realNow
 	prevR := qs.currentR
-	qs.currentR += SeatsTimesDuration(qs.getVirtualTimeRatioLocked(), timeSinceLast)
-	if qs.currentR < prevR {
-		klog.ErrorS(errors.New("progress meter wrapped around"), "Wrap", "QS", qs.qCfg.Name, "prevR", prevR, "currentR", qs.currentR)
+	incrR := SeatsTimesDuration(qs.getVirtualTimeRatioLocked(), timeSinceLast)
+	qs.currentR = prevR + incrR
+	switch {
+	case prevR > qs.currentR:
+		klog.ErrorS(errors.New("queueset::currentR overflow"), "Overflow", "QS", qs.qCfg.Name, "when", realNow.Format(nsTimeFmt), "prevR", prevR, "incrR", incrR, "currentR", qs.currentR)
+	case qs.currentR >= highR:
+		qs.advanceEpoch(ctx, realNow, incrR)
 	}
 	metrics.SetCurrentR(qs.qCfg.Name, qs.currentR.ToFloat())
 }
 
+// rDecrement is the amount by which the progress meter R is wound backwards
+// when needed to avoid overflow.
+const rDecrement = MaxSeatSeconds / 2
+
+// highR is the threshold that triggers advance of the epoch.
+// That is, decrementing the global progress meter R by rDecrement.
+const highR = rDecrement + rDecrement/2
+
+// advanceEpoch subtracts rDecrement from the global progress meter R
+// and all the readings that have been taken from that meter.
+// The now and incrR parameters are only used to add info to the log messages.
+func (qs *queueSet) advanceEpoch(ctx context.Context, now time.Time, incrR SeatSeconds) {
+	oldR := qs.currentR
+	qs.currentR -= rDecrement
+	klog.InfoS("Advancing epoch", "QS", qs.qCfg.Name, "when", now.Format(nsTimeFmt), "oldR", oldR, "newR", qs.currentR, "incrR", incrR)
+	success := true
+	for qIdx, queue := range qs.queues {
+		if queue.requests.Length() == 0 && queue.requestsExecuting == 0 {
+			// Do not just decrement; the value could be quite outdated.
+			// It is safe to reset to zero in this case, because the next request
+			// will overwrite the zero with `qs.currentR`.
+			queue.nextDispatchR = 0
+			continue
+		}
+		oldNextDispatchR := queue.nextDispatchR
+		queue.nextDispatchR -= rDecrement
+		if queue.nextDispatchR > oldNextDispatchR {
+			klog.ErrorS(errors.New("queue::nextDispatchR underflow"), "Underflow", "QS", qs.qCfg.Name, "queue", qIdx, "oldNextDispatchR", oldNextDispatchR, "newNextDispatchR", queue.nextDispatchR, "incrR", incrR)
+			success = false
+		}
+		queue.requests.Walk(func(req *request) bool {
+			oldArrivalR := req.arrivalR
+			req.arrivalR -= rDecrement
+			if req.arrivalR > oldArrivalR {
+				klog.ErrorS(errors.New("request::arrivalR underflow"), "Underflow", "QS", qs.qCfg.Name, "queue", qIdx, "request", *req, "oldArrivalR", oldArrivalR, "incrR", incrR)
+				success = false
+			}
+			return true
+		})
+	}
+	metrics.AddEpochAdvance(ctx, qs.qCfg.Name, success)
+}
+
 // getVirtualTimeRatio calculates the rate at which virtual time has
 // been advancing, according to the logic in `doc.go`.
 func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
@@ -774,7 +821,7 @@ func ssMax(a, b SeatSeconds) SeatSeconds {
 // once a request finishes execution or is canceled. This returns a bool
 // indicating whether the QueueSet is now idle.
 func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool {
-	qs.lockAndSyncTime()
+	qs.lockAndSyncTime(req.ctx)
 	defer qs.lock.Unlock()
 
 	qs.finishRequestLocked(req)
@@ -835,7 +882,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
 		// AdditionalLatency elapses, this ensures that the additional
 		// latency has no impact on the user experience.
 		qs.clock.EventAfterDuration(func(_ time.Time) {
-			qs.lockAndSyncTime()
+			qs.lockAndSyncTime(r.ctx)
 			defer qs.lock.Unlock()
 			now := qs.clock.Now()
 			releaseSeatsLocked()
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go
index 7a514a1fa8b..80e9afebc29 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go
@@ -179,6 +179,7 @@ type uniformScenario struct {
 	clk                    *testeventclock.Fake
 	counter                counter.GoRoutineCounter
 	padConstrains          bool
+	expectedEpochAdvances  int
 }
 
 func (us uniformScenario) exercise(t *testing.T) {
@@ -405,6 +406,18 @@ func (uss *uniformScenarioState) finalReview() {
 			uss.t.Log("Success with" + e)
 		}
 	}
+	e := ""
+	if uss.expectedEpochAdvances > 0 {
+		e = fmt.Sprintf(`        # HELP apiserver_flowcontrol_epoch_advance_total [ALPHA] Number of times the queueset's progress meter jumped backward
+        # TYPE apiserver_flowcontrol_epoch_advance_total counter
+        apiserver_flowcontrol_epoch_advance_total{priority_level=%q,success=%q} %d%s`, uss.name, "true", uss.expectedEpochAdvances, "\n")
+	}
+	err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_epoch_advance_total")
+	if err != nil {
+		uss.t.Error(err)
+	} else {
+		uss.t.Logf("Success with apiserver_flowcontrol_epoch_advance_total = %d", uss.expectedEpochAdvances)
+	}
 }
 
 func TestMain(m *testing.M) {
@@ -647,7 +660,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
 	}.exercise(t)
 }
 
-// TestSeatSecondsRollover demonstrates that SeatSeconds overflow can cause bad stuff to happen.
+// TestSeatSecondsRollover checks that SeatSeconds overflow does not cause problems.
 func TestSeatSecondsRollover(t *testing.T) {
 	metrics.Register()
 	now := time.Now()
@@ -677,13 +690,14 @@ func TestSeatSecondsRollover(t *testing.T) {
 		},
 		concurrencyLimit:       2000,
 		evalDuration:           Quarter * 40,
-		expectedFair:           []bool{false},
+		expectedFair:           []bool{true},
 		expectedFairnessMargin: []float64{0.01},
 		expectAllRequests:      true,
 		evalInqueueMetrics:     true,
 		evalExecutingMetrics:   true,
 		clk:                    clk,
 		counter:                counter,
+		expectedEpochAdvances:  8,
 	}.exercise(t)
 }
 
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/seatsecs_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/seatsecs_test.go
index b0a98b2570b..0aca94bfa41 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/seatsecs_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/seatsecs_test.go
@@ -17,29 +17,33 @@ limitations under the License.
 
 package queueset
 
 import (
-	"fmt"
 	"math"
 	"testing"
 	"time"
 )
 
+// TestSeatSecondsString exercises the SeatSeconds constructor and de-constructors (String, ToFloat).
func TestSeatSecondsString(t *testing.T) { - digits := math.Log10(ssScale) - expectFmt := fmt.Sprintf("%%%d.%dfss", int(digits+2), int(digits)) testCases := []struct { - ss SeatSeconds - expect string + ss SeatSeconds + expectFloat float64 + expectStr string }{ - {ss: SeatSeconds(1), expect: fmt.Sprintf(expectFmt, 1.0/ssScale)}, - {ss: 0, expect: "0.00000000ss"}, - {ss: SeatsTimesDuration(1, time.Second), expect: "1.00000000ss"}, - {ss: SeatsTimesDuration(123, 100*time.Millisecond), expect: "12.30000000ss"}, - {ss: SeatsTimesDuration(1203, 10*time.Millisecond), expect: "12.03000000ss"}, + {ss: SeatSeconds(1), expectFloat: 1.0 / ssScale, expectStr: "0.00000001ss"}, + {ss: SeatSeconds(ssScale - 1), expectFloat: (ssScale - 1) / ssScale, expectStr: "0.99999999ss"}, + {ss: 0, expectFloat: 0, expectStr: "0.00000000ss"}, + {ss: SeatsTimesDuration(1, time.Second), expectFloat: 1, expectStr: "1.00000000ss"}, + {ss: SeatsTimesDuration(123, 100*time.Millisecond), expectFloat: 12.3, expectStr: "12.30000000ss"}, + {ss: SeatsTimesDuration(1203, 10*time.Millisecond), expectFloat: 12.03, expectStr: "12.03000000ss"}, } for _, testCase := range testCases { actualStr := testCase.ss.String() - if actualStr != testCase.expect { - t.Errorf("SeatSeonds(%d) formatted as %q rather than expected %q", uint64(testCase.ss), actualStr, testCase.expect) + if actualStr != testCase.expectStr { + t.Errorf("SeatSeconds(%d).String() is %q but expected %q", uint64(testCase.ss), actualStr, testCase.expectStr) + } + actualFloat := testCase.ss.ToFloat() + if math.Round(actualFloat*ssScale) != math.Round(testCase.expectFloat*ssScale) { + t.Errorf("SeatSeconds(%d).ToFloat() is %v but expected %v", uint64(testCase.ss), actualFloat, testCase.expectFloat) } } } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index 2a36040566e..4fce5e280b5 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -62,6 +62,7 @@ type request struct { arrivalTime time.Time // arrivalR is R(arrivalTime). R is, confusingly, also called "virtual time". + // This field is meaningful only while the request is waiting in the virtual world. 
arrivalR SeatSeconds // descr1 and descr2 are not used in any logic but they appear in diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go index cbff5485647..365b8093194 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go @@ -18,6 +18,7 @@ package metrics import ( "context" + "strconv" "strings" "sync" "time" @@ -103,7 +104,6 @@ var ( }, []string{priorityLevel, flowSchema}, ) - // PriorityLevelConcurrencyObserverPairGenerator creates pairs that observe concurrency for priority levels PriorityLevelConcurrencyObserverPairGenerator = NewSampleAndWaterMarkHistogramsPairGenerator(clock.RealClock{}, time.Millisecond, &compbasemetrics.HistogramOpts{ @@ -122,8 +122,8 @@ var ( Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1}, StabilityLevel: compbasemetrics.ALPHA, }, - []string{priorityLevel}) - + []string{priorityLevel}, + ) // ReadWriteConcurrencyObserverPairGenerator creates pairs that observe concurrency broken down by mutating vs readonly ReadWriteConcurrencyObserverPairGenerator = NewSampleAndWaterMarkHistogramsPairGenerator(clock.RealClock{}, time.Millisecond, &compbasemetrics.HistogramOpts{ @@ -142,8 +142,8 @@ var ( Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1}, StabilityLevel: compbasemetrics.ALPHA, }, - []string{requestKind}) - + []string{requestKind}, + ) apiserverCurrentR = compbasemetrics.NewGaugeVec( &compbasemetrics.GaugeOpts{ Namespace: namespace, @@ -154,7 +154,6 @@ var ( }, []string{priorityLevel}, ) - apiserverDispatchR = compbasemetrics.NewGaugeVec( &compbasemetrics.GaugeOpts{ Namespace: namespace, @@ -165,7 +164,6 @@ var ( }, []string{priorityLevel}, ) - apiserverLatestS = compbasemetrics.NewGaugeVec( &compbasemetrics.GaugeOpts{ Namespace: namespace, @@ -176,7 +174,6 @@ var ( }, []string{priorityLevel}, ) - apiserverNextSBounds = compbasemetrics.NewGaugeVec( &compbasemetrics.GaugeOpts{ Namespace: namespace, @@ -187,7 +184,6 @@ var ( }, []string{priorityLevel, "bound"}, ) - apiserverNextDiscountedSBounds = compbasemetrics.NewGaugeVec( &compbasemetrics.GaugeOpts{ Namespace: namespace, @@ -198,7 +194,6 @@ var ( }, []string{priorityLevel, "bound"}, ) - apiserverCurrentInqueueRequests = compbasemetrics.NewGaugeVec( &compbasemetrics.GaugeOpts{ Namespace: namespace, @@ -272,6 +267,17 @@ var ( }, []string{priorityLevel, flowSchema}, ) + apiserverEpochAdvances = compbasemetrics.NewCounterVec( + &compbasemetrics.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "epoch_advance_total", + Help: "Number of times the queueset's progress meter jumped backward", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{priorityLevel, "success"}, + ) + metrics = Registerables{ apiserverRejectedRequestsTotal, apiserverDispatchedRequestsTotal, @@ -287,6 +293,7 @@ var ( apiserverCurrentExecutingRequests, apiserverRequestWaitingSeconds, apiserverRequestExecutionSeconds, + apiserverEpochAdvances, }. Append(PriorityLevelConcurrencyObserverPairGenerator.metrics()...). Append(ReadWriteConcurrencyObserverPairGenerator.metrics()...) 
@@ -352,3 +359,7 @@ func ObserveWaitingDuration(ctx context.Context, priorityLevel, flowSchema, exec func ObserveExecutionDuration(ctx context.Context, priorityLevel, flowSchema string, executionTime time.Duration) { apiserverRequestExecutionSeconds.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds()) } + +func AddEpochAdvance(ctx context.Context, priorityLevel string, success bool) { + apiserverEpochAdvances.WithContext(ctx).WithLabelValues(priorityLevel, strconv.FormatBool(success)).Inc() +}
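A quick sanity check of the arithmetic behind rDecrement and highR: once currentR crosses three quarters of the SeatSeconds range, winding everything back by half the range leaves roughly half the range of headroom before the next advance, and subtracting the same offset from currentR, every queue's nextDispatchR, and every waiting request's arrivalR leaves the differences that dispatching compares untouched. The following is a minimal stand-alone sketch; the toy seatSeconds type and variable names are placeholders, and only the two constant definitions mirror the patch.

	package main

	import "fmt"

	// seatSeconds is a toy stand-in for the package's fixed-point SeatSeconds
	// (a uint64 holding seat-seconds scaled by 1e8, per the String/ToFloat tests).
	type seatSeconds uint64

	const (
		maxSeatSeconds seatSeconds = ^seatSeconds(0)            // analogous to MaxSeatSeconds
		rDecrement                 = maxSeatSeconds / 2         // as in the patch
		highR                      = rDecrement + rDecrement/2  // 3/4 of the range
	)

	func main() {
		// Crossing highR triggers an epoch advance; afterwards currentR sits near
		// 1/4 of the range, so about half the range remains before the next one.
		currentR := highR + 17
		currentR -= rDecrement
		fmt.Println(currentR < highR, highR-currentR > maxSeatSeconds/4) // true true

		// Subtracting the same rDecrement from every reading preserves all the
		// differences the dispatcher compares, so relative order is unchanged.
		arrivalR, nextDispatchR := highR-3, highR+5
		fmt.Println(nextDispatchR-arrivalR ==
			(nextDispatchR-rDecrement)-(arrivalR-rDecrement)) // true
	}

Because an active reading is expected to stay close to currentR, which is at least highR when an advance fires, the patch treats wraparound of an individual nextDispatchR or arrivalR as an anomaly and logs it, feeding the success label of the new metric.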
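Each advance is recorded in the new apiserver_flowcontrol_epoch_advance_total counter, and TestSeatSecondsRollover expects exactly 8 of them over its run. Below is a rough stand-alone equivalent of that bookkeeping and of the exposition-text comparison done in finalReview, written against plain client_golang rather than the k8s.io/component-base wrappers the patch actually registers with (which is also why the test's expected HELP line carries an "[ALPHA]" prefix that is absent here); the helper name and the "rollover-test" label value are illustrative only.

	package main

	import (
		"fmt"
		"strconv"
		"strings"

		"github.com/prometheus/client_golang/prometheus"
		"github.com/prometheus/client_golang/prometheus/testutil"
	)

	// epochAdvances mirrors the shape of apiserver_flowcontrol_epoch_advance_total:
	// a counter broken down by priority level and by whether the advance succeeded.
	var epochAdvances = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "apiserver_flowcontrol_epoch_advance_total",
			Help: "Number of times the queueset's progress meter jumped backward",
		},
		[]string{"priority_level", "success"},
	)

	// addEpochAdvance plays the role of metrics.AddEpochAdvance in the patch.
	func addEpochAdvance(priorityLevel string, success bool) {
		epochAdvances.WithLabelValues(priorityLevel, strconv.FormatBool(success)).Inc()
	}

	func main() {
		for i := 0; i < 8; i++ { // the rollover test expects 8 successful advances
			addEpochAdvance("rollover-test", true)
		}
		// finalReview compares gathered exposition text against an expected string;
		// testutil.CollectAndCompare does the same for a single collector.
		expected := `
			# HELP apiserver_flowcontrol_epoch_advance_total Number of times the queueset's progress meter jumped backward
			# TYPE apiserver_flowcontrol_epoch_advance_total counter
			apiserver_flowcontrol_epoch_advance_total{priority_level="rollover-test",success="true"} 8
		`
		fmt.Println(testutil.CollectAndCompare(epochAdvances, strings.NewReader(expected))) // <nil>
	}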