diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 035ffbbd049..1a41a833e6e 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -76,9 +76,9 @@ type queueSetCompleter struct { // not end in "Locked" either acquires the lock or does not care about // locking. type queueSet struct { - clock eventclock.Interface - estimatedServiceSeconds float64 - obsPair metrics.TimedObserverPair + clock eventclock.Interface + estimatedServiceDuration time.Duration + obsPair metrics.TimedObserverPair promiseFactory promiseFactory @@ -102,9 +102,9 @@ type queueSet struct { // queues are still draining. queues []*queue - // virtualTime is the amount of seat-seconds allocated per queue since process startup. + // currentR is the amount of seat-seconds allocated per queue since process startup. // This is our generalization of the progress meter named R in the original fair queuing work. - virtualTime float64 + currentR SeatSeconds // lastRealTime is what `clock.Now()` yielded when `virtualTime` was last updated lastRealTime time.Time @@ -173,12 +173,12 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet { qs := qsc.theSet if qs == nil { qs = &queueSet{ - clock: qsc.factory.clock, - estimatedServiceSeconds: 0.003, - obsPair: qsc.obsPair, - qCfg: qsc.qCfg, - virtualTime: 0, - lastRealTime: qsc.factory.clock.Now(), + clock: qsc.factory.clock, + estimatedServiceDuration: 3 * time.Millisecond, + obsPair: qsc.obsPair, + qCfg: qsc.qCfg, + currentR: 0, + lastRealTime: qsc.factory.clock.Now(), } qs.promiseFactory = qsc.factory.promiseFactoryFactory(qs) } @@ -407,10 +407,14 @@ func (qs *queueSet) lockAndSyncTime() { // lock and before modifying the state of any queue. func (qs *queueSet) syncTimeLocked() { realNow := qs.clock.Now() - timeSinceLast := realNow.Sub(qs.lastRealTime).Seconds() + timeSinceLast := realNow.Sub(qs.lastRealTime) qs.lastRealTime = realNow - qs.virtualTime += timeSinceLast * qs.getVirtualTimeRatioLocked() - metrics.SetCurrentR(qs.qCfg.Name, qs.virtualTime) + prevR := qs.currentR + qs.currentR += SeatsTimesDuration(qs.getVirtualTimeRatioLocked(), timeSinceLast) + if qs.currentR < prevR { + klog.ErrorS(errors.New("progress meter wrapped around"), "Wrap", "QS", qs.qCfg.Name, "prevR", prevR, "currentR", qs.currentR) + } + metrics.SetCurrentR(qs.qCfg.Name, qs.currentR.ToFloat()) } // getVirtualTimeRatio calculates the rate at which virtual time has @@ -460,7 +464,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte ctx: ctx, decision: qs.promiseFactory(nil, ctx.Done(), decisionCancel), arrivalTime: qs.clock.Now(), - arrivalR: qs.virtualTime, + arrivalR: qs.currentR, queue: queue, descr1: descr1, descr2: descr2, @@ -495,7 +499,7 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac // Ideally, this should be based on projected completion time in the // virtual world of the youngest request in the queue. thisSeatsSum := waitingSeats + queue.seatsInUse - klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of %d seats waiting and %d executing, virtualStart=%vss", qs.qCfg.Name, descr1, descr2, queueIdx, waitingSeats, queue.seatsInUse, queue.virtualStart) + klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of %d seats waiting and %d executing, nextDispatchR=%v", qs.qCfg.Name, descr1, descr2, queueIdx, waitingSeats, queue.seatsInUse, queue.nextDispatchR) if thisSeatsSum < bestQueueSeatsSum { bestQueueIdx, bestQueueSeatsSum = queueIdx, thisSeatsSum } @@ -503,7 +507,7 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac } if klog.V(6).Enabled() { chosenQueue := qs.queues[bestQueueIdx] - klog.V(6).Infof("QS(%s) at r=%s v=%.9fss: For request %#+v %#+v chose queue %d, had seatSum %d & %d requests executing & virtualStart=%vss", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, chosenQueue.requests.SeatsSum(), chosenQueue.requestsExecuting, chosenQueue.virtualStart) + klog.V(6).Infof("QS(%s) at t=%s R=%v: For request %#+v %#+v chose queue %d, had seatSum %d & %d requests executing & virtualStart=%v", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.currentR, descr1, descr2, bestQueueIdx, chosenQueue.requests.SeatsSum(), chosenQueue.requestsExecuting, chosenQueue.nextDispatchR) } return bestQueueIdx } @@ -571,9 +575,9 @@ func (qs *queueSet) enqueueLocked(request *request) { now := qs.clock.Now() if queue.requests.Length() == 0 && queue.requestsExecuting == 0 { // the queue’s start R is set to the virtual time. - queue.virtualStart = qs.virtualTime + queue.nextDispatchR = qs.currentR if klog.V(6).Enabled() { - klog.Infof("QS(%s) at r=%s v=%.9fss: initialized queue %d start R due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2) + klog.Infof("QS(%s) at t=%s R=%v: initialized queue %d start R due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.nextDispatchR, queue.index, request.descr1, request.descr2) } } queue.Enqueue(request) @@ -609,7 +613,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f startTime: now, decision: qs.promiseFactory(decisionExecute, ctx.Done(), decisionCancel), arrivalTime: now, - arrivalR: qs.virtualTime, + arrivalR: qs.currentR, descr1: descr1, descr2: descr2, workEstimate: *workEstimate, @@ -620,7 +624,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.Seats()) qs.obsPair.RequestsExecuting.Add(1) if klog.V(5).Enabled() { - klog.Infof("QS(%s) at r=%s v=%.9fss: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting) + klog.Infof("QS(%s) at t=%s R=%v: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, fsName, descr1, descr2, qs.totRequestsExecuting) } return req } @@ -656,12 +660,12 @@ func (qs *queueSet) dispatchLocked() bool { qs.obsPair.RequestsWaiting.Add(-1) qs.obsPair.RequestsExecuting.Add(1) if klog.V(6).Enabled() { - klog.Infof("QS(%s) at r=%s v=%.9fss: dispatching request %#+v %#+v work %v from queue %d with start R %.9fss, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied", - qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, - request.workEstimate, queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse) + klog.Infof("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v work %v from queue %d with start R %v, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied", + qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.currentR, request.descr1, request.descr2, + request.workEstimate, queue.index, queue.nextDispatchR, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse) } // When a request is dequeued for service -> qs.virtualStart += G * width - queue.virtualStart += qs.estimatedServiceSeconds * float64(request.Seats()) + queue.nextDispatchR += SeatsTimesDuration(float64(request.Seats()), qs.estimatedServiceDuration) qs.boundNextDispatch(queue) request.decision.Set(decisionExecute) return ok @@ -694,11 +698,11 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool { // returns the first one of those for which the virtual finish time of // the oldest waiting request is minimal. func (qs *queueSet) findDispatchQueueLocked() *queue { - minVirtualFinish := math.Inf(1) - sMin := math.Inf(1) - dsMin := math.Inf(1) - sMax := math.Inf(-1) - dsMax := math.Inf(-1) + minVirtualFinish := MaxSeatSeconds + sMin := MaxSeatSeconds + dsMin := MaxSeatSeconds + sMax := MinSeatSeconds + dsMax := MinSeatSeconds var minQueue *queue var minIndex int nq := len(qs.queues) @@ -707,12 +711,12 @@ func (qs *queueSet) findDispatchQueueLocked() *queue { queue := qs.queues[qs.robinIndex] oldestWaiting, _ := queue.requests.Peek() if oldestWaiting != nil { - sMin = math.Min(sMin, queue.virtualStart) - sMax = math.Max(sMax, queue.virtualStart) - estimatedWorkInProgress := qs.estimatedServiceSeconds * float64(queue.seatsInUse) - dsMin = math.Min(dsMin, queue.virtualStart-estimatedWorkInProgress) - dsMax = math.Max(dsMax, queue.virtualStart-estimatedWorkInProgress) - currentVirtualFinish := queue.virtualStart + qs.estimatedServiceSeconds*float64(oldestWaiting.Seats()) + sMin = ssMin(sMin, queue.nextDispatchR) + sMax = ssMax(sMax, queue.nextDispatchR) + estimatedWorkInProgress := SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration) + dsMin = ssMin(dsMin, queue.nextDispatchR-estimatedWorkInProgress) + dsMax = ssMax(dsMax, queue.nextDispatchR-estimatedWorkInProgress) + currentVirtualFinish := queue.nextDispatchR + SeatsTimesDuration(float64(oldestWaiting.Seats()), qs.estimatedServiceDuration) klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish) if currentVirtualFinish < minVirtualFinish { minVirtualFinish = currentVirtualFinish @@ -743,13 +747,27 @@ func (qs *queueSet) findDispatchQueueLocked() *queue { // win in the case that the virtual finish times are the same qs.robinIndex = minIndex - if minQueue.virtualStart < oldestReqFromMinQueue.arrivalR { - klog.ErrorS(errors.New("dispatch before arrival"), "Inconceivable!", "QS", qs.qCfg.Name, "queue", minQueue.index, "dispatchR", minQueue.virtualStart, "request", oldestReqFromMinQueue) + if minQueue.nextDispatchR < oldestReqFromMinQueue.arrivalR { + klog.ErrorS(errors.New("dispatch before arrival"), "Inconceivable!", "QS", qs.qCfg.Name, "queue", minQueue.index, "dispatchR", minQueue.nextDispatchR, "request", oldestReqFromMinQueue) } - metrics.SetDispatchMetrics(qs.qCfg.Name, qs.virtualTime, minQueue.virtualStart, sMin, sMax, dsMin, dsMax) + metrics.SetDispatchMetrics(qs.qCfg.Name, qs.currentR.ToFloat(), minQueue.nextDispatchR.ToFloat(), sMin.ToFloat(), sMax.ToFloat(), dsMin.ToFloat(), dsMax.ToFloat()) return minQueue } +func ssMin(a, b SeatSeconds) SeatSeconds { + if a > b { + return b + } + return a +} + +func ssMax(a, b SeatSeconds) SeatSeconds { + if a < b { + return b + } + return a +} + // finishRequestAndDispatchAsMuchAsPossible is a convenience method // which calls finishRequest for a given request and then dispatches // as many requests as possible. This is all of what needs to be done @@ -773,7 +791,7 @@ func (qs *queueSet) finishRequestLocked(r *request) { metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1) qs.obsPair.RequestsExecuting.Add(-1) - S := now.Sub(r.startTime).Seconds() + actualServiceDuration := now.Sub(r.startTime) // TODO: for now we keep the logic localized so it is easier to see // how the counters are tracked for queueset and queue, in future we @@ -794,11 +812,11 @@ func (qs *queueSet) finishRequestLocked(r *request) { releaseSeatsLocked() if !klog.V(6).Enabled() { } else if r.queue != nil { - klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished all use of %d seats, adjusted queue %d start R to %.9fss due to service time %.9fs, queue will have %d requests, %d seats waiting & %d requests occupying %d seats", - qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.queue.index, - r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse) + klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished all use of %d seats, adjusted queue %d start R to %v due to service time %.9fs, queue will have %d requests, %d seats waiting & %d requests occupying %d seats", + qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.queue.index, + r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse) } else { - klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished all use of %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse) + klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished all use of %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse) } return } @@ -806,11 +824,11 @@ func (qs *queueSet) finishRequestLocked(r *request) { additionalLatency := r.workEstimate.AdditionalLatency if !klog.V(6).Enabled() { } else if r.queue != nil { - klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, adjusted queue %d start R to %.9fss due to service time %.9fs, queue will have %d requests waiting & %d executing, still has %d seats waiting & %d executing", - qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, additionalLatency.Seconds(), r.queue.index, - r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting, r.queue.requests.SeatsSum(), r.queue.seatsInUse) + klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, adjusted queue %d start R to %v due to service time %.9fs, queue will have %d requests waiting & %d executing, still has %d seats waiting & %d executing", + qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, additionalLatency.Seconds(), r.queue.index, + r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requests.Length(), r.queue.requestsExecuting, r.queue.requests.SeatsSum(), r.queue.seatsInUse) } else { - klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, additionalLatency.Seconds(), qs.totRequestsExecuting, qs.totSeatsInUse) + klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, additionalLatency.Seconds(), qs.totRequestsExecuting, qs.totSeatsInUse) } // EventAfterDuration will execute the event func in a new goroutine, // so the seats allocated to this request will be released after @@ -823,11 +841,11 @@ func (qs *queueSet) finishRequestLocked(r *request) { releaseSeatsLocked() if !klog.V(6).Enabled() { } else if r.queue != nil { - klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished lingering on %d seats, queue %d will have %d requests, %d seats waiting & %d requests occupying %d seats", - qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.queue.index, + klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished lingering on %d seats, queue %d will have %d requests, %d seats waiting & %d requests occupying %d seats", + qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.queue.index, r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse) } else { - klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished lingering on %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse) + klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished lingering on %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse) } qs.dispatchAsMuchAsPossibleLocked() }, additionalLatency) @@ -839,7 +857,7 @@ func (qs *queueSet) finishRequestLocked(r *request) { // When a request finishes being served, and the actual service time was S, // the queue’s start R is decremented by (G - S)*width. - r.queue.virtualStart -= (qs.estimatedServiceSeconds - S) * float64(r.Seats()) + r.queue.nextDispatchR -= SeatsTimesDuration(float64(r.Seats()), qs.estimatedServiceDuration-actualServiceDuration) qs.boundNextDispatch(r.queue) } } @@ -857,11 +875,11 @@ func (qs *queueSet) boundNextDispatch(queue *queue) { return } var virtualStartBound = oldestReqFromMinQueue.arrivalR - if queue.virtualStart < virtualStartBound { + if queue.nextDispatchR < virtualStartBound { if klog.V(4).Enabled() { - klog.InfoS("AntiWindup tweaked queue", "QS", qs.qCfg.Name, "queue", queue.index, "time", qs.clock.Now().Format(nsTimeFmt), "requestDescr1", oldestReqFromMinQueue.descr1, "requestDescr2", oldestReqFromMinQueue.descr2, "newVirtualStart", virtualStartBound, "deltaVirtualStart", (virtualStartBound - queue.virtualStart)) + klog.InfoS("AntiWindup tweaked queue", "QS", qs.qCfg.Name, "queue", queue.index, "time", qs.clock.Now().Format(nsTimeFmt), "requestDescr1", oldestReqFromMinQueue.descr1, "requestDescr2", oldestReqFromMinQueue.descr2, "newVirtualStart", virtualStartBound, "deltaVirtualStart", (virtualStartBound - queue.nextDispatchR)) } - queue.virtualStart = virtualStartBound + queue.nextDispatchR = virtualStartBound } } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 7a565c49016..7a514a1fa8b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -247,7 +247,7 @@ type uniformScenarioThread struct { } func (ust *uniformScenarioThread) start() { - initialDelay := time.Duration(11*ust.j + 2*ust.i) + initialDelay := time.Duration(90*ust.j + 20*ust.i) if ust.uc.split && ust.j >= ust.uc.nThreads/2 { initialDelay += ust.uss.evalDuration / 2 ust.nCalls = ust.nCalls / 2 @@ -601,7 +601,7 @@ func TestUniformFlowsHandSize3(t *testing.T) { concurrencyLimit: 4, evalDuration: time.Second * 60, expectedFair: []bool{true}, - expectedFairnessMargin: []float64{0.01}, + expectedFairnessMargin: []float64{0.03}, expectAllRequests: true, evalInqueueMetrics: true, evalExecutingMetrics: true, @@ -647,6 +647,46 @@ func TestDifferentFlowsExpectEqual(t *testing.T) { }.exercise(t) } +// TestSeatSecondsRollover demonstrates that SeatSeconds overflow can cause bad stuff to happen. +func TestSeatSecondsRollover(t *testing.T) { + metrics.Register() + now := time.Now() + + const Quarter = 91 * 24 * time.Hour + + clk, counter := testeventclock.NewFake(now, 0, nil) + qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter)) + qCfg := fq.QueuingConfig{ + Name: "TestSeatSecondsRollover", + DesiredNumQueues: 9, + QueueLengthLimit: 8, + HandSize: 1, + RequestWaitLimit: 40 * Quarter, + } + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + if err != nil { + t.Fatal(err) + } + qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 2000}) + + uniformScenario{name: qCfg.Name, + qs: qs, + clients: []uniformClient{ + newUniformClient(1001001001, 8, 20, Quarter, Quarter).seats(500), + newUniformClient(2002002002, 7, 30, Quarter, Quarter/2).seats(500), + }, + concurrencyLimit: 2000, + evalDuration: Quarter * 40, + expectedFair: []bool{false}, + expectedFairnessMargin: []float64{0.01}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + }.exercise(t) +} + func TestDifferentFlowsExpectUnequal(t *testing.T) { metrics.Register() now := time.Now() @@ -1073,7 +1113,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) { } func TestFindDispatchQueueLocked(t *testing.T) { - var G float64 = 0.003 + const G = 3 * time.Millisecond tests := []struct { name string robinIndex int @@ -1092,13 +1132,13 @@ func TestFindDispatchQueueLocked(t *testing.T) { robinIndex: -1, queues: []*queue{ { - virtualStart: 200, + nextDispatchR: SeatsTimesDuration(1, 200*time.Second), requests: newFIFO( &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1}}, ), }, { - virtualStart: 100, + nextDispatchR: SeatsTimesDuration(1, 100*time.Second), requests: newFIFO( &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1}}, ), @@ -1115,7 +1155,7 @@ func TestFindDispatchQueueLocked(t *testing.T) { robinIndex: -1, queues: []*queue{ { - virtualStart: 200, + nextDispatchR: SeatsTimesDuration(1, 200*time.Second), requests: newFIFO( &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1}}, ), @@ -1132,13 +1172,13 @@ func TestFindDispatchQueueLocked(t *testing.T) { robinIndex: -1, queues: []*queue{ { - virtualStart: 200, + nextDispatchR: SeatsTimesDuration(1, 200*time.Second), requests: newFIFO( &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 50}}, ), }, { - virtualStart: 100, + nextDispatchR: SeatsTimesDuration(1, 100*time.Second), requests: newFIFO( &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 25}}, ), @@ -1155,13 +1195,13 @@ func TestFindDispatchQueueLocked(t *testing.T) { robinIndex: -1, queues: []*queue{ { - virtualStart: 200, + nextDispatchR: SeatsTimesDuration(1, 200*time.Second), requests: newFIFO( &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 10}}, ), }, { - virtualStart: 100, + nextDispatchR: SeatsTimesDuration(1, 100*time.Second), requests: newFIFO( &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 25}}, ), @@ -1178,13 +1218,13 @@ func TestFindDispatchQueueLocked(t *testing.T) { robinIndex: -1, queues: []*queue{ { - virtualStart: 200, + nextDispatchR: SeatsTimesDuration(1, 200*time.Second), requests: newFIFO( &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 10}}, ), }, { - virtualStart: 100, + nextDispatchR: SeatsTimesDuration(1, 100*time.Second), requests: newFIFO( &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 25}}, ), @@ -1204,10 +1244,10 @@ func TestFindDispatchQueueLocked(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { qs := &queueSet{ - estimatedServiceSeconds: G, - robinIndex: test.robinIndex, - totSeatsInUse: test.totSeatsInUse, - qCfg: fq.QueuingConfig{Name: "TestSelectQueueLocked/" + test.name}, + estimatedServiceDuration: G, + robinIndex: test.robinIndex, + totSeatsInUse: test.totSeatsInUse, + qCfg: fq.QueuingConfig{Name: "TestSelectQueueLocked/" + test.name}, dCfg: fq.DispatchingConfig{ ConcurrencyLimit: test.concurrencyLimit, }, diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/seatsecs_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/seatsecs_test.go new file mode 100644 index 00000000000..b0a98b2570b --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/seatsecs_test.go @@ -0,0 +1,45 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queueset + +import ( + "fmt" + "math" + "testing" + "time" +) + +func TestSeatSecondsString(t *testing.T) { + digits := math.Log10(ssScale) + expectFmt := fmt.Sprintf("%%%d.%dfss", int(digits+2), int(digits)) + testCases := []struct { + ss SeatSeconds + expect string + }{ + {ss: SeatSeconds(1), expect: fmt.Sprintf(expectFmt, 1.0/ssScale)}, + {ss: 0, expect: "0.00000000ss"}, + {ss: SeatsTimesDuration(1, time.Second), expect: "1.00000000ss"}, + {ss: SeatsTimesDuration(123, 100*time.Millisecond), expect: "12.30000000ss"}, + {ss: SeatsTimesDuration(1203, 10*time.Millisecond), expect: "12.03000000ss"}, + } + for _, testCase := range testCases { + actualStr := testCase.ss.String() + if actualStr != testCase.expect { + t.Errorf("SeatSeonds(%d) formatted as %q rather than expected %q", uint64(testCase.ss), actualStr, testCase.expect) + } + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index 7213460bbfb..2a36040566e 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -18,6 +18,8 @@ package queueset import ( "context" + "fmt" + "math" "time" genericrequest "k8s.io/apiserver/pkg/endpoints/request" @@ -60,7 +62,7 @@ type request struct { arrivalTime time.Time // arrivalR is R(arrivalTime). R is, confusingly, also called "virtual time". - arrivalR float64 + arrivalR SeatSeconds // descr1 and descr2 are not used in any logic but they appear in // log messages @@ -82,9 +84,9 @@ type queue struct { // The requests not yet executing in the real world are stored in a FIFO list. requests fifo - // virtualStart is the "virtual time" (R progress meter reading) at + // nextDispatchR is the R progress meter reading at // which the next request will be dispatched in the virtual world. - virtualStart float64 + nextDispatchR SeatSeconds // requestsExecuting is the count in the real world requestsExecuting int @@ -128,9 +130,45 @@ func (q *queue) dump(includeDetails bool) debug.QueueDump { return true }) return debug.QueueDump{ - VirtualStart: q.virtualStart, + VirtualStart: q.nextDispatchR.ToFloat(), // TODO: change QueueDump to use SeatSeconds Requests: digest, ExecutingRequests: q.requestsExecuting, SeatsInUse: q.seatsInUse, } } + +// SeatSeconds is a measure of work, in units of seat-seconds, using a fixed-point representation. +// `SeatSeconds(n)` represents `n/ssScale` seat-seconds. +// The constants `ssScale` and `ssScaleDigits` are private to the implementation here, +// no other code should use them. +type SeatSeconds uint64 + +// MaxSeatsSeconds is the maximum representable value of SeatSeconds +const MaxSeatSeconds = SeatSeconds(math.MaxUint64) + +// MinSeatSeconds is the lowest representable value of SeatSeconds +const MinSeatSeconds = SeatSeconds(0) + +// SeatsTimeDuration produces the SeatSeconds value for the given factors. +// This is intended only to produce small values, increments in work +// rather than amount of work done since process start. +func SeatsTimesDuration(seats float64, duration time.Duration) SeatSeconds { + return SeatSeconds(math.Round(seats * float64(duration/time.Nanosecond) / (1e9 / ssScale))) +} + +// ToFloat converts to a floating-point representation. +// This conversion may lose precision. +func (ss SeatSeconds) ToFloat() float64 { + return float64(ss) / ssScale +} + +// String converts to a string. +// This is suitable for large as well as small values. +func (ss SeatSeconds) String() string { + const div = SeatSeconds(ssScale) + quo := ss / div + rem := ss - quo*div + return fmt.Sprintf("%d.%08dss", quo, rem) +} + +const ssScale = 1e8