Merge pull request #105160 from MikeSpreitzer/improve-sharding-and-dispatch

Improve sharding and dispatch
Kubernetes Prow Robot 2021-09-22 12:58:32 -07:00 committed by GitHub
commit 752c4b7f0b
3 changed files with 98 additions and 65 deletions

View File

@ -125,6 +125,9 @@ type queueSet struct {
// totSeatsInUse is the number of total "seats" in use by all the
// request(s) that are currently executing in this queueset.
totSeatsInUse int
// enqueues is the number of requests that have ever been enqueued
enqueues int
}
// NewQueueSetFactory creates a new QueueSetFactory object
@ -213,8 +216,8 @@ func (qs *queueSet) setConfiguration(qCfg fq.QueuingConfig, dealer *shuffleshard
if qCfg.DesiredNumQueues > 0 {
// Adding queues is the only thing that requires immediate action
// Removing queues is handled by omitting indexes >DesiredNum from
// chooseQueueIndexLocked
// Removing queues is handled by attrition, removing a queue when
// it goes empty and there are too many.
numQueues := len(qs.queues)
if qCfg.DesiredNumQueues > numQueues {
qs.queues = append(qs.queues,
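
The "attrition" policy described in the new comment never shrinks the queue slice eagerly: surplus queues are only retired once they drain. Below is a minimal sketch of that idea, using a hypothetical `maybeRetireQueueLocked` helper and a stripped-down `queueSet` (the real type carries far more state and bookkeeping):

```go
package main

import "fmt"

// queue is a stripped-down stand-in for the real fair-queuing queue.
type queue struct {
	waiting   int // requests still queued
	executing int // requests currently executing
}

type queueSet struct {
	queues     []*queue
	desiredNum int // DesiredNumQueues from the current configuration
}

// maybeRetireQueueLocked drops the last queue when it has gone completely
// idle and the set currently holds more queues than desired. Nothing is
// removed when DesiredNumQueues shrinks; surplus queues simply disappear
// one by one as they drain.
func (qs *queueSet) maybeRetireQueueLocked() {
	for len(qs.queues) > qs.desiredNum {
		last := qs.queues[len(qs.queues)-1]
		if last.waiting > 0 || last.executing > 0 {
			return // still busy; try again after it drains
		}
		qs.queues = qs.queues[:len(qs.queues)-1]
	}
}

func main() {
	qs := &queueSet{
		queues:     []*queue{{}, {}, {waiting: 1}},
		desiredNum: 1,
	}
	qs.maybeRetireQueueLocked() // removes nothing: the tail queue is busy
	fmt.Println(len(qs.queues)) // 3
	qs.queues[2].waiting = 0
	qs.maybeRetireQueueLocked() // now retires the two surplus queues
	fmt.Println(len(qs.queues)) // 1
}
```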
@ -439,7 +442,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
// the queuelengthlimit has been reached
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
// Start with the shuffle sharding, to pick a queue.
queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2)
queueIdx := qs.shuffleShardLocked(hashValue, descr1, descr2)
queue := qs.queues[queueIdx]
// The next step is the logic to reject requests that have been waiting too long
qs.removeTimedOutRequestsFromQueueLocked(queue, fsName)
@ -447,6 +450,8 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
// requests that are in the queue longer than the timeout if there are no new requests
// We prefer the simplicity over the promptness, at least for now.
defer qs.boundNextDispatch(queue)
// Create a request and enqueue
req := &request{
qs: qs,
@ -455,6 +460,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
ctx: ctx,
decision: qs.promiseFactory(nil, ctx.Done(), decisionCancel),
arrivalTime: qs.clock.Now(),
arrivalR: qs.virtualTime,
queue: queue,
descr1: descr1,
descr2: descr2,
@ -468,26 +474,37 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
return req
}
// chooseQueueIndexLocked uses shuffle sharding to select a queue index
// shuffleShardLocked uses shuffle sharding to select a queue index
// using the given hashValue and the shuffle sharding parameters of the queueSet.
func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 interface{}) int {
func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interface{}) int {
var backHand [8]int
// Deal into a data structure, so that the order of visit below is not necessarily the order of the deal.
// This removes bias in the case of flows with overlapping hands.
hand := qs.dealer.DealIntoHand(hashValue, backHand[:])
handSize := len(hand)
offset := qs.enqueues % handSize
qs.enqueues++
bestQueueIdx := -1
bestQueueSeatsSum := int(math.MaxInt32)
// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`.
qs.dealer.Deal(hashValue, func(queueIdx int) {
for i := 0; i < handSize; i++ {
queueIdx := hand[(offset+i)%handSize]
queue := qs.queues[queueIdx]
waitingSeats := queue.requests.SeatsSum()
// TODO: Consider taking into account `additional latency` of requests
// in addition to their seats.
// Ideally, this should be based on projected completion time in the
// virtual world of the youngest request in the queue.
queue := qs.queues[queueIdx]
waitingSeats := queue.requests.SeatsSum()
thisSeatsSum := waitingSeats // + queue.seatsInUse
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of %d seats waiting and %d executing", qs.qCfg.Name, descr1, descr2, queueIdx, waitingSeats, queue.seatsInUse)
thisSeatsSum := waitingSeats + queue.seatsInUse
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of %d seats waiting and %d executing, virtualStart=%vss", qs.qCfg.Name, descr1, descr2, queueIdx, waitingSeats, queue.seatsInUse, queue.virtualStart)
if thisSeatsSum < bestQueueSeatsSum {
bestQueueIdx, bestQueueSeatsSum = queueIdx, thisSeatsSum
}
})
klog.V(6).Infof("QS(%s) at r=%s v=%.9fss: For request %#+v %#+v chose queue %d, had seatSum %d & %d requests executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueSeatsSum, qs.queues[bestQueueIdx].requestsExecuting)
}
if klog.V(6).Enabled() {
chosenQueue := qs.queues[bestQueueIdx]
klog.V(6).Infof("QS(%s) at r=%s v=%.9fss: For request %#+v %#+v chose queue %d, had seatSum %d & %d requests executing & virtualStart=%vss", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, chosenQueue.requests.SeatsSum(), chosenQueue.requestsExecuting, chosenQueue.virtualStart)
}
return bestQueueIdx
}
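
To make the selection rule above concrete: the dealer produces a small "hand" of candidate queue indexes for the flow's hash, the new `enqueues` counter rotates the starting point so flows with overlapping hands do not all probe in the same order, and the least-loaded candidate (waiting plus executing seats) wins. Here is a runnable sketch; `dealHand` is a toy stand-in for the real `shufflesharding.Dealer`, which is more careful about uniqueness and bias:

```go
package main

import "fmt"

// queue holds just the two load figures the selection rule looks at.
type queue struct {
	waitingSeats int // sum of seats of queued requests
	seatsInUse   int // seats of requests currently executing
}

// dealHand is a toy stand-in for dealer.DealIntoHand.
func dealHand(hashValue uint64, nQueues, handSize int) []int {
	hand := make([]int, handSize)
	for i := range hand {
		hand[i] = int(hashValue>>(4*i)) % nQueues
	}
	return hand
}

// chooseQueue rotates the starting point by the enqueue counter, then
// picks the candidate with the smallest total seat load.
func chooseQueue(queues []*queue, hand []int, enqueues int) int {
	offset := enqueues % len(hand)
	bestIdx, bestSum := -1, int(^uint(0)>>1) // max int
	for i := range hand {
		idx := hand[(offset+i)%len(hand)]
		sum := queues[idx].waitingSeats + queues[idx].seatsInUse
		if sum < bestSum {
			bestIdx, bestSum = idx, sum
		}
	}
	return bestIdx
}

func main() {
	queues := []*queue{{3, 1}, {0, 0}, {5, 2}, {1, 1}}
	hand := dealHand(0x3210, len(queues), 3)
	fmt.Println(chooseQueue(queues, hand, 7)) // least-loaded queue in the hand
}
```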
@ -592,6 +609,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
startTime: now,
decision: qs.promiseFactory(decisionExecute, ctx.Done(), decisionCancel),
arrivalTime: now,
arrivalR: qs.virtualTime,
descr1: descr1,
descr2: descr2,
workEstimate: *workEstimate,
@ -612,7 +630,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
// return value indicates whether a request was dispatched; this will
// be false when there are no requests waiting in any queue.
func (qs *queueSet) dispatchLocked() bool {
queue := qs.selectQueueLocked()
queue := qs.findDispatchQueueLocked()
if queue == nil {
return false
}
@ -644,6 +662,7 @@ func (qs *queueSet) dispatchLocked() bool {
}
// When a request is dequeued for service -> qs.virtualStart += G * width
queue.virtualStart += qs.estimatedServiceSeconds * float64(request.Seats())
qs.boundNextDispatch(queue)
request.decision.Set(decisionExecute)
return ok
}
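
For orientation, the virtual-time bookkeeping around a dispatch works out as follows: the queue is charged `G*seats` (with `G = estimatedServiceSeconds`) when a request is dispatched and refunded `(G-S)*seats` when it finishes with actual service time `S`, so the net charge is `S*seats` and the estimate only matters while the request is in flight. A tiny sketch with assumed numbers:

```go
package main

import "fmt"

func main() {
	const G = 0.003 // estimatedServiceSeconds (assumed value)
	seats := 2.0

	virtualStart := 10.000 // queue's R at which the next request starts

	// dispatchLocked: queue.virtualStart += G * seats
	virtualStart += G * seats

	// finishRequestLocked, with actual service time S:
	S := 0.001
	virtualStart -= (G - S) * seats

	fmt.Printf("net advance = %.6f (S*seats = %.6f)\n",
		virtualStart-10.000, S*seats)
}
```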
@ -671,10 +690,10 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool {
return true
}
// selectQueueLocked examines the queues in round robin order and
// findDispatchQueueLocked examines the queues in round robin order and
// returns the first one of those for which the virtual finish time of
// the oldest waiting request is minimal.
func (qs *queueSet) selectQueueLocked() *queue {
func (qs *queueSet) findDispatchQueueLocked() *queue {
minVirtualFinish := math.Inf(1)
sMin := math.Inf(1)
dsMin := math.Inf(1)
@ -723,22 +742,9 @@ func (qs *queueSet) selectQueueLocked() *queue {
// for the next round. This way the non-selected queues
// win in the case that the virtual finish times are the same
qs.robinIndex = minIndex
// according to the original FQ formula:
//
// Si = MAX(R(t), Fi-1)
//
// the virtual start (excluding the estimated cost) of the chosen
// queue should always be greater or equal to the global virtual
// time.
//
// hence we're refreshing the per-queue virtual time for the chosen
// queue here. If the last start R (excluding the estimated cost)
// falls behind the global virtual time, we update the latest virtual
// start by: <latest global virtual time> + <previously estimated cost>
previouslyEstimatedServiceTime := float64(minQueue.seatsInUse) * qs.estimatedServiceSeconds
if qs.virtualTime > minQueue.virtualStart-previouslyEstimatedServiceTime {
// per-queue virtual time should not fall behind the global
minQueue.virtualStart = qs.virtualTime + previouslyEstimatedServiceTime
if minQueue.virtualStart < oldestReqFromMinQueue.arrivalR {
klog.ErrorS(errors.New("dispatch before arrival"), "Inconceivable!", "QS", qs.qCfg.Name, "queue", minQueue.index, "dispatchR", minQueue.virtualStart, "request", oldestReqFromMinQueue)
}
metrics.SetDispatchMetrics(qs.qCfg.Name, qs.virtualTime, minQueue.virtualStart, sMin, sMax, dsMin, dsMax)
return minQueue
@ -834,6 +840,28 @@ func (qs *queueSet) finishRequestLocked(r *request) {
// When a request finishes being served, and the actual service time was S,
// the queue's start R is decremented by (G - S)*width.
r.queue.virtualStart -= (qs.estimatedServiceSeconds - S) * float64(r.Seats())
qs.boundNextDispatch(r.queue)
}
}
// boundNextDispatch applies the anti-windup hack.
// We need a hack because all non-empty queues are allocated the same
// number of seats. A queue that cannot use all those seats and does
// not go empty accumulates a progressively earlier `virtualStart` compared
// to queues that are using more than they are allocated.
// The following hack addresses the first side of that inequity,
// by insisting that dispatch in the virtual world not precede arrival.
func (qs *queueSet) boundNextDispatch(queue *queue) {
oldestReqFromMinQueue, _ := queue.requests.Peek()
if oldestReqFromMinQueue == nil {
return
}
var virtualStartBound = oldestReqFromMinQueue.arrivalR
if queue.virtualStart < virtualStartBound {
if klog.V(4).Enabled() {
klog.InfoS("AntiWindup tweaked queue", "QS", qs.qCfg.Name, "queue", queue.index, "time", qs.clock.Now().Format(nsTimeFmt), "requestDescr1", oldestReqFromMinQueue.descr1, "requestDescr2", oldestReqFromMinQueue.descr2, "newVirtualStart", virtualStartBound, "deltaVirtualStart", (virtualStartBound - queue.virtualStart))
}
queue.virtualStart = virtualStartBound
}
}
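
A worked illustration (with assumed numbers) of the windup this guards against: a queue whose requests consistently finish in `S < G` advances its own `virtualStart` by only `S` per request while the queueset-wide virtual time `R` races ahead, so an unclamped queue banks an ever-growing head start; pinning `virtualStart` to the oldest waiting request's `arrivalR` caps that advantage.

```go
package main

import "fmt"

func main() {
	const (
		G = 0.003 // estimated service seconds (assumed)
		S = 0.001 // actual service time per one-seat request, consistently cheaper
	)
	var R, clamped, unclamped float64

	for round := 1; round <= 5; round++ {
		R += 0.006    // global progress of the whole queueset (assumed rate)
		arrivalR := R // a new request arrives; request.arrivalR = R(now)

		for _, vs := range []*float64{&clamped, &unclamped} {
			*vs += G       // dispatchLocked: +G*seats
			*vs -= (G - S) // finishRequestLocked: -(G-S)*seats
		}
		// boundNextDispatch: dispatch in the virtual world must not
		// precede the arrival of the request being dispatched.
		if clamped < arrivalR {
			clamped = arrivalR
		}
		fmt.Printf("round %d: R=%.3f clamped=%.3f unclamped=%.3f\n",
			round, R, clamped, unclamped)
	}
}
```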

View File

@ -333,9 +333,9 @@ func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, ma
}
if gotFair != expectFair {
uss.t.Errorf("%s client %d last=%v got an Average of %v but the expected average was %v", uss.name, i, last, averages[i], expectedAverage)
uss.t.Errorf("%s client %d last=%v expectFair=%v margin=%v got an Average of %v but the expected average was %v", uss.name, i, last, expectFair, margin, averages[i], expectedAverage)
} else {
uss.t.Logf("%s client %d last=%v got an Average of %v and the expected average was %v", uss.name, i, last, averages[i], expectedAverage)
uss.t.Logf("%s client %d last=%v expectFair=%v margin=%v got an Average of %v and the expected average was %v", uss.name, i, last, expectFair, margin, averages[i], expectedAverage)
}
}
}
@ -414,18 +414,20 @@ func TestMain(m *testing.M) {
// TestNoRestraint tests whether the no-restraint factory gives every client what it asks for
// even though that is unfair.
// Expects fairness when there is no competition, unfairness when there is competition.
func TestNoRestraint(t *testing.T) {
metrics.Register()
testCases := []struct {
concurrency int
margin float64
fair bool
name string
}{
{concurrency: 10, fair: true},
{concurrency: 2, fair: false},
{concurrency: 10, margin: 0.001, fair: true, name: "no-competition"},
{concurrency: 2, margin: 0.25, fair: false, name: "with-competition"},
}
for _, testCase := range testCases {
subName := fmt.Sprintf("concurrency=%v", testCase.concurrency)
t.Run(subName, func(t *testing.T) {
t.Run(testCase.name, func(t *testing.T) {
now := time.Now()
clk, counter := testeventclock.NewFake(now, 0, nil)
nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk))
@ -433,16 +435,16 @@ func TestNoRestraint(t *testing.T) {
t.Fatal(err)
}
nr := nrc.Complete(fq.DispatchingConfig{})
uniformScenario{name: "NoRestraint/" + subName,
uniformScenario{name: "NoRestraint/" + testCase.name,
qs: nr,
clients: []uniformClient{
newUniformClient(1001001001, 5, 10, time.Second, time.Second),
newUniformClient(2002002002, 2, 10, time.Second, time.Second/2),
newUniformClient(1001001001, 5, 15, time.Second, time.Second),
newUniformClient(2002002002, 2, 15, time.Second, time.Second/2),
},
concurrencyLimit: testCase.concurrency,
evalDuration: time.Second * 15,
evalDuration: time.Second * 18,
expectedFair: []bool{testCase.fair},
expectedFairnessMargin: []float64{0.1},
expectedFairnessMargin: []float64{testCase.margin},
expectAllRequests: true,
clk: clk,
counter: counter,
@ -563,7 +565,7 @@ func TestUniformFlowsHandSize1(t *testing.T) {
concurrencyLimit: 4,
evalDuration: time.Second * 50,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.1},
expectedFairnessMargin: []float64{0.01},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
@ -581,7 +583,7 @@ func TestUniformFlowsHandSize3(t *testing.T) {
qCfg := fq.QueuingConfig{
Name: "TestUniformFlowsHandSize3",
DesiredNumQueues: 8,
QueueLengthLimit: 4,
QueueLengthLimit: 16,
HandSize: 3,
RequestWaitLimit: 10 * time.Minute,
}
@ -593,13 +595,13 @@ func TestUniformFlowsHandSize3(t *testing.T) {
uniformScenario{name: qCfg.Name,
qs: qs,
clients: []uniformClient{
newUniformClient(1001001001, 8, 30, time.Second, time.Second-1),
newUniformClient(2002002002, 8, 30, time.Second, time.Second-1),
newUniformClient(400900100100, 8, 30, time.Second, time.Second-1),
newUniformClient(300900200200, 8, 30, time.Second, time.Second-1),
},
concurrencyLimit: 4,
evalDuration: time.Second * 60,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.1},
expectedFairnessMargin: []float64{0.01},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
@ -636,7 +638,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
concurrencyLimit: 4,
evalDuration: time.Second * 40,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.1},
expectedFairnessMargin: []float64{0.01},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
@ -673,7 +675,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
concurrencyLimit: 3,
evalDuration: time.Second * 20,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.1},
expectedFairnessMargin: []float64{0.01},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
@ -691,7 +693,7 @@ func TestDifferentWidths(t *testing.T) {
qCfg := fq.QueuingConfig{
Name: "TestDifferentWidths",
DesiredNumQueues: 64,
QueueLengthLimit: 4,
QueueLengthLimit: 13,
HandSize: 7,
RequestWaitLimit: 10 * time.Minute,
}
@ -709,7 +711,7 @@ func TestDifferentWidths(t *testing.T) {
concurrencyLimit: 6,
evalDuration: time.Second * 20,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.16},
expectedFairnessMargin: []float64{0.125},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
@ -727,7 +729,7 @@ func TestTooWide(t *testing.T) {
qCfg := fq.QueuingConfig{
Name: "TestTooWide",
DesiredNumQueues: 64,
QueueLengthLimit: 7,
QueueLengthLimit: 35,
HandSize: 7,
RequestWaitLimit: 10 * time.Minute,
}
@ -746,9 +748,9 @@ func TestTooWide(t *testing.T) {
newUniformClient(90090090090090, 15, 21, time.Second, time.Second-1).seats(7),
},
concurrencyLimit: 6,
evalDuration: time.Second * 435,
evalDuration: time.Second * 225,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.375},
expectedFairnessMargin: []float64{0.33},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
@ -775,18 +777,18 @@ func TestWindup(t *testing.T) {
testCases := []struct {
margin2 float64
expectFair2 bool
name string
}{
{margin2: 0.26, expectFair2: true},
{margin2: 0.1, expectFair2: false},
{margin2: 0.26, expectFair2: true, name: "upper-bound"},
{margin2: 0.1, expectFair2: false, name: "lower-bound"},
}
for _, testCase := range testCases {
subName := fmt.Sprintf("m2=%v", testCase.margin2)
t.Run(subName, func(t *testing.T) {
t.Run(testCase.name, func(t *testing.T) {
now := time.Now()
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
qCfg := fq.QueuingConfig{
Name: "TestWindup/" + subName,
Name: "TestWindup/" + testCase.name,
DesiredNumQueues: 9,
QueueLengthLimit: 6,
HandSize: 1,
@ -806,7 +808,7 @@ func TestWindup(t *testing.T) {
concurrencyLimit: 3,
evalDuration: time.Second * 40,
expectedFair: []bool{true, testCase.expectFair2},
expectedFairnessMargin: []float64{0.1, testCase.margin2},
expectedFairnessMargin: []float64{0.01, testCase.margin2},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
@ -842,7 +844,7 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
concurrencyLimit: 4,
evalDuration: time.Second * 13,
expectedFair: []bool{false},
expectedFairnessMargin: []float64{0.1},
expectedFairnessMargin: []float64{0.20},
evalExecutingMetrics: true,
rejectReason: "concurrency-limit",
clk: clk,
@ -877,7 +879,7 @@ func TestTimeout(t *testing.T) {
concurrencyLimit: 1,
evalDuration: time.Second * 10,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.1},
expectedFairnessMargin: []float64{0.01},
evalInqueueMetrics: true,
evalExecutingMetrics: true,
rejectReason: "time-out",
@ -1070,7 +1072,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
}
}
func TestSelectQueueLocked(t *testing.T) {
func TestFindDispatchQueueLocked(t *testing.T) {
var G float64 = 0.003
tests := []struct {
name string
@ -1225,7 +1227,7 @@ func TestSelectQueueLocked(t *testing.T) {
minQueueExpected = test.queues[queueIdx]
}
minQueueGot := qs.selectQueueLocked()
minQueueGot := qs.findDispatchQueueLocked()
if minQueueExpected != minQueueGot {
t.Errorf("Expected queue: %#v, but got: %#v", minQueueExpected, minQueueGot)
}

View File

@ -59,6 +59,9 @@ type request struct {
// arrivalTime is the real time when the request entered this system
arrivalTime time.Time
// arrivalR is R(arrivalTime). R is, confusingly, also called "virtual time".
arrivalR float64
// descr1 and descr2 are not used in any logic but they appear in
// log messages
descr1, descr2 interface{}