From 9b684579e230f105bcaa743f06bc07c39af703df Mon Sep 17 00:00:00 2001 From: Mike Spreitzer Date: Thu, 20 Oct 2022 15:21:09 -0400 Subject: [PATCH] Add instrumentation for seat borrowing --- .../pkg/util/flowcontrol/apf_controller.go | 98 ++++- .../pkg/util/flowcontrol/controller_test.go | 2 +- .../pkg/util/flowcontrol/debug/dump.go | 9 +- .../flowcontrol/fairqueuing/integrator.go | 6 +- .../fairqueuing/integrator_test.go | 2 +- .../util/flowcontrol/fairqueuing/interface.go | 4 +- .../fairqueuing/queueset/queueset.go | 55 ++- .../fairqueuing/queueset/queueset_test.go | 376 +++++++++++------- .../fairqueuing/testing/no-restraint.go | 2 +- .../pkg/util/flowcontrol/gen_test.go | 5 +- 10 files changed, 362 insertions(+), 197 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index 3cf805efb6a..aaea2af7a1d 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -71,21 +71,16 @@ const ( // Borrowing among priority levels will be accomplished by periodically // adjusting the current concurrency limits (CurrentCLs); // borrowingAdjustmentPeriod is that period. - borrowingAdjustmentPeriod = 10 * time.Second //nolint:golint,unused + borrowingAdjustmentPeriod = 10 * time.Second - // The input to the borrowing is smoothed seat demand figures. - // Every adjustment period, each priority level's smoothed demand is adjusted - // based on an envelope of that level's recent seat demand. The formula is: - // SmoothSeatDemand := max( EnvelopeSeatDemand, - // seatDemandSmoothingCoefficient * SmoothSeatDemand + - // (1-seatDemandSmoothingCoefficient) * EnvelopeSeatDemand ). - // Qualitatively: this parameter controls the rate at which the smoothed seat demand drifts - // down toward the envelope of seat demand while that is lower. - // The particular number appearing here has the property that half of the - // current smoothed value comes from the smoothed value of 5 minutes ago. + // The input to the seat borrowing is smoothed seat demand figures. + // This constant controls the decay rate of that smoothing, + // as described in the comment on the `seatDemandStats` field of `priorityLevelState`. + // The particular number appearing here has the property that half-life + // of that decay is 5 minutes. // This is a very preliminary guess at a good value and is likely to be tweaked // once we get some experience with borrowing. - seatDemandSmoothingCoefficient = 0.977 //nolint:golint,unused + seatDemandSmoothingCoefficient = 0.977 ) // The funcs in this package follow the naming convention that the suffix @@ -217,6 +212,43 @@ type priorityLevelState struct { // Observer of number of seats occupied throughout execution execSeatsObs metrics.RatioedGauge + + // Integrator of seat demand, reset every CurrentCL adjustment period + seatDemandIntegrator fq.Integrator + + // seatDemandStats is derived from periodically examining the seatDemandIntegrator. + // The average, standard deviation, and high watermark come directly from the integrator. + // envelope = avg + stdDev. + // Periodically smoothed gets replaced with `max(envelope, A*smoothed + (1-A)*envelope)`, + // where A is seatDemandSmoothingCoefficient. + seatDemandStats seatDemandStats +} + +type seatDemandStats struct { + avg float64 + stdDev float64 + highWatermark float64 + envelope float64 + smoothed float64 +} + +func newSeatDemandStats(val float64) seatDemandStats { + return seatDemandStats{ + avg: val, + stdDev: 0, + highWatermark: val, + envelope: val, + smoothed: val, + } +} + +func (stats *seatDemandStats) update(obs fq.IntegratorResults) { + stats.avg = obs.Average + stats.stdDev = obs.Deviation + stats.highWatermark = obs.Max + envelope := obs.Average + obs.Deviation + stats.envelope = envelope + stats.smoothed = math.Max(envelope, seatDemandSmoothingCoefficient*stats.smoothed+(1-seatDemandSmoothingCoefficient)*envelope) } // NewTestableController is extra flexible to facilitate testing @@ -325,11 +357,25 @@ func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error { klog.Info("Running API Priority and Fairness config worker") go wait.Until(cfgCtlr.runWorker, time.Second, stopCh) + klog.Info("Running API Priority and Fairness periodic rebalancing process") + go wait.Until(cfgCtlr.updateBorrowing, borrowingAdjustmentPeriod, stopCh) + <-stopCh klog.Info("Shutting down API Priority and Fairness config worker") return nil } +func (cfgCtlr *configController) updateBorrowing() { + cfgCtlr.lock.Lock() + defer cfgCtlr.lock.Unlock() + for _, plState := range cfgCtlr.priorityLevelStates { + obs := plState.seatDemandIntegrator.Reset() + plState.seatDemandStats.update(obs) + // TODO: set borrowing metrics introduced in https://github.com/kubernetes/enhancements/pull/3391 + // TODO: updathe CurrentCL as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/1040-priority-and-fairness#dispatching + } +} + // runWorker is the logic of the one and only worker goroutine. We // limit the number to one in order to obviate explicit // synchronization around access to `cfgCtlr.mostRecentUpdates`. @@ -552,9 +598,13 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi state := meal.cfgCtlr.priorityLevelStates[pl.Name] if state == nil { labelValues := []string{pl.Name} - state = &priorityLevelState{reqsGaugePair: metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues), execSeatsObs: meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues)} + state = &priorityLevelState{ + reqsGaugePair: metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues), + execSeatsObs: meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues), + seatDemandIntegrator: fq.NewNamedIntegrator(meal.cfgCtlr.clock, pl.Name), + } } - qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.reqsGaugePair, state.execSeatsObs) + qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.reqsGaugePair, state.execSeatsObs, state.seatDemandIntegrator) if err != nil { klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err) continue @@ -658,7 +708,7 @@ func (meal *cfgMeal) processOldPLsLocked() { } } var err error - plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs) + plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs, plState.seatDemandIntegrator) if err != nil { // This can not happen because queueSetCompleterForPL already approved this config panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec))) @@ -702,6 +752,8 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() { if plState.queues == nil { klog.V(5).Infof("Introducing queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum) + plState.seatDemandStats = newSeatDemandStats(float64(concurrencyLimit)) + // TODO: initialize as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/1040-priority-and-fairness#dispatching once NominalCL values are implemented } else { klog.V(5).Infof("Retaining queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v, numPending=%d (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.numPending, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum) } @@ -713,7 +765,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() { // given priority level configuration. Returns nil if that config // does not call for limiting. Returns nil and an error if the given // object is malformed in a way that is a problem for this package. -func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge) (fq.QueueSetCompleter, error) { +func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandIntgrator fq.Integrator) (fq.QueueSetCompleter, error) { if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) { return nil, errors.New("broken union structure at the top") } @@ -742,7 +794,7 @@ func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flow if queues != nil { qsc, err = queues.BeginConfigChange(qcQS) } else { - qsc, err = qsf.BeginConstruction(qcQS, reqsIntPair, execSeatsObs) + qsc, err = qsf.BeginConstruction(qcQS, reqsIntPair, execSeatsObs, seatDemandIntgrator) } if err != nil { err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcAPI, err) @@ -790,17 +842,19 @@ func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, re labelValues := []string{proto.Name} reqsGaugePair := metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues) execSeatsObs := meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues) - qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, reqsGaugePair, execSeatsObs) + seatDemandIntegrator := fq.NewNamedIntegrator(meal.cfgCtlr.clock, proto.Name) + qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, reqsGaugePair, execSeatsObs, seatDemandIntegrator) if err != nil { // This can not happen because proto is one of the mandatory // objects and these are not erroneous panic(err) } meal.newPLStates[proto.Name] = &priorityLevelState{ - pl: proto, - qsCompleter: qsCompleter, - reqsGaugePair: reqsGaugePair, - execSeatsObs: execSeatsObs, + pl: proto, + qsCompleter: qsCompleter, + reqsGaugePair: reqsGaugePair, + execSeatsObs: execSeatsObs, + seatDemandIntegrator: seatDemandIntegrator, } if proto.Spec.Limited != nil { meal.shareSum += float64(proto.Spec.Limited.NominalConcurrencyShares) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go index a2601e0dfac..5bae8bbfce0 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go @@ -105,7 +105,7 @@ type ctlrTestRequest struct { descr1, descr2 interface{} } -func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.RatioedGaugePair, eso metrics.RatioedGauge) (fq.QueueSetCompleter, error) { +func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.RatioedGaugePair, eso metrics.RatioedGauge, sdi fq.Integrator) (fq.QueueSetCompleter, error) { return ctlrTestQueueSetCompleter{cts, nil, qc}, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug/dump.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug/dump.go index b3cdb17bb95..439d48c45ab 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug/dump.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug/dump.go @@ -25,10 +25,11 @@ import ( // QueueSetDump is an instant dump of queue-set. type QueueSetDump struct { - Queues []QueueDump - Waiting int - Executing int - SeatsInUse int + Queues []QueueDump + Waiting int + Executing int + SeatsInUse int + SeatsWaiting int } // QueueDump is an instant dump of one queue in a queue-set. diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator.go index 07d6f3d376a..ce310137b83 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator.go @@ -53,6 +53,7 @@ func (x *IntegratorResults) Equal(y *IntegratorResults) bool { } type integrator struct { + name string clock clock.PassiveClock sync.Mutex lastTime time.Time @@ -61,9 +62,10 @@ type integrator struct { min, max float64 } -// NewIntegrator makes one that uses the given clock -func NewIntegrator(clock clock.PassiveClock) Integrator { +// NewNamedIntegrator makes one that uses the given clock and name +func NewNamedIntegrator(clock clock.PassiveClock, name string) Integrator { return &integrator{ + name: name, clock: clock, lastTime: clock.Now(), } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator_test.go index e377ea2da52..948518f60ea 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator_test.go @@ -27,7 +27,7 @@ import ( func TestIntegrator(t *testing.T) { now := time.Now() clk := testclock.NewFakeClock(now) - igr := NewIntegrator(clk) + igr := NewNamedIntegrator(clk, "testee") igr.Add(3) clk.Step(time.Second) results := igr.GetResults() diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go index 4a6d91fa91a..ff63c5e243b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go @@ -35,7 +35,9 @@ type QueueSetFactory interface { // The RatioedGaugePair observes number of requests, // execution covering just the regular phase. // The RatioedGauge observes number of seats occupied through all phases of execution. - BeginConstruction(QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge) (QueueSetCompleter, error) + // The Integrator observes the seat demand (executing + queued seats), and + // the queueset does not read or reset the Integrator. + BeginConstruction(QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge, Integrator) (QueueSetCompleter, error) } // QueueSetCompleter finishes the two-step process of creating or diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 684e76da64e..860e09cf8cc 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -60,12 +60,13 @@ type promiseFactoryFactory func(*queueSet) promiseFactory // `*queueSetCompleter` implements QueueSetCompleter. Exactly one of // the fields `factory` and `theSet` is non-nil. type queueSetCompleter struct { - factory *queueSetFactory - reqsGaugePair metrics.RatioedGaugePair - execSeatsGauge metrics.RatioedGauge - theSet *queueSet - qCfg fq.QueuingConfig - dealer *shufflesharding.Dealer + factory *queueSetFactory + reqsGaugePair metrics.RatioedGaugePair + execSeatsGauge metrics.RatioedGauge + seatDemandIntegrator fq.Integrator + theSet *queueSet + qCfg fq.QueuingConfig + dealer *shufflesharding.Dealer } // queueSet implements the Fair Queuing for Server Requests technique @@ -93,6 +94,8 @@ type queueSet struct { execSeatsGauge metrics.RatioedGauge // for all phases of execution + seatDemandIntegrator fq.Integrator + promiseFactory promiseFactory lock sync.Mutex @@ -139,6 +142,10 @@ type queueSet struct { // request(s) that are currently executing in this queueset. totSeatsInUse int + // totSeatsWaiting is the sum, over all the waiting requests, of their + // max width. + totSeatsWaiting int + // enqueues is the number of requests that have ever been enqueued enqueues int } @@ -156,17 +163,18 @@ func newTestableQueueSetFactory(c eventclock.Interface, promiseFactoryFactory pr } } -func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsGaugePair metrics.RatioedGaugePair, execSeatsGauge metrics.RatioedGauge) (fq.QueueSetCompleter, error) { +func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsGaugePair metrics.RatioedGaugePair, execSeatsGauge metrics.RatioedGauge, seatDemandIntegrator fq.Integrator) (fq.QueueSetCompleter, error) { dealer, err := checkConfig(qCfg) if err != nil { return nil, err } return &queueSetCompleter{ - factory: qsf, - reqsGaugePair: reqsGaugePair, - execSeatsGauge: execSeatsGauge, - qCfg: qCfg, - dealer: dealer}, nil + factory: qsf, + reqsGaugePair: reqsGaugePair, + execSeatsGauge: execSeatsGauge, + seatDemandIntegrator: seatDemandIntegrator, + qCfg: qCfg, + dealer: dealer}, nil } // checkConfig returns a non-nil Dealer if the config is valid and @@ -191,6 +199,7 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet { estimatedServiceDuration: 3 * time.Millisecond, reqsGaugePair: qsc.reqsGaugePair, execSeatsGauge: qsc.execSeatsGauge, + seatDemandIntegrator: qsc.seatDemandIntegrator, qCfg: qsc.qCfg, currentR: 0, lastRealTime: qsc.factory.clock.Now(), @@ -408,10 +417,12 @@ func (req *request) wait() (bool, bool) { if req.removeFromQueueLocked() != nil { defer qs.boundNextDispatchLocked(queue) qs.totRequestsWaiting-- + qs.totSeatsWaiting -= req.MaxSeats() metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled") metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) req.NoteQueued(false) qs.reqsGaugePair.RequestsWaiting.Add(-1) + qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting)) } return false, qs.isIdleLocked() } @@ -597,6 +608,7 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac // past the requestWaitLimit func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, fsName string) { timeoutCount := 0 + disqueueSeats := 0 now := qs.clock.Now() reqs := queue.requests // reqs are sorted oldest -> newest @@ -609,6 +621,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, f if arrivalLimit.After(req.arrivalTime) { if req.decision.Set(decisionReject) && req.removeFromQueueLocked() != nil { timeoutCount++ + disqueueSeats += req.MaxSeats() req.NoteQueued(false) metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) } @@ -622,7 +635,9 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, f // remove timed out requests from queue if timeoutCount > 0 { qs.totRequestsWaiting -= timeoutCount + qs.totSeatsWaiting -= disqueueSeats qs.reqsGaugePair.RequestsWaiting.Add(float64(-timeoutCount)) + qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting)) } } @@ -657,9 +672,11 @@ func (qs *queueSet) enqueueToBoundLocked(request *request) { } request.removeFromQueueLocked = queue.requests.Enqueue(request) qs.totRequestsWaiting++ + qs.totSeatsWaiting += request.MaxSeats() metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1) request.NoteQueued(true) qs.reqsGaugePair.RequestsWaiting.Add(1) + qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting)) } // dispatchAsMuchAsPossibleLocked does as many dispatches as possible now. @@ -690,6 +707,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.MaxSeats()) qs.reqsGaugePair.RequestsExecuting.Add(1) qs.execSeatsGauge.Add(float64(req.MaxSeats())) + qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting)) klogV := klog.V(5) if klogV.Enabled() { klogV.Infof("QS(%s) at t=%s R=%v: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, fsName, descr1, descr2, qs.totRequestsExecuting) @@ -711,11 +729,13 @@ func (qs *queueSet) dispatchLocked() bool { return false } qs.totRequestsWaiting-- + qs.totSeatsWaiting -= request.MaxSeats() metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1) request.NoteQueued(false) qs.reqsGaugePair.RequestsWaiting.Add(-1) defer qs.boundNextDispatchLocked(queue) if !request.decision.Set(decisionExecute) { + qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting)) return true } request.startTime = qs.clock.Now() @@ -732,6 +752,7 @@ func (qs *queueSet) dispatchLocked() bool { metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.MaxSeats()) qs.reqsGaugePair.RequestsExecuting.Add(1) qs.execSeatsGauge.Add(float64(request.MaxSeats())) + qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting)) klogV := klog.V(6) if klogV.Enabled() { klogV.Infof("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v work %v from queue %d with start R %v, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied", @@ -894,6 +915,7 @@ func (qs *queueSet) finishRequestLocked(r *request) { qs.totSeatsInUse -= r.MaxSeats() metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.MaxSeats()) qs.execSeatsGauge.Add(-float64(r.MaxSeats())) + qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting)) if r.queue != nil { r.queue.seatsInUse -= r.MaxSeats() } @@ -1011,10 +1033,11 @@ func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump { qs.lock.Lock() defer qs.lock.Unlock() d := debug.QueueSetDump{ - Queues: make([]debug.QueueDump, len(qs.queues)), - Waiting: qs.totRequestsWaiting, - Executing: qs.totRequestsExecuting, - SeatsInUse: qs.totSeatsInUse, + Queues: make([]debug.QueueDump, len(qs.queues)), + Waiting: qs.totRequestsWaiting, + Executing: qs.totRequestsExecuting, + SeatsInUse: qs.totSeatsInUse, + SeatsWaiting: qs.totSeatsWaiting, } for i, q := range qs.queues { d.Queues[i] = q.dumpLocked(includeRequestDetails) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 26ba9fb7f38..3a9f81ac635 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -182,16 +182,18 @@ type uniformScenario struct { counter counter.GoRoutineCounter expectedAverages []float64 expectedEpochAdvances int + seatDemandIntegratorSubject fq.Integrator } func (us uniformScenario) exercise(t *testing.T) { uss := uniformScenarioState{ - t: t, - uniformScenario: us, - startTime: us.clk.Now(), - integrators: make([]fq.Integrator, len(us.clients)), - executions: make([]int32, len(us.clients)), - rejects: make([]int32, len(us.clients)), + t: t, + uniformScenario: us, + startTime: us.clk.Now(), + execSeatsIntegrators: make([]fq.Integrator, len(us.clients)), + seatDemandIntegratorCheck: fq.NewNamedIntegrator(us.clk, us.name+"-seatDemandCheck"), + executions: make([]int32, len(us.clients)), + rejects: make([]int32, len(us.clients)), } for _, uc := range us.clients { uss.doSplit = uss.doSplit || uc.split @@ -204,7 +206,8 @@ type uniformScenarioState struct { uniformScenario startTime time.Time doSplit bool - integrators []fq.Integrator + execSeatsIntegrators []fq.Integrator + seatDemandIntegratorCheck fq.Integrator failedCount uint64 expectedInqueue, expectedExecuting, expectedConcurrencyInUse string executions, rejects []int32 @@ -216,18 +219,18 @@ func (uss *uniformScenarioState) exercise() { metrics.Reset() } for i, uc := range uss.clients { - uss.integrators[i] = fq.NewIntegrator(uss.clk) + uss.execSeatsIntegrators[i] = fq.NewNamedIntegrator(uss.clk, fmt.Sprintf("%s client %d execSeats", uss.name, i)) fsName := fmt.Sprintf("client%d", i) uss.expectedInqueue = uss.expectedInqueue + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n") for j := 0; j < uc.nThreads; j++ { ust := uniformScenarioThread{ - uss: uss, - i: i, - j: j, - nCalls: uc.nCalls, - uc: uc, - igr: uss.integrators[i], - fsName: fsName, + uss: uss, + i: i, + j: j, + nCalls: uc.nCalls, + uc: uc, + execSeatsIntegrator: uss.execSeatsIntegrators[i], + fsName: fsName, } ust.start() } @@ -241,12 +244,12 @@ func (uss *uniformScenarioState) exercise() { } type uniformScenarioThread struct { - uss *uniformScenarioState - i, j int - nCalls int - uc uniformClient - igr fq.Integrator - fsName string + uss *uniformScenarioState + i, j int + nCalls int + uc uniformClient + execSeatsIntegrator fq.Integrator + fsName string } func (ust *uniformScenarioThread) start() { @@ -269,11 +272,15 @@ func (ust *uniformScenarioThread) callK(k int) { if k >= ust.nCalls { return } + maxWidth := float64(uint64max(ust.uc.initialSeats, ust.uc.finalSeats)) + ust.uss.seatDemandIntegratorCheck.Add(maxWidth) + returnSeatDemand := func(time.Time) { ust.uss.seatDemandIntegratorCheck.Add(-maxWidth) } req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.WorkEstimate{InitialSeats: ust.uc.initialSeats, FinalSeats: ust.uc.finalSeats, AdditionalLatency: ust.uc.padDuration}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil) ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle) if req == nil { atomic.AddUint64(&ust.uss.failedCount, 1) atomic.AddInt32(&ust.uss.rejects[ust.i], 1) + returnSeatDemand(ust.uss.clk.Now()) return } if idle { @@ -285,11 +292,12 @@ func (ust *uniformScenarioThread) callK(k int) { executed = true execStart := ust.uss.clk.Now() atomic.AddInt32(&ust.uss.executions[ust.i], 1) - ust.igr.Add(float64(ust.uc.initialSeats)) + ust.execSeatsIntegrator.Add(float64(ust.uc.initialSeats)) ust.uss.t.Logf("%s: %d, %d, %d executing; width1=%d", execStart.Format(nsTimeFmt), ust.i, ust.j, k, ust.uc.initialSeats) ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration) ust.uss.clk.Sleep(ust.uc.execDuration) - ust.igr.Add(-float64(ust.uc.initialSeats)) + ust.execSeatsIntegrator.Add(-float64(ust.uc.initialSeats)) + ust.uss.clk.EventAfterDuration(returnSeatDemand, ust.uc.padDuration) returnTime = ust.uss.clk.Now() }) now := ust.uss.clk.Now() @@ -297,6 +305,7 @@ func (ust *uniformScenarioThread) callK(k int) { if !executed { atomic.AddUint64(&ust.uss.failedCount, 1) atomic.AddInt32(&ust.uss.rejects[ust.i], 1) + returnSeatDemand(ust.uss.clk.Now()) } else if now != returnTime { ust.uss.t.Errorf("%s: %d, %d, %d returnTime=%s", now.Format(nsTimeFmt), ust.i, ust.j, k, returnTime.Format(nsTimeFmt)) } @@ -319,7 +328,7 @@ func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, ma } sep := uc.thinkDuration demands[i] = float64(nThreads) * float64(uc.initialSeats) * float64(uc.execDuration) / float64(sep+uc.execDuration) - averages[i] = uss.integrators[i].Reset().Average + averages[i] = uss.execSeatsIntegrators[i].Reset().Average } fairAverages := uss.expectedAverages if fairAverages == nil { @@ -341,6 +350,25 @@ func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, ma uss.t.Logf("%s client %d last=%v expectFair=%v margin=%v got an Average of %v and the expected average was %v", uss.name, i, last, expectFair, margin, averages[i], expectedAverage) } } + if uss.seatDemandIntegratorSubject != nil { + checkResults := uss.seatDemandIntegratorCheck.GetResults() + subjectResults := uss.seatDemandIntegratorSubject.GetResults() + if float64close(subjectResults.Duration, checkResults.Duration) { + uss.t.Logf("%s last=%v got duration of %v and expected %v", uss.name, last, subjectResults.Duration, checkResults.Duration) + } else { + uss.t.Errorf("%s last=%v got duration of %v but expected %v", uss.name, last, subjectResults.Duration, checkResults.Duration) + } + if got, expected := float64NaNTo0(subjectResults.Average), float64NaNTo0(checkResults.Average); float64close(got, expected) { + uss.t.Logf("%s last=%v got SeatDemand average of %v and expected %v", uss.name, last, got, expected) + } else { + uss.t.Errorf("%s last=%v got SeatDemand average of %v but expected %v", uss.name, last, got, expected) + } + if got, expected := float64NaNTo0(subjectResults.Deviation), float64NaNTo0(checkResults.Deviation); float64close(got, expected) { + uss.t.Logf("%s last=%v got SeatDemand standard deviation of %v and expected %v", uss.name, last, got, expected) + } else { + uss.t.Errorf("%s last=%v got SeatDemand standard deviation of %v but expected %v", uss.name, last, got, expected) + } + } } func (uss *uniformScenarioState) finalReview() { @@ -445,7 +473,7 @@ func TestNoRestraint(t *testing.T) { t.Run(testCase.name, func(t *testing.T) { now := time.Now() clk, counter := testeventclock.NewFake(now, 0, nil) - nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newGaugePair(clk), newExecSeatsGauge(clk)) + nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newGaugePair(clk), newExecSeatsGauge(clk), fq.NewNamedIntegrator(clk, "TestNoRestraint")) if err != nil { t.Fatal(err) } @@ -481,7 +509,8 @@ func TestBaseline(t *testing.T) { HandSize: 3, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) + seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject") + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) if err != nil { t.Fatal(err) } @@ -492,15 +521,16 @@ func TestBaseline(t *testing.T) { clients: []uniformClient{ newUniformClient(1001001001, 1, 21, time.Second, 0), }, - concurrencyLimit: 1, - evalDuration: time.Second * 20, - expectedFair: []bool{true}, - expectedFairnessMargin: []float64{0}, - expectAllRequests: true, - evalInqueueMetrics: true, - evalExecutingMetrics: true, - clk: clk, - counter: counter, + concurrencyLimit: 1, + evalDuration: time.Second * 20, + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + seatDemandIntegratorSubject: seatDemandIntegratorSubject, }.exercise(t) } @@ -550,7 +580,8 @@ func TestSeparations(t *testing.T) { HandSize: 3, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) + seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, caseName+" seatDemandSubject") + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) if err != nil { t.Fatal(err) } @@ -561,16 +592,17 @@ func TestSeparations(t *testing.T) { newUniformClient(1001001001, 1, 25, time.Second, seps.think).pad(seps.finalSeats, seps.pad), newUniformClient(2002002002, 1, 25, time.Second, seps.think).pad(seps.finalSeats, seps.pad), }[:seps.nClients], - concurrencyLimit: seps.conc, - evalDuration: time.Second * 24, // multiple of every period involved, so that margin can be 0 below - expectedFair: []bool{true}, - expectedFairnessMargin: []float64{0}, - expectAllRequests: true, - evalInqueueMetrics: true, - evalExecutingMetrics: true, - clk: clk, - counter: counter, - expectedAverages: seps.exp, + concurrencyLimit: seps.conc, + evalDuration: time.Second * 24, // multiple of every period involved, so that margin can be 0 below + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + expectedAverages: seps.exp, + seatDemandIntegratorSubject: seatDemandIntegratorSubject, }.exercise(t) }) } @@ -589,7 +621,8 @@ func TestUniformFlowsHandSize1(t *testing.T) { HandSize: 1, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) + seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject") + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) if err != nil { t.Fatal(err) } @@ -601,15 +634,16 @@ func TestUniformFlowsHandSize1(t *testing.T) { newUniformClient(1001001001, 8, 20, time.Second, time.Second-1), newUniformClient(2002002002, 8, 20, time.Second, time.Second-1), }, - concurrencyLimit: 4, - evalDuration: time.Second * 50, - expectedFair: []bool{true}, - expectedFairnessMargin: []float64{0.01}, - expectAllRequests: true, - evalInqueueMetrics: true, - evalExecutingMetrics: true, - clk: clk, - counter: counter, + concurrencyLimit: 4, + evalDuration: time.Second * 50, + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0.01}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + seatDemandIntegratorSubject: seatDemandIntegratorSubject, }.exercise(t) } @@ -626,7 +660,8 @@ func TestUniformFlowsHandSize3(t *testing.T) { HandSize: 3, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) + seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) if err != nil { t.Fatal(err) } @@ -637,15 +672,16 @@ func TestUniformFlowsHandSize3(t *testing.T) { newUniformClient(400900100100, 8, 30, time.Second, time.Second-1), newUniformClient(300900200200, 8, 30, time.Second, time.Second-1), }, - concurrencyLimit: 4, - evalDuration: time.Second * 60, - expectedFair: []bool{true}, - expectedFairnessMargin: []float64{0.03}, - expectAllRequests: true, - evalInqueueMetrics: true, - evalExecutingMetrics: true, - clk: clk, - counter: counter, + concurrencyLimit: 4, + evalDuration: time.Second * 60, + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0.03}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + seatDemandIntegratorSubject: seatDemandIntegratorSubject, }.exercise(t) } @@ -662,7 +698,8 @@ func TestDifferentFlowsExpectEqual(t *testing.T) { HandSize: 1, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) + seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) if err != nil { t.Fatal(err) } @@ -674,15 +711,16 @@ func TestDifferentFlowsExpectEqual(t *testing.T) { newUniformClient(1001001001, 8, 20, time.Second, time.Second), newUniformClient(2002002002, 7, 30, time.Second, time.Second/2), }, - concurrencyLimit: 4, - evalDuration: time.Second * 40, - expectedFair: []bool{true}, - expectedFairnessMargin: []float64{0.01}, - expectAllRequests: true, - evalInqueueMetrics: true, - evalExecutingMetrics: true, - clk: clk, - counter: counter, + concurrencyLimit: 4, + evalDuration: time.Second * 40, + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0.01}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + seatDemandIntegratorSubject: seatDemandIntegratorSubject, }.exercise(t) } @@ -702,7 +740,8 @@ func TestSeatSecondsRollover(t *testing.T) { HandSize: 1, RequestWaitLimit: 40 * Quarter, } - qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) + seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) if err != nil { t.Fatal(err) } @@ -714,16 +753,17 @@ func TestSeatSecondsRollover(t *testing.T) { newUniformClient(1001001001, 8, 20, Quarter, Quarter).setInitWidth(500), newUniformClient(2002002002, 7, 30, Quarter, Quarter/2).setInitWidth(500), }, - concurrencyLimit: 2000, - evalDuration: Quarter * 40, - expectedFair: []bool{true}, - expectedFairnessMargin: []float64{0.01}, - expectAllRequests: true, - evalInqueueMetrics: true, - evalExecutingMetrics: true, - clk: clk, - counter: counter, - expectedEpochAdvances: 8, + concurrencyLimit: 2000, + evalDuration: Quarter * 40, + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0.01}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + expectedEpochAdvances: 8, + seatDemandIntegratorSubject: seatDemandIntegratorSubject, }.exercise(t) } @@ -740,7 +780,8 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) { HandSize: 1, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) + seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) if err != nil { t.Fatal(err) } @@ -752,15 +793,16 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) { newUniformClient(1001001001, 4, 20, time.Second, time.Second-1), newUniformClient(2002002002, 2, 20, time.Second, time.Second-1), }, - concurrencyLimit: 3, - evalDuration: time.Second * 20, - expectedFair: []bool{true}, - expectedFairnessMargin: []float64{0.01}, - expectAllRequests: true, - evalInqueueMetrics: true, - evalExecutingMetrics: true, - clk: clk, - counter: counter, + concurrencyLimit: 3, + evalDuration: time.Second * 20, + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0.01}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + seatDemandIntegratorSubject: seatDemandIntegratorSubject, }.exercise(t) } @@ -777,7 +819,8 @@ func TestDifferentWidths(t *testing.T) { HandSize: 7, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) + seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) if err != nil { t.Fatal(err) } @@ -788,15 +831,16 @@ func TestDifferentWidths(t *testing.T) { newUniformClient(10010010010010, 13, 10, time.Second, time.Second-1), newUniformClient(20020020020020, 7, 10, time.Second, time.Second-1).setInitWidth(2), }, - concurrencyLimit: 6, - evalDuration: time.Second * 20, - expectedFair: []bool{true}, - expectedFairnessMargin: []float64{0.15}, - expectAllRequests: true, - evalInqueueMetrics: true, - evalExecutingMetrics: true, - clk: clk, - counter: counter, + concurrencyLimit: 6, + evalDuration: time.Second * 20, + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0.15}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + seatDemandIntegratorSubject: seatDemandIntegratorSubject, }.exercise(t) } @@ -813,7 +857,8 @@ func TestTooWide(t *testing.T) { HandSize: 7, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) + seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) if err != nil { t.Fatal(err) } @@ -827,15 +872,16 @@ func TestTooWide(t *testing.T) { newUniformClient(70070070070070, 15, 21, time.Second, time.Second-1).setInitWidth(2), newUniformClient(90090090090090, 15, 21, time.Second, time.Second-1).setInitWidth(7), }, - concurrencyLimit: 6, - evalDuration: time.Second * 225, - expectedFair: []bool{true}, - expectedFairnessMargin: []float64{0.33}, - expectAllRequests: true, - evalInqueueMetrics: true, - evalExecutingMetrics: true, - clk: clk, - counter: counter, + concurrencyLimit: 6, + evalDuration: time.Second * 225, + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0.33}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + seatDemandIntegratorSubject: seatDemandIntegratorSubject, }.exercise(t) } @@ -874,7 +920,8 @@ func TestWindup(t *testing.T) { HandSize: 1, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) + seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) if err != nil { t.Fatal(err) } @@ -885,15 +932,16 @@ func TestWindup(t *testing.T) { newUniformClient(1001001001, 2, 40, time.Second, -1), newUniformClient(2002002002, 2, 40, time.Second, -1).setSplit(), }, - concurrencyLimit: 3, - evalDuration: time.Second * 40, - expectedFair: []bool{true, testCase.expectFair2}, - expectedFairnessMargin: []float64{0.01, testCase.margin2}, - expectAllRequests: true, - evalInqueueMetrics: true, - evalExecutingMetrics: true, - clk: clk, - counter: counter, + concurrencyLimit: 3, + evalDuration: time.Second * 40, + expectedFair: []bool{true, testCase.expectFair2}, + expectedFairnessMargin: []float64{0.01, testCase.margin2}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + seatDemandIntegratorSubject: seatDemandIntegratorSubject, }.exercise(t) }) } @@ -909,7 +957,8 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) { Name: "TestDifferentFlowsWithoutQueuing", DesiredNumQueues: 0, } - qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) + seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject") + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) if err != nil { t.Fatal(err) } @@ -921,14 +970,15 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) { newUniformClient(1001001001, 6, 10, time.Second, 57*time.Millisecond), newUniformClient(2002002002, 4, 15, time.Second, 750*time.Millisecond), }, - concurrencyLimit: 4, - evalDuration: time.Second * 13, - expectedFair: []bool{false}, - expectedFairnessMargin: []float64{0.20}, - evalExecutingMetrics: true, - rejectReason: "concurrency-limit", - clk: clk, - counter: counter, + concurrencyLimit: 4, + evalDuration: time.Second * 13, + expectedFair: []bool{false}, + expectedFairnessMargin: []float64{0.20}, + evalExecutingMetrics: true, + rejectReason: "concurrency-limit", + clk: clk, + counter: counter, + seatDemandIntegratorSubject: seatDemandIntegratorSubject, }.exercise(t) } @@ -945,7 +995,8 @@ func TestTimeout(t *testing.T) { HandSize: 1, RequestWaitLimit: 0, } - qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) + seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) if err != nil { t.Fatal(err) } @@ -956,15 +1007,16 @@ func TestTimeout(t *testing.T) { clients: []uniformClient{ newUniformClient(1001001001, 5, 100, time.Second, time.Second), }, - concurrencyLimit: 1, - evalDuration: time.Second * 10, - expectedFair: []bool{true}, - expectedFairnessMargin: []float64{0.01}, - evalInqueueMetrics: true, - evalExecutingMetrics: true, - rejectReason: "time-out", - clk: clk, - counter: counter, + concurrencyLimit: 1, + evalDuration: time.Second * 10, + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0.01}, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + rejectReason: "time-out", + clk: clk, + counter: counter, + seatDemandIntegratorSubject: seatDemandIntegratorSubject, }.exercise(t) } @@ -996,7 +1048,8 @@ func TestContextCancel(t *testing.T) { HandSize: 1, RequestWaitLimit: 15 * time.Second, } - qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) + seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject) if err != nil { t.Fatal(err) } @@ -1102,7 +1155,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) { DesiredNumQueues: 0, RequestWaitLimit: 15 * time.Second, } - qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk)) + qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), fq.NewNamedIntegrator(clk, qCfg.Name)) if err != nil { t.Fatal(err) } @@ -1286,6 +1339,7 @@ func TestFindDispatchQueueLocked(t *testing.T) { t.Run(test.name, func(t *testing.T) { qs := &queueSet{ estimatedServiceDuration: G, + seatDemandIntegrator: fq.NewNamedIntegrator(clock.RealClock{}, "seatDemandSubject"), robinIndex: test.robinIndex, totSeatsInUse: test.totSeatsInUse, qCfg: fq.QueuingConfig{Name: "TestSelectQueueLocked/" + test.name}, @@ -1359,6 +1413,7 @@ func TestFinishRequestLocked(t *testing.T) { estimatedServiceDuration: time.Second, reqsGaugePair: newGaugePair(clk), execSeatsGauge: newExecSeatsGauge(clk), + seatDemandIntegrator: fq.NewNamedIntegrator(clk, "seatDemandSubject"), } queue := &queue{ requests: newRequestFIFO(), @@ -1468,3 +1523,28 @@ func newGaugePair(clk clock.PassiveClock) metrics.RatioedGaugePair { func newExecSeatsGauge(clk clock.PassiveClock) metrics.RatioedGauge { return metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, []string{"test"}) } + +func float64close(x, y float64) bool { + x0 := float64NaNTo0(x) + y0 := float64NaNTo0(y) + diff := math.Abs(x - y) + den := math.Max(math.Abs(x0), math.Abs(y0)) + if den == 0 { + return diff < 1e-10 + } + return diff/den < 1e-10 +} + +func uint64max(a, b uint64) uint64 { + if b > a { + return b + } + return a +} + +func float64NaNTo0(x float64) float64 { + if math.IsNaN(x) { + return 0 + } + return x +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go index 476b9e86298..1787b4d8abb 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go @@ -40,7 +40,7 @@ type noRestraint struct{} type noRestraintRequest struct{} -func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge) (fq.QueueSetCompleter, error) { +func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge, fq.Integrator) (fq.QueueSetCompleter, error) { return noRestraintCompleter{}, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go index d7a221bf400..13c6c7bcc7f 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go @@ -23,12 +23,15 @@ import ( "testing" "time" + "k8s.io/utils/clock" + flowcontrol "k8s.io/api/flowcontrol/v1beta3" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints/request" + fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" fqtesting "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing" fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format" "k8s.io/apiserver/pkg/util/flowcontrol/metrics" @@ -57,7 +60,7 @@ func genPL(rng *rand.Rand, name string) *flowcontrol.PriorityLevelConfiguration QueueLengthLimit: 5} } labelVals := []string{"test"} - _, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.RatioedGaugeVecPhasedElementPair(metrics.PriorityLevelConcurrencyGaugeVec, 1, 1, labelVals), metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelVals)) + _, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.RatioedGaugeVecPhasedElementPair(metrics.PriorityLevelConcurrencyGaugeVec, 1, 1, labelVals), metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelVals), fq.NewNamedIntegrator(clock.RealClock{}, name)) if err != nil { panic(err) }