From d2a27a58f0af20c6185fa1c21890d666e9d3746b Mon Sep 17 00:00:00 2001 From: Mike Spreitzer Date: Thu, 12 Aug 2021 16:48:02 -0400 Subject: [PATCH] Fix extra latency and add tests for that and width Added missing dispatching after delayed release of seats. Updated logging for all six situations of execution completion and seat release. Added behavioral tests for non-zero extra latency and non-unit width. Also added two tests for baseline functionality. Also improved some comments and other logging in `queueset.go`. --- .../fairqueuing/queueset/queueset.go | 100 ++++--- .../fairqueuing/queueset/queueset_test.go | 260 ++++++++++++++++-- .../flowcontrol/fairqueuing/queueset/types.go | 5 +- 3 files changed, 298 insertions(+), 67 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index db71ad61d24..ffb3549b8e0 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -18,6 +18,7 @@ package queueset import ( "context" + "errors" "fmt" "math" "sync" @@ -101,7 +102,8 @@ type queueSet struct { // queues are still draining. queues []*queue - // virtualTime is the number of virtual seconds since process startup + // virtualTime is the amount of seat-seconds allocated per queue since process startup. + // This is our generalization of the progress meter named R in the original fair queuing work. virtualTime float64 // lastRealTime is what `clock.Now()` yielded when `virtualTime` was last updated @@ -477,13 +479,15 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte // in addition to their seats. // Ideally, this should be based on projected completion time in the // virtual world of the youngest request in the queue. - thisSeatsSum := qs.queues[queueIdx].requests.SeatsSum() - klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of seatsSum %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisSeatsSum) + queue := qs.queues[queueIdx] + waitingSeats := queue.requests.SeatsSum() + thisSeatsSum := waitingSeats // + queue.seatsInUse + klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of %d seats waiting and %d executing", qs.qCfg.Name, descr1, descr2, queueIdx, waitingSeats, queue.seatsInUse) if thisSeatsSum < bestQueueSeatsSum { bestQueueIdx, bestQueueSeatsSum = queueIdx, thisSeatsSum } }) - klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueSeatsSum, qs.queues[bestQueueIdx].requestsExecuting) + klog.V(6).Infof("QS(%s) at r=%s v=%.9fss: For request %#+v %#+v chose queue %d, had seatSum %d & %d requests executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueSeatsSum, qs.queues[bestQueueIdx].requestsExecuting) return bestQueueIdx } @@ -549,10 +553,10 @@ func (qs *queueSet) enqueueLocked(request *request) { queue := request.queue now := qs.clock.Now() if queue.requests.Length() == 0 && queue.requestsExecuting == 0 { - // the queue’s virtual start time is set to the virtual time. + // the queue’s start R is set to the virtual time. queue.virtualStart = qs.virtualTime if klog.V(6).Enabled() { - klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2) + klog.Infof("QS(%s) at r=%s v=%.9fss: initialized queue %d start R due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2) } } queue.Enqueue(request) @@ -598,7 +602,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.Seats()) qs.obsPair.RequestsExecuting.Add(1) if klog.V(5).Enabled() { - klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting) + klog.Infof("QS(%s) at r=%s v=%.9fss: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting) } return req } @@ -634,9 +638,9 @@ func (qs *queueSet) dispatchLocked() bool { qs.obsPair.RequestsWaiting.Add(-1) qs.obsPair.RequestsExecuting.Add(1) if klog.V(6).Enabled() { - klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", + klog.Infof("QS(%s) at r=%s v=%.9fss: dispatching request %#+v %#+v work %v from queue %d with start R %.9fss, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied", qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, - queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting) + request.workEstimate, queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse) } // When a request is dequeued for service -> qs.virtualStart += G queue.virtualStart += qs.estimatedServiceTime * float64(request.Seats()) @@ -659,10 +663,6 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool { } // wait for all "currently" executing requests in this queueSet // to finish before we can execute this request. - if klog.V(4).Enabled() { - klog.Infof("QS(%s): seats (%d) asked for exceeds concurrency limit, waiting for currently executing requests to complete, %d seats are in use (%d are executing) and the limit is %d", - qs.qCfg.Name, seats, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) - } return false case qs.totSeatsInUse+seats > qs.dCfg.ConcurrencyLimit: return false @@ -692,8 +692,8 @@ func (qs *queueSet) selectQueueLocked() *queue { estimatedWorkInProgress := qs.estimatedServiceTime * float64(queue.seatsInUse) dsMin = math.Min(dsMin, queue.virtualStart-estimatedWorkInProgress) dsMax = math.Max(dsMax, queue.virtualStart-estimatedWorkInProgress) - // the virtual finish time of the oldest request is: - // virtual start time + G + // the finish R of the oldest request is: + // start R + G // we are not taking the width of the request into account when // we calculate the virtual finish time of the request because // it can starve requests with smaller wdith in other queues. @@ -704,12 +704,12 @@ func (qs *queueSet) selectQueueLocked() *queue { // - we have two queues, q1 and q2 // - q1 has an infinite supply of requests with width W=1 // - q2 has one request waiting in the queue with width W=2 - // - virtual start time for both q1 and q2 are at t0 + // - start R for both q1 and q2 are at t0 // - requests complete really fast, S=1ms on q1 // in this scenario we will execute roughly 60,000 requests // from q1 before we pick the request from q2. currentVirtualFinish := queue.virtualStart + qs.estimatedServiceTime - + klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish) if currentVirtualFinish < minVirtualFinish { minVirtualFinish = currentVirtualFinish minQueue = queue @@ -724,9 +724,18 @@ func (qs *queueSet) selectQueueLocked() *queue { oldestReqFromMinQueue = r return false }) - if oldestReqFromMinQueue == nil || !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.Seats()) { + if oldestReqFromMinQueue == nil { + // This cannot happen + klog.ErrorS(errors.New("selected queue is empty"), "Impossible", "queueSet", qs.qCfg.Name) + return nil + } + if !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.Seats()) { // since we have not picked the queue with the minimum virtual finish // time, we are not going to advance the round robin index here. + if klog.V(4).Enabled() { + klog.Infof("QS(%s): request %v %v seats %d cannot be dispatched from queue %d, waiting for currently executing requests to complete, %d requests are occupying %d seats and the limit is %d", + qs.qCfg.Name, oldestReqFromMinQueue.descr1, oldestReqFromMinQueue.descr2, oldestReqFromMinQueue.Seats(), minQueue.index, qs.totRequestsExecuting, qs.totSeatsInUse, qs.dCfg.ConcurrencyLimit) + } return nil } @@ -743,7 +752,7 @@ func (qs *queueSet) selectQueueLocked() *queue { // time. // // hence we're refreshing the per-queue virtual time for the chosen - // queue here. if the last virtual start time (excluded estimated cost) + // queue here. if the last start R (excluded estimated cost) // falls behind the global virtual time, we update the latest virtual // start by: + previouslyEstimatedServiceTime := float64(minQueue.seatsInUse) * qs.estimatedServiceTime @@ -790,12 +799,6 @@ func (qs *queueSet) finishRequestLocked(r *request) { metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.Seats()) if r.queue != nil { r.queue.seatsInUse -= r.Seats() - - if klog.V(6).Enabled() { - klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", - qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, - r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting) - } } } @@ -803,34 +806,55 @@ func (qs *queueSet) finishRequestLocked(r *request) { if r.workEstimate.AdditionalLatency <= 0 { // release the seats allocated to this request immediately releaseSeatsLocked() + if !klog.V(6).Enabled() { + } else if r.queue != nil { + klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished all use of %d seats, adjusted queue %d start R to %.9fss due to service time %.9fs, queue will have %d requests, %d seats waiting & %d requests occupying %d seats", + qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, r.queue.index, + r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse) + } else { + klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished all use of %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, qs.totRequestsExecuting, qs.totSeatsInUse) + } return } additionalLatency := r.workEstimate.AdditionalLatency + if !klog.V(6).Enabled() { + } else if r.queue != nil { + klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, adjusted queue %d start R to %.9fss due to service time %.9fs, queue will have %d requests waiting & %d executing, still has %d seats waiting & %d executing", + qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, additionalLatency.Seconds(), r.queue.index, + r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting, r.queue.requests.SeatsSum(), r.queue.seatsInUse) + } else { + klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, additionalLatency.Seconds(), qs.totRequestsExecuting, qs.totSeatsInUse) + } // EventAfterDuration will execute the event func in a new goroutine, // so the seats allocated to this request will be released after // AdditionalLatency elapses, this ensures that the additional // latency has no impact on the user experience. qs.clock.EventAfterDuration(func(_ time.Time) { - qs.lock.Lock() + qs.lockAndSyncTime() defer qs.lock.Unlock() + now := qs.clock.Now() releaseSeatsLocked() + if !klog.V(6).Enabled() { + } else if r.queue != nil { + klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished lingering on %d seats, queue %d will have %d requests, %d seats waiting & %d requests occupying %d seats", + qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, r.queue.index, + r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse) + } else { + klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished lingering on %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, qs.totRequestsExecuting, qs.totSeatsInUse) + } + qs.dispatchAsMuchAsPossibleLocked() }, additionalLatency) }() - if r.queue == nil { - if klog.V(6).Enabled() { - klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting) - } - return + if r.queue != nil { + // request has finished, remove from requests executing + r.queue.requestsExecuting-- + + // When a request finishes being served, and the actual service time was S, + // the queue’s start R is decremented by (G - S)*width. + r.queue.virtualStart -= (qs.estimatedServiceTime - S) * float64(r.Seats()) } - - // request has finished, remove from requests executing - r.queue.requestsExecuting-- - - // When a request finishes being served, and the actual service time was S, - // the queue’s virtual start time is decremented by (G - S)*width. - r.queue.virtualStart -= (qs.estimatedServiceTime - S) * float64(r.Seats()) } func (qs *queueSet) removeQueueIfEmptyLocked(r *request) { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index b72b66dc867..bc2324dc884 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -108,10 +108,46 @@ type uniformClient struct { // causing a request to be launched a certain amount of time // before the previous one finishes. thinkDuration time.Duration + // padDuration is additional time during which this request occupies its seats. + // This comes at the end of execution, after the reply has been released toward + // the client. + // The evaluation code below can only handle two cases: + // - this padding always keeps another request out of the seats, or + // - this padding never keeps another request out of the seats. + // Set the `padConstrains` field of the scenario accordingly. + padDuration time.Duration // When true indicates that only half the specified number of // threads should run during the first half of the evaluation // period split bool + // width is the number of seats this request occupies while executing + width uint +} + +func newUniformClient(hash uint64, nThreads, nCalls int, execDuration, thinkDuration time.Duration) uniformClient { + return uniformClient{ + hash: hash, + nThreads: nThreads, + nCalls: nCalls, + execDuration: execDuration, + thinkDuration: thinkDuration, + width: 1, + } +} + +func (uc uniformClient) setSplit() uniformClient { + uc.split = true + return uc +} + +func (uc uniformClient) seats(width uint) uniformClient { + uc.width = width + return uc +} + +func (uc uniformClient) pad(duration time.Duration) uniformClient { + uc.padDuration = duration + return uc } // uniformScenario describes a scenario based on the given set of uniform clients. @@ -127,6 +163,8 @@ type uniformClient struct { // fair in the respective halves of a split scenario; // in a non-split scenario this is a singleton with one expectation. // expectAllRequests indicates whether all requests are expected to get dispatched. +// padConstrains indicates whether the execution duration padding, if any, +// is expected to hold up dispatching. type uniformScenario struct { name string qs fq.QueueSet @@ -140,13 +178,14 @@ type uniformScenario struct { rejectReason string clk *testeventclock.Fake counter counter.GoRoutineCounter + padConstrains bool } func (us uniformScenario) exercise(t *testing.T) { uss := uniformScenarioState{ t: t, uniformScenario: us, - startTime: time.Now(), + startTime: us.clk.Now(), integrators: make([]fq.Integrator, len(us.clients)), executions: make([]int32, len(us.clients)), rejects: make([]int32, len(us.clients)), @@ -227,7 +266,7 @@ func (ust *uniformScenarioThread) callK(k int) { if k >= ust.nCalls { return } - req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.WorkEstimate{Seats: 1}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil) + req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.WorkEstimate{Seats: ust.uc.width, AdditionalLatency: ust.uc.padDuration}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil) ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle) if req == nil { atomic.AddUint64(&ust.uss.failedCount, 1) @@ -238,20 +277,25 @@ func (ust *uniformScenarioThread) callK(k int) { ust.uss.t.Error("got request but QueueSet reported idle") } var executed bool + var returnTime time.Time idle2 := req.Finish(func() { executed = true execStart := ust.uss.clk.Now() - ust.uss.t.Logf("%s: %d, %d, %d executing", execStart.Format(nsTimeFmt), ust.i, ust.j, k) atomic.AddInt32(&ust.uss.executions[ust.i], 1) - ust.igr.Add(1) + ust.igr.Add(float64(ust.uc.width)) + ust.uss.t.Logf("%s: %d, %d, %d executing; seats=%d", execStart.Format(nsTimeFmt), ust.i, ust.j, k, ust.uc.width) ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration) ust.uss.clk.Sleep(ust.uc.execDuration) - ust.igr.Add(-1) + ust.igr.Add(-float64(ust.uc.width)) + returnTime = ust.uss.clk.Now() }) - ust.uss.t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, executed, idle2) + now := ust.uss.clk.Now() + ust.uss.t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", now.Format(nsTimeFmt), ust.i, ust.j, k, executed, idle2) if !executed { atomic.AddUint64(&ust.uss.failedCount, 1) atomic.AddInt32(&ust.uss.rejects[ust.i], 1) + } else if now != returnTime { + ust.uss.t.Errorf("%s: %d, %d, %d returnTime=%s", now.Format(nsTimeFmt), ust.i, ust.j, k, returnTime.Format(nsTimeFmt)) } } @@ -270,23 +314,28 @@ func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, ma if uc.split && !last { nThreads = nThreads / 2 } - demands[i] = float64(nThreads) * float64(uc.execDuration) / float64(uc.thinkDuration+uc.execDuration) + sep := uc.thinkDuration + if uss.padConstrains && uc.padDuration > sep { + sep = uc.padDuration + } + demands[i] = float64(nThreads) * float64(uc.width) * float64(uc.execDuration) / float64(sep+uc.execDuration) averages[i] = uss.integrators[i].Reset().Average } fairAverages := fairAlloc(demands, float64(uss.concurrencyLimit)) for i := range uss.clients { + expectedAverage := fairAverages[i] var gotFair bool - if fairAverages[i] > 0 { - relDiff := (averages[i] - fairAverages[i]) / fairAverages[i] + if expectedAverage > 0 { + relDiff := (averages[i] - expectedAverage) / expectedAverage gotFair = math.Abs(relDiff) <= margin } else { gotFair = math.Abs(averages[i]) <= margin } if gotFair != expectFair { - uss.t.Errorf("%s client %d last=%v got an Average of %v but the fair average was %v", uss.name, i, last, averages[i], fairAverages[i]) + uss.t.Errorf("%s client %d last=%v got an Average of %v but the expected average was %v", uss.name, i, last, averages[i], expectedAverage) } else { - uss.t.Logf("%s client %d last=%v got an Average of %v and the fair average was %v", uss.name, i, last, averages[i], fairAverages[i]) + uss.t.Logf("%s client %d last=%v got an Average of %v and the expected average was %v", uss.name, i, last, averages[i], expectedAverage) } } } @@ -376,8 +425,8 @@ func TestNoRestraint(t *testing.T) { uniformScenario{name: "NoRestraint", qs: nr, clients: []uniformClient{ - {1001001001, 5, 10, time.Second, time.Second, false}, - {2002002002, 2, 10, time.Second, time.Second / 2, false}, + newUniformClient(1001001001, 5, 10, time.Second, time.Second), + newUniformClient(2002002002, 2, 10, time.Second, time.Second/2), }, concurrencyLimit: 10, evalDuration: time.Second * 15, @@ -389,6 +438,90 @@ func TestNoRestraint(t *testing.T) { }.exercise(t) } +func TestBaseline(t *testing.T) { + metrics.Register() + now := time.Now() + + clk, counter := testeventclock.NewFake(now, 0, nil) + qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter)) + qCfg := fq.QueuingConfig{ + Name: "TestBaseline", + DesiredNumQueues: 9, + QueueLengthLimit: 8, + HandSize: 3, + RequestWaitLimit: 10 * time.Minute, + } + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + if err != nil { + t.Fatal(err) + } + qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1}) + + uniformScenario{name: qCfg.Name, + qs: qs, + clients: []uniformClient{ + newUniformClient(1001001001, 1, 21, time.Second, 0), + }, + concurrencyLimit: 1, + evalDuration: time.Second * 20, + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + }.exercise(t) +} + +func TestSeparations(t *testing.T) { + for _, seps := range []struct{ think, pad time.Duration }{ + {think: time.Second, pad: 0}, + {think: 0, pad: time.Second}, + {think: time.Second, pad: time.Second / 2}, + {think: time.Second / 2, pad: time.Second}, + } { + for conc := 1; conc <= 2; conc++ { + caseName := fmt.Sprintf("seps%v,%v,%v", seps.think, seps.pad, conc) + t.Run(caseName, func(t *testing.T) { + metrics.Register() + now := time.Now() + + clk, counter := testeventclock.NewFake(now, 0, nil) + qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter)) + qCfg := fq.QueuingConfig{ + Name: caseName, + DesiredNumQueues: 9, + QueueLengthLimit: 8, + HandSize: 3, + RequestWaitLimit: 10 * time.Minute, + } + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + if err != nil { + t.Fatal(err) + } + qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: conc}) + uniformScenario{name: qCfg.Name, + qs: qs, + clients: []uniformClient{ + newUniformClient(1001001001, 1, 19, time.Second, seps.think).pad(seps.pad), + }, + concurrencyLimit: conc, + evalDuration: time.Second * 18, // multiple of every period involved, so that margin can be 0 below + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + padConstrains: conc == 1, + }.exercise(t) + }) + } + } +} + func TestUniformFlowsHandSize1(t *testing.T) { metrics.Register() now := time.Now() @@ -411,8 +544,8 @@ func TestUniformFlowsHandSize1(t *testing.T) { uniformScenario{name: qCfg.Name, qs: qs, clients: []uniformClient{ - {1001001001, 8, 20, time.Second, time.Second - 1, false}, - {2002002002, 8, 20, time.Second, time.Second - 1, false}, + newUniformClient(1001001001, 8, 20, time.Second, time.Second-1), + newUniformClient(2002002002, 8, 20, time.Second, time.Second-1), }, concurrencyLimit: 4, evalDuration: time.Second * 50, @@ -447,8 +580,8 @@ func TestUniformFlowsHandSize3(t *testing.T) { uniformScenario{name: qCfg.Name, qs: qs, clients: []uniformClient{ - {1001001001, 8, 30, time.Second, time.Second - 1, false}, - {2002002002, 8, 30, time.Second, time.Second - 1, false}, + newUniformClient(1001001001, 8, 30, time.Second, time.Second-1), + newUniformClient(2002002002, 8, 30, time.Second, time.Second-1), }, concurrencyLimit: 4, evalDuration: time.Second * 60, @@ -484,8 +617,8 @@ func TestDifferentFlowsExpectEqual(t *testing.T) { uniformScenario{name: qCfg.Name, qs: qs, clients: []uniformClient{ - {1001001001, 8, 20, time.Second, time.Second, false}, - {2002002002, 7, 30, time.Second, time.Second / 2, false}, + newUniformClient(1001001001, 8, 20, time.Second, time.Second), + newUniformClient(2002002002, 7, 30, time.Second, time.Second/2), }, concurrencyLimit: 4, evalDuration: time.Second * 40, @@ -521,8 +654,8 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) { uniformScenario{name: qCfg.Name, qs: qs, clients: []uniformClient{ - {1001001001, 4, 20, time.Second, time.Second - 1, false}, - {2002002002, 2, 20, time.Second, time.Second - 1, false}, + newUniformClient(1001001001, 4, 20, time.Second, time.Second-1), + newUniformClient(2002002002, 2, 20, time.Second, time.Second-1), }, concurrencyLimit: 3, evalDuration: time.Second * 20, @@ -536,6 +669,81 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) { }.exercise(t) } +func TestDifferentWidths(t *testing.T) { + metrics.Register() + now := time.Now() + + clk, counter := testeventclock.NewFake(now, 0, nil) + qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter)) + qCfg := fq.QueuingConfig{ + Name: "TestDifferentWidths", + DesiredNumQueues: 64, + QueueLengthLimit: 4, + HandSize: 7, + RequestWaitLimit: 10 * time.Minute, + } + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + if err != nil { + t.Fatal(err) + } + qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 6}) + uniformScenario{name: qCfg.Name, + qs: qs, + clients: []uniformClient{ + newUniformClient(10010010010010, 13, 10, time.Second, time.Second-1), + newUniformClient(20020020020020, 7, 10, time.Second, time.Second-1).seats(2), + }, + concurrencyLimit: 6, + evalDuration: time.Second * 20, + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0.1}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + }.exercise(t) +} + +func TestTooWide(t *testing.T) { + metrics.Register() + now := time.Now() + + clk, counter := testeventclock.NewFake(now, 0, nil) + qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter)) + qCfg := fq.QueuingConfig{ + Name: "TestTooWide", + DesiredNumQueues: 64, + QueueLengthLimit: 7, + HandSize: 7, + RequestWaitLimit: 10 * time.Minute, + } + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + if err != nil { + t.Fatal(err) + } + qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 6}) + uniformScenario{name: qCfg.Name, + qs: qs, + clients: []uniformClient{ + newUniformClient(40040040040040, 15, 21, time.Second, time.Second-1).seats(2), + newUniformClient(50050050050050, 15, 21, time.Second, time.Second-1).seats(2), + newUniformClient(60060060060060, 15, 21, time.Second, time.Second-1).seats(2), + newUniformClient(70070070070070, 15, 21, time.Second, time.Second-1).seats(2), + newUniformClient(90090090090090, 15, 21, time.Second, time.Second-1).seats(7), + }, + concurrencyLimit: 6, + evalDuration: time.Second * 40, + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0.35}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + }.exercise(t) +} + func TestWindup(t *testing.T) { metrics.Register() now := time.Now() @@ -557,8 +765,8 @@ func TestWindup(t *testing.T) { uniformScenario{name: qCfg.Name, qs: qs, clients: []uniformClient{ - {1001001001, 2, 40, time.Second, -1, false}, - {2002002002, 2, 40, time.Second, -1, true}, + newUniformClient(1001001001, 2, 40, time.Second, -1), + newUniformClient(2002002002, 2, 40, time.Second, -1).setSplit(), }, concurrencyLimit: 3, evalDuration: time.Second * 40, @@ -591,8 +799,8 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) { uniformScenario{name: qCfg.Name, qs: qs, clients: []uniformClient{ - {1001001001, 6, 10, time.Second, 57 * time.Millisecond, false}, - {2002002002, 4, 15, time.Second, 750 * time.Millisecond, false}, + newUniformClient(1001001001, 6, 10, time.Second, 57*time.Millisecond), + newUniformClient(2002002002, 4, 15, time.Second, 750*time.Millisecond), }, concurrencyLimit: 4, evalDuration: time.Second * 13, @@ -627,7 +835,7 @@ func TestTimeout(t *testing.T) { uniformScenario{name: qCfg.Name, qs: qs, clients: []uniformClient{ - {1001001001, 5, 100, time.Second, time.Second, false}, + newUniformClient(1001001001, 5, 100, time.Second, time.Second), }, concurrencyLimit: 1, evalDuration: time.Second * 10, diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index 5bfe85c76b1..1363f24d781 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -79,9 +79,8 @@ type queue struct { // The requests are stored in a FIFO list. requests fifo - // virtualStart is the virtual time (virtual seconds since process - // startup) when the oldest request in the queue (if there is any) - // started virtually executing + // virtualStart is the "virtual time" (R progress meter reading) at + // which the next request will be dispatched in the virtual world. virtualStart float64 requestsExecuting int