diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index db71ad61d24..ffb3549b8e0 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -18,6 +18,7 @@ package queueset import ( "context" + "errors" "fmt" "math" "sync" @@ -101,7 +102,8 @@ type queueSet struct { // queues are still draining. queues []*queue - // virtualTime is the number of virtual seconds since process startup + // virtualTime is the amount of seat-seconds allocated per queue since process startup. + // This is our generalization of the progress meter named R in the original fair queuing work. virtualTime float64 // lastRealTime is what `clock.Now()` yielded when `virtualTime` was last updated @@ -477,13 +479,15 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte // in addition to their seats. // Ideally, this should be based on projected completion time in the // virtual world of the youngest request in the queue. - thisSeatsSum := qs.queues[queueIdx].requests.SeatsSum() - klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of seatsSum %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisSeatsSum) + queue := qs.queues[queueIdx] + waitingSeats := queue.requests.SeatsSum() + thisSeatsSum := waitingSeats // + queue.seatsInUse + klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of %d seats waiting and %d executing", qs.qCfg.Name, descr1, descr2, queueIdx, waitingSeats, queue.seatsInUse) if thisSeatsSum < bestQueueSeatsSum { bestQueueIdx, bestQueueSeatsSum = queueIdx, thisSeatsSum } }) - klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueSeatsSum, qs.queues[bestQueueIdx].requestsExecuting) + klog.V(6).Infof("QS(%s) at r=%s v=%.9fss: For request %#+v %#+v chose queue %d, had seatSum %d & %d requests executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueSeatsSum, qs.queues[bestQueueIdx].requestsExecuting) return bestQueueIdx } @@ -549,10 +553,10 @@ func (qs *queueSet) enqueueLocked(request *request) { queue := request.queue now := qs.clock.Now() if queue.requests.Length() == 0 && queue.requestsExecuting == 0 { - // the queue’s virtual start time is set to the virtual time. + // the queue’s start R is set to the virtual time. queue.virtualStart = qs.virtualTime if klog.V(6).Enabled() { - klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2) + klog.Infof("QS(%s) at r=%s v=%.9fss: initialized queue %d start R due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2) } } queue.Enqueue(request) @@ -598,7 +602,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.Seats()) qs.obsPair.RequestsExecuting.Add(1) if klog.V(5).Enabled() { - klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting) + klog.Infof("QS(%s) at r=%s v=%.9fss: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting) } return req } @@ -634,9 +638,9 @@ func (qs *queueSet) dispatchLocked() bool { qs.obsPair.RequestsWaiting.Add(-1) qs.obsPair.RequestsExecuting.Add(1) if klog.V(6).Enabled() { - klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", + klog.Infof("QS(%s) at r=%s v=%.9fss: dispatching request %#+v %#+v work %v from queue %d with start R %.9fss, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied", qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, - queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting) + request.workEstimate, queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse) } // When a request is dequeued for service -> qs.virtualStart += G queue.virtualStart += qs.estimatedServiceTime * float64(request.Seats()) @@ -659,10 +663,6 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool { } // wait for all "currently" executing requests in this queueSet // to finish before we can execute this request. - if klog.V(4).Enabled() { - klog.Infof("QS(%s): seats (%d) asked for exceeds concurrency limit, waiting for currently executing requests to complete, %d seats are in use (%d are executing) and the limit is %d", - qs.qCfg.Name, seats, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) - } return false case qs.totSeatsInUse+seats > qs.dCfg.ConcurrencyLimit: return false @@ -692,8 +692,8 @@ func (qs *queueSet) selectQueueLocked() *queue { estimatedWorkInProgress := qs.estimatedServiceTime * float64(queue.seatsInUse) dsMin = math.Min(dsMin, queue.virtualStart-estimatedWorkInProgress) dsMax = math.Max(dsMax, queue.virtualStart-estimatedWorkInProgress) - // the virtual finish time of the oldest request is: - // virtual start time + G + // the finish R of the oldest request is: + // start R + G // we are not taking the width of the request into account when // we calculate the virtual finish time of the request because // it can starve requests with smaller wdith in other queues. @@ -704,12 +704,12 @@ func (qs *queueSet) selectQueueLocked() *queue { // - we have two queues, q1 and q2 // - q1 has an infinite supply of requests with width W=1 // - q2 has one request waiting in the queue with width W=2 - // - virtual start time for both q1 and q2 are at t0 + // - start R for both q1 and q2 are at t0 // - requests complete really fast, S=1ms on q1 // in this scenario we will execute roughly 60,000 requests // from q1 before we pick the request from q2. currentVirtualFinish := queue.virtualStart + qs.estimatedServiceTime - + klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish) if currentVirtualFinish < minVirtualFinish { minVirtualFinish = currentVirtualFinish minQueue = queue @@ -724,9 +724,18 @@ func (qs *queueSet) selectQueueLocked() *queue { oldestReqFromMinQueue = r return false }) - if oldestReqFromMinQueue == nil || !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.Seats()) { + if oldestReqFromMinQueue == nil { + // This cannot happen + klog.ErrorS(errors.New("selected queue is empty"), "Impossible", "queueSet", qs.qCfg.Name) + return nil + } + if !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.Seats()) { // since we have not picked the queue with the minimum virtual finish // time, we are not going to advance the round robin index here. + if klog.V(4).Enabled() { + klog.Infof("QS(%s): request %v %v seats %d cannot be dispatched from queue %d, waiting for currently executing requests to complete, %d requests are occupying %d seats and the limit is %d", + qs.qCfg.Name, oldestReqFromMinQueue.descr1, oldestReqFromMinQueue.descr2, oldestReqFromMinQueue.Seats(), minQueue.index, qs.totRequestsExecuting, qs.totSeatsInUse, qs.dCfg.ConcurrencyLimit) + } return nil } @@ -743,7 +752,7 @@ func (qs *queueSet) selectQueueLocked() *queue { // time. // // hence we're refreshing the per-queue virtual time for the chosen - // queue here. if the last virtual start time (excluded estimated cost) + // queue here. if the last start R (excluded estimated cost) // falls behind the global virtual time, we update the latest virtual // start by: + previouslyEstimatedServiceTime := float64(minQueue.seatsInUse) * qs.estimatedServiceTime @@ -790,12 +799,6 @@ func (qs *queueSet) finishRequestLocked(r *request) { metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.Seats()) if r.queue != nil { r.queue.seatsInUse -= r.Seats() - - if klog.V(6).Enabled() { - klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", - qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, - r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting) - } } } @@ -803,34 +806,55 @@ func (qs *queueSet) finishRequestLocked(r *request) { if r.workEstimate.AdditionalLatency <= 0 { // release the seats allocated to this request immediately releaseSeatsLocked() + if !klog.V(6).Enabled() { + } else if r.queue != nil { + klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished all use of %d seats, adjusted queue %d start R to %.9fss due to service time %.9fs, queue will have %d requests, %d seats waiting & %d requests occupying %d seats", + qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, r.queue.index, + r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse) + } else { + klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished all use of %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, qs.totRequestsExecuting, qs.totSeatsInUse) + } return } additionalLatency := r.workEstimate.AdditionalLatency + if !klog.V(6).Enabled() { + } else if r.queue != nil { + klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, adjusted queue %d start R to %.9fss due to service time %.9fs, queue will have %d requests waiting & %d executing, still has %d seats waiting & %d executing", + qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, additionalLatency.Seconds(), r.queue.index, + r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting, r.queue.requests.SeatsSum(), r.queue.seatsInUse) + } else { + klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, additionalLatency.Seconds(), qs.totRequestsExecuting, qs.totSeatsInUse) + } // EventAfterDuration will execute the event func in a new goroutine, // so the seats allocated to this request will be released after // AdditionalLatency elapses, this ensures that the additional // latency has no impact on the user experience. qs.clock.EventAfterDuration(func(_ time.Time) { - qs.lock.Lock() + qs.lockAndSyncTime() defer qs.lock.Unlock() + now := qs.clock.Now() releaseSeatsLocked() + if !klog.V(6).Enabled() { + } else if r.queue != nil { + klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished lingering on %d seats, queue %d will have %d requests, %d seats waiting & %d requests occupying %d seats", + qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, r.queue.index, + r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse) + } else { + klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished lingering on %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, qs.totRequestsExecuting, qs.totSeatsInUse) + } + qs.dispatchAsMuchAsPossibleLocked() }, additionalLatency) }() - if r.queue == nil { - if klog.V(6).Enabled() { - klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting) - } - return + if r.queue != nil { + // request has finished, remove from requests executing + r.queue.requestsExecuting-- + + // When a request finishes being served, and the actual service time was S, + // the queue’s start R is decremented by (G - S)*width. + r.queue.virtualStart -= (qs.estimatedServiceTime - S) * float64(r.Seats()) } - - // request has finished, remove from requests executing - r.queue.requestsExecuting-- - - // When a request finishes being served, and the actual service time was S, - // the queue’s virtual start time is decremented by (G - S)*width. - r.queue.virtualStart -= (qs.estimatedServiceTime - S) * float64(r.Seats()) } func (qs *queueSet) removeQueueIfEmptyLocked(r *request) { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index b72b66dc867..bc2324dc884 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -108,10 +108,46 @@ type uniformClient struct { // causing a request to be launched a certain amount of time // before the previous one finishes. thinkDuration time.Duration + // padDuration is additional time during which this request occupies its seats. + // This comes at the end of execution, after the reply has been released toward + // the client. + // The evaluation code below can only handle two cases: + // - this padding always keeps another request out of the seats, or + // - this padding never keeps another request out of the seats. + // Set the `padConstrains` field of the scenario accordingly. + padDuration time.Duration // When true indicates that only half the specified number of // threads should run during the first half of the evaluation // period split bool + // width is the number of seats this request occupies while executing + width uint +} + +func newUniformClient(hash uint64, nThreads, nCalls int, execDuration, thinkDuration time.Duration) uniformClient { + return uniformClient{ + hash: hash, + nThreads: nThreads, + nCalls: nCalls, + execDuration: execDuration, + thinkDuration: thinkDuration, + width: 1, + } +} + +func (uc uniformClient) setSplit() uniformClient { + uc.split = true + return uc +} + +func (uc uniformClient) seats(width uint) uniformClient { + uc.width = width + return uc +} + +func (uc uniformClient) pad(duration time.Duration) uniformClient { + uc.padDuration = duration + return uc } // uniformScenario describes a scenario based on the given set of uniform clients. @@ -127,6 +163,8 @@ type uniformClient struct { // fair in the respective halves of a split scenario; // in a non-split scenario this is a singleton with one expectation. // expectAllRequests indicates whether all requests are expected to get dispatched. +// padConstrains indicates whether the execution duration padding, if any, +// is expected to hold up dispatching. type uniformScenario struct { name string qs fq.QueueSet @@ -140,13 +178,14 @@ type uniformScenario struct { rejectReason string clk *testeventclock.Fake counter counter.GoRoutineCounter + padConstrains bool } func (us uniformScenario) exercise(t *testing.T) { uss := uniformScenarioState{ t: t, uniformScenario: us, - startTime: time.Now(), + startTime: us.clk.Now(), integrators: make([]fq.Integrator, len(us.clients)), executions: make([]int32, len(us.clients)), rejects: make([]int32, len(us.clients)), @@ -227,7 +266,7 @@ func (ust *uniformScenarioThread) callK(k int) { if k >= ust.nCalls { return } - req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.WorkEstimate{Seats: 1}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil) + req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.WorkEstimate{Seats: ust.uc.width, AdditionalLatency: ust.uc.padDuration}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil) ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle) if req == nil { atomic.AddUint64(&ust.uss.failedCount, 1) @@ -238,20 +277,25 @@ func (ust *uniformScenarioThread) callK(k int) { ust.uss.t.Error("got request but QueueSet reported idle") } var executed bool + var returnTime time.Time idle2 := req.Finish(func() { executed = true execStart := ust.uss.clk.Now() - ust.uss.t.Logf("%s: %d, %d, %d executing", execStart.Format(nsTimeFmt), ust.i, ust.j, k) atomic.AddInt32(&ust.uss.executions[ust.i], 1) - ust.igr.Add(1) + ust.igr.Add(float64(ust.uc.width)) + ust.uss.t.Logf("%s: %d, %d, %d executing; seats=%d", execStart.Format(nsTimeFmt), ust.i, ust.j, k, ust.uc.width) ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration) ust.uss.clk.Sleep(ust.uc.execDuration) - ust.igr.Add(-1) + ust.igr.Add(-float64(ust.uc.width)) + returnTime = ust.uss.clk.Now() }) - ust.uss.t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, executed, idle2) + now := ust.uss.clk.Now() + ust.uss.t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", now.Format(nsTimeFmt), ust.i, ust.j, k, executed, idle2) if !executed { atomic.AddUint64(&ust.uss.failedCount, 1) atomic.AddInt32(&ust.uss.rejects[ust.i], 1) + } else if now != returnTime { + ust.uss.t.Errorf("%s: %d, %d, %d returnTime=%s", now.Format(nsTimeFmt), ust.i, ust.j, k, returnTime.Format(nsTimeFmt)) } } @@ -270,23 +314,28 @@ func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, ma if uc.split && !last { nThreads = nThreads / 2 } - demands[i] = float64(nThreads) * float64(uc.execDuration) / float64(uc.thinkDuration+uc.execDuration) + sep := uc.thinkDuration + if uss.padConstrains && uc.padDuration > sep { + sep = uc.padDuration + } + demands[i] = float64(nThreads) * float64(uc.width) * float64(uc.execDuration) / float64(sep+uc.execDuration) averages[i] = uss.integrators[i].Reset().Average } fairAverages := fairAlloc(demands, float64(uss.concurrencyLimit)) for i := range uss.clients { + expectedAverage := fairAverages[i] var gotFair bool - if fairAverages[i] > 0 { - relDiff := (averages[i] - fairAverages[i]) / fairAverages[i] + if expectedAverage > 0 { + relDiff := (averages[i] - expectedAverage) / expectedAverage gotFair = math.Abs(relDiff) <= margin } else { gotFair = math.Abs(averages[i]) <= margin } if gotFair != expectFair { - uss.t.Errorf("%s client %d last=%v got an Average of %v but the fair average was %v", uss.name, i, last, averages[i], fairAverages[i]) + uss.t.Errorf("%s client %d last=%v got an Average of %v but the expected average was %v", uss.name, i, last, averages[i], expectedAverage) } else { - uss.t.Logf("%s client %d last=%v got an Average of %v and the fair average was %v", uss.name, i, last, averages[i], fairAverages[i]) + uss.t.Logf("%s client %d last=%v got an Average of %v and the expected average was %v", uss.name, i, last, averages[i], expectedAverage) } } } @@ -376,8 +425,8 @@ func TestNoRestraint(t *testing.T) { uniformScenario{name: "NoRestraint", qs: nr, clients: []uniformClient{ - {1001001001, 5, 10, time.Second, time.Second, false}, - {2002002002, 2, 10, time.Second, time.Second / 2, false}, + newUniformClient(1001001001, 5, 10, time.Second, time.Second), + newUniformClient(2002002002, 2, 10, time.Second, time.Second/2), }, concurrencyLimit: 10, evalDuration: time.Second * 15, @@ -389,6 +438,90 @@ func TestNoRestraint(t *testing.T) { }.exercise(t) } +func TestBaseline(t *testing.T) { + metrics.Register() + now := time.Now() + + clk, counter := testeventclock.NewFake(now, 0, nil) + qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter)) + qCfg := fq.QueuingConfig{ + Name: "TestBaseline", + DesiredNumQueues: 9, + QueueLengthLimit: 8, + HandSize: 3, + RequestWaitLimit: 10 * time.Minute, + } + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + if err != nil { + t.Fatal(err) + } + qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1}) + + uniformScenario{name: qCfg.Name, + qs: qs, + clients: []uniformClient{ + newUniformClient(1001001001, 1, 21, time.Second, 0), + }, + concurrencyLimit: 1, + evalDuration: time.Second * 20, + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + }.exercise(t) +} + +func TestSeparations(t *testing.T) { + for _, seps := range []struct{ think, pad time.Duration }{ + {think: time.Second, pad: 0}, + {think: 0, pad: time.Second}, + {think: time.Second, pad: time.Second / 2}, + {think: time.Second / 2, pad: time.Second}, + } { + for conc := 1; conc <= 2; conc++ { + caseName := fmt.Sprintf("seps%v,%v,%v", seps.think, seps.pad, conc) + t.Run(caseName, func(t *testing.T) { + metrics.Register() + now := time.Now() + + clk, counter := testeventclock.NewFake(now, 0, nil) + qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter)) + qCfg := fq.QueuingConfig{ + Name: caseName, + DesiredNumQueues: 9, + QueueLengthLimit: 8, + HandSize: 3, + RequestWaitLimit: 10 * time.Minute, + } + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + if err != nil { + t.Fatal(err) + } + qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: conc}) + uniformScenario{name: qCfg.Name, + qs: qs, + clients: []uniformClient{ + newUniformClient(1001001001, 1, 19, time.Second, seps.think).pad(seps.pad), + }, + concurrencyLimit: conc, + evalDuration: time.Second * 18, // multiple of every period involved, so that margin can be 0 below + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + padConstrains: conc == 1, + }.exercise(t) + }) + } + } +} + func TestUniformFlowsHandSize1(t *testing.T) { metrics.Register() now := time.Now() @@ -411,8 +544,8 @@ func TestUniformFlowsHandSize1(t *testing.T) { uniformScenario{name: qCfg.Name, qs: qs, clients: []uniformClient{ - {1001001001, 8, 20, time.Second, time.Second - 1, false}, - {2002002002, 8, 20, time.Second, time.Second - 1, false}, + newUniformClient(1001001001, 8, 20, time.Second, time.Second-1), + newUniformClient(2002002002, 8, 20, time.Second, time.Second-1), }, concurrencyLimit: 4, evalDuration: time.Second * 50, @@ -447,8 +580,8 @@ func TestUniformFlowsHandSize3(t *testing.T) { uniformScenario{name: qCfg.Name, qs: qs, clients: []uniformClient{ - {1001001001, 8, 30, time.Second, time.Second - 1, false}, - {2002002002, 8, 30, time.Second, time.Second - 1, false}, + newUniformClient(1001001001, 8, 30, time.Second, time.Second-1), + newUniformClient(2002002002, 8, 30, time.Second, time.Second-1), }, concurrencyLimit: 4, evalDuration: time.Second * 60, @@ -484,8 +617,8 @@ func TestDifferentFlowsExpectEqual(t *testing.T) { uniformScenario{name: qCfg.Name, qs: qs, clients: []uniformClient{ - {1001001001, 8, 20, time.Second, time.Second, false}, - {2002002002, 7, 30, time.Second, time.Second / 2, false}, + newUniformClient(1001001001, 8, 20, time.Second, time.Second), + newUniformClient(2002002002, 7, 30, time.Second, time.Second/2), }, concurrencyLimit: 4, evalDuration: time.Second * 40, @@ -521,8 +654,8 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) { uniformScenario{name: qCfg.Name, qs: qs, clients: []uniformClient{ - {1001001001, 4, 20, time.Second, time.Second - 1, false}, - {2002002002, 2, 20, time.Second, time.Second - 1, false}, + newUniformClient(1001001001, 4, 20, time.Second, time.Second-1), + newUniformClient(2002002002, 2, 20, time.Second, time.Second-1), }, concurrencyLimit: 3, evalDuration: time.Second * 20, @@ -536,6 +669,81 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) { }.exercise(t) } +func TestDifferentWidths(t *testing.T) { + metrics.Register() + now := time.Now() + + clk, counter := testeventclock.NewFake(now, 0, nil) + qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter)) + qCfg := fq.QueuingConfig{ + Name: "TestDifferentWidths", + DesiredNumQueues: 64, + QueueLengthLimit: 4, + HandSize: 7, + RequestWaitLimit: 10 * time.Minute, + } + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + if err != nil { + t.Fatal(err) + } + qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 6}) + uniformScenario{name: qCfg.Name, + qs: qs, + clients: []uniformClient{ + newUniformClient(10010010010010, 13, 10, time.Second, time.Second-1), + newUniformClient(20020020020020, 7, 10, time.Second, time.Second-1).seats(2), + }, + concurrencyLimit: 6, + evalDuration: time.Second * 20, + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0.1}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + }.exercise(t) +} + +func TestTooWide(t *testing.T) { + metrics.Register() + now := time.Now() + + clk, counter := testeventclock.NewFake(now, 0, nil) + qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter)) + qCfg := fq.QueuingConfig{ + Name: "TestTooWide", + DesiredNumQueues: 64, + QueueLengthLimit: 7, + HandSize: 7, + RequestWaitLimit: 10 * time.Minute, + } + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) + if err != nil { + t.Fatal(err) + } + qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 6}) + uniformScenario{name: qCfg.Name, + qs: qs, + clients: []uniformClient{ + newUniformClient(40040040040040, 15, 21, time.Second, time.Second-1).seats(2), + newUniformClient(50050050050050, 15, 21, time.Second, time.Second-1).seats(2), + newUniformClient(60060060060060, 15, 21, time.Second, time.Second-1).seats(2), + newUniformClient(70070070070070, 15, 21, time.Second, time.Second-1).seats(2), + newUniformClient(90090090090090, 15, 21, time.Second, time.Second-1).seats(7), + }, + concurrencyLimit: 6, + evalDuration: time.Second * 40, + expectedFair: []bool{true}, + expectedFairnessMargin: []float64{0.35}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + }.exercise(t) +} + func TestWindup(t *testing.T) { metrics.Register() now := time.Now() @@ -557,8 +765,8 @@ func TestWindup(t *testing.T) { uniformScenario{name: qCfg.Name, qs: qs, clients: []uniformClient{ - {1001001001, 2, 40, time.Second, -1, false}, - {2002002002, 2, 40, time.Second, -1, true}, + newUniformClient(1001001001, 2, 40, time.Second, -1), + newUniformClient(2002002002, 2, 40, time.Second, -1).setSplit(), }, concurrencyLimit: 3, evalDuration: time.Second * 40, @@ -591,8 +799,8 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) { uniformScenario{name: qCfg.Name, qs: qs, clients: []uniformClient{ - {1001001001, 6, 10, time.Second, 57 * time.Millisecond, false}, - {2002002002, 4, 15, time.Second, 750 * time.Millisecond, false}, + newUniformClient(1001001001, 6, 10, time.Second, 57*time.Millisecond), + newUniformClient(2002002002, 4, 15, time.Second, 750*time.Millisecond), }, concurrencyLimit: 4, evalDuration: time.Second * 13, @@ -627,7 +835,7 @@ func TestTimeout(t *testing.T) { uniformScenario{name: qCfg.Name, qs: qs, clients: []uniformClient{ - {1001001001, 5, 100, time.Second, time.Second, false}, + newUniformClient(1001001001, 5, 100, time.Second, time.Second), }, concurrencyLimit: 1, evalDuration: time.Second * 10, diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index 5bfe85c76b1..1363f24d781 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -79,9 +79,8 @@ type queue struct { // The requests are stored in a FIFO list. requests fifo - // virtualStart is the virtual time (virtual seconds since process - // startup) when the oldest request in the queue (if there is any) - // started virtually executing + // virtualStart is the "virtual time" (R progress meter reading) at + // which the next request will be dispatched in the virtual world. virtualStart float64 requestsExecuting int