diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go index 0c66939b8b1..53e10543198 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go @@ -140,11 +140,11 @@ func (l *requestFIFO) Walk(f walkFunc) { func addToQueueSum(sum *queueSum, req *request) { sum.InitialSeatsSum += req.InitialSeats() sum.MaxSeatsSum += req.MaxSeats() - sum.AdditionalSeatSecondsSum += req.AdditionalSeatSeconds() + sum.TotalWorkSum += req.totalWork() } func deductFromQueueSum(sum *queueSum, req *request) { sum.InitialSeatsSum -= req.InitialSeats() sum.MaxSeatsSum -= req.MaxSeats() - sum.AdditionalSeatSecondsSum -= req.AdditionalSeatSeconds() + sum.TotalWorkSum -= req.totalWork() } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go index 5f6da64584b..c3490910bcd 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go @@ -153,12 +153,13 @@ func TestFIFOWithRemoveIsIdempotent(t *testing.T) { } func TestFIFOQueueWorkEstimate(t *testing.T) { + qs := &queueSet{estimatedServiceDuration: time.Second} list := newRequestFIFO() update := func(we *queueSum, req *request, multiplier int) { we.InitialSeatsSum += multiplier * req.InitialSeats() we.MaxSeatsSum += multiplier * req.MaxSeats() - we.AdditionalSeatSecondsSum += SeatSeconds(multiplier) * req.AdditionalSeatSeconds() + we.TotalWorkSum += SeatSeconds(multiplier) * req.totalWork() } assert := func(t *testing.T, want, got *queueSum) { @@ -168,11 +169,11 @@ func TestFIFOQueueWorkEstimate(t *testing.T) { } newRequest := func(initialSeats, finalSeats uint, additionalLatency time.Duration) *request { - return &request{workEstimate: fcrequest.WorkEstimate{ + return &request{workEstimate: qs.completeWorkEstimate(&fcrequest.WorkEstimate{ InitialSeats: initialSeats, FinalSeats: finalSeats, AdditionalLatency: additionalLatency, - }} + })} } arrival := []*request{ newRequest(1, 3, time.Second), diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index f66b7619abc..08a4ab385fd 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -325,18 +325,6 @@ func (req *request) InitialSeats() int { return int(req.workEstimate.InitialSeats) } -// AdditionalSeatSeconds returns the amount of work in SeatSeconds produced by -// the final seats and the additional latency associated with a request. -func (req *request) AdditionalSeatSeconds() SeatSeconds { - return SeatsTimesDuration(float64(req.workEstimate.FinalSeats), req.workEstimate.AdditionalLatency) -} - -// InitialSeatSeconds returns the amount of work in SeatSeconds projected -// by the initial seats for a given estimated service duration. -func (req *request) InitialSeatSeconds(estimatedServiceDuration time.Duration) SeatSeconds { - return SeatsTimesDuration(float64(req.workEstimate.InitialSeats), estimatedServiceDuration) -} - func (req *request) NoteQueued(inQueue bool) { if req.queueNoteFn != nil { req.queueNoteFn(inQueue) @@ -488,7 +476,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte descr1: descr1, descr2: descr2, queueNoteFn: queueNoteFn, - workEstimate: *workEstimate, + workEstimate: qs.completeWorkEstimate(workEstimate), } if ok := qs.rejectOrEnqueueLocked(req); !ok { return nil @@ -516,7 +504,7 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac // this is the total amount of work in seat-seconds for requests // waiting in this queue, we will select the queue with the minimum. - thisQueueSeatSeconds := SeatsTimesDuration(float64(queueSum.InitialSeatsSum), qs.estimatedServiceDuration) + queueSum.AdditionalSeatSecondsSum + thisQueueSeatSeconds := queueSum.TotalWorkSum klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d with sum: %#v and %d seats in use, nextDispatchR=%v", qs.qCfg.Name, descr1, descr2, queueIdx, queueSum, queue.seatsInUse, queue.nextDispatchR) if thisQueueSeatSeconds < minQueueSeatSeconds { minQueueSeatSeconds = thisQueueSeatSeconds @@ -634,7 +622,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f arrivalR: qs.currentR, descr1: descr1, descr2: descr2, - workEstimate: *workEstimate, + workEstimate: qs.completeWorkEstimate(workEstimate), } qs.totRequestsExecuting++ qs.totSeatsInUse += req.MaxSeats() @@ -683,7 +671,7 @@ func (qs *queueSet) dispatchLocked() bool { request.workEstimate, queue.index, queue.nextDispatchR, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse) } // When a request is dequeued for service -> qs.virtualStart += G * width - queue.nextDispatchR += request.InitialSeatSeconds(qs.estimatedServiceDuration) + request.AdditionalSeatSeconds() + queue.nextDispatchR += request.totalWork() qs.boundNextDispatch(queue) request.decision.Set(decisionExecute) return ok @@ -734,7 +722,7 @@ func (qs *queueSet) findDispatchQueueLocked() *queue { estimatedWorkInProgress := SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration) dsMin = ssMin(dsMin, queue.nextDispatchR-estimatedWorkInProgress) dsMax = ssMax(dsMax, queue.nextDispatchR-estimatedWorkInProgress) - currentVirtualFinish := queue.nextDispatchR + oldestWaiting.InitialSeatSeconds(qs.estimatedServiceDuration) + oldestWaiting.AdditionalSeatSeconds() + currentVirtualFinish := queue.nextDispatchR + oldestWaiting.totalWork() klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish) if currentVirtualFinish < minVirtualFinish { minVirtualFinish = currentVirtualFinish diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 1c4cb04529b..1614e1b466a 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -1114,6 +1114,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) { func TestFindDispatchQueueLocked(t *testing.T) { const G = 3 * time.Millisecond + qs0 := &queueSet{estimatedServiceDuration: G} tests := []struct { name string robinIndex int @@ -1134,13 +1135,13 @@ func TestFindDispatchQueueLocked(t *testing.T) { { nextDispatchR: SeatsTimesDuration(1, 200*time.Second), requests: newFIFO( - &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1}}, + &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})}, ), }, { nextDispatchR: SeatsTimesDuration(1, 100*time.Second), requests: newFIFO( - &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1}}, + &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})}, ), }, }, @@ -1157,7 +1158,7 @@ func TestFindDispatchQueueLocked(t *testing.T) { { nextDispatchR: SeatsTimesDuration(1, 200*time.Second), requests: newFIFO( - &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1}}, + &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})}, ), }, }, @@ -1174,13 +1175,13 @@ func TestFindDispatchQueueLocked(t *testing.T) { { nextDispatchR: SeatsTimesDuration(1, 200*time.Second), requests: newFIFO( - &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 50}}, + &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 50})}, ), }, { nextDispatchR: SeatsTimesDuration(1, 100*time.Second), requests: newFIFO( - &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 25}}, + &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})}, ), }, }, @@ -1197,13 +1198,13 @@ func TestFindDispatchQueueLocked(t *testing.T) { { nextDispatchR: SeatsTimesDuration(1, 200*time.Second), requests: newFIFO( - &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 10}}, + &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 10})}, ), }, { nextDispatchR: SeatsTimesDuration(1, 100*time.Second), requests: newFIFO( - &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 25}}, + &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})}, ), }, }, @@ -1220,13 +1221,13 @@ func TestFindDispatchQueueLocked(t *testing.T) { { nextDispatchR: SeatsTimesDuration(1, 200*time.Second), requests: newFIFO( - &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 10}}, + &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 10})}, ), }, { nextDispatchR: SeatsTimesDuration(1, 100*time.Second), requests: newFIFO( - &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 25}}, + &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})}, ), }, }, @@ -1309,8 +1310,9 @@ func TestFinishRequestLocked(t *testing.T) { now := time.Now() clk, _ := testeventclock.NewFake(now, 0, nil) qs := &queueSet{ - clock: clk, - obsPair: newObserverPair(clk), + clock: clk, + estimatedServiceDuration: time.Second, + obsPair: newObserverPair(clk), } queue := &queue{ requests: newRequestFIFO(), @@ -1318,7 +1320,7 @@ func TestFinishRequestLocked(t *testing.T) { r := &request{ qs: qs, queue: queue, - workEstimate: test.workEstimate, + workEstimate: qs.completeWorkEstimate(&test.workEstimate), } qs.totRequestsExecuting = 111 @@ -1355,6 +1357,7 @@ func TestFinishRequestLocked(t *testing.T) { } func TestRequestSeats(t *testing.T) { + qs := &queueSet{estimatedServiceDuration: time.Second} tests := []struct { name string request *request @@ -1362,17 +1365,17 @@ func TestRequestSeats(t *testing.T) { }{ { name: "", - request: &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 3, FinalSeats: 3}}, + request: &request{workEstimate: qs.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 3, FinalSeats: 3})}, expected: 3, }, { name: "", - request: &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1, FinalSeats: 3}}, + request: &request{workEstimate: qs.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1, FinalSeats: 3})}, expected: 3, }, { name: "", - request: &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 3, FinalSeats: 1}}, + request: &request{workEstimate: qs.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 3, FinalSeats: 1})}, expected: 3, }, } @@ -1387,19 +1390,20 @@ func TestRequestSeats(t *testing.T) { } } -func TestRequestAdditionalSeatSeconds(t *testing.T) { +func TestRequestWork(t *testing.T) { + qs := &queueSet{estimatedServiceDuration: 2 * time.Second} request := &request{ - workEstimate: fcrequest.WorkEstimate{ + workEstimate: qs.completeWorkEstimate(&fcrequest.WorkEstimate{ InitialSeats: 3, - FinalSeats: 5, - AdditionalLatency: 3 * time.Second, - }, + FinalSeats: 50, + AdditionalLatency: 70 * time.Second, + }), } - got := request.AdditionalSeatSeconds() - want := SeatsTimesDuration(5, 3*time.Second) + got := request.totalWork() + want := SeatsTimesDuration(3, 2*time.Second) + SeatsTimesDuration(50, 70*time.Second) if want != got { - t.Errorf("Expected AdditionalSeatSeconds: %v, but got: %v", want, got) + t.Errorf("Expected totalWork: %v, but got: %v", want, got) } } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index cbd2af6889d..7e9a3f3bdf1 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -47,7 +47,7 @@ type request struct { startTime time.Time // estimated amount of work of the request - workEstimate fcrequest.WorkEstimate + workEstimate completedWorkEstimate // decision gets set to a `requestDecision` indicating what to do // with this request. It gets set exactly once, when the request @@ -78,6 +78,11 @@ type request struct { removeFromQueueFn removeFromFIFOFunc } +type completedWorkEstimate struct { + fcrequest.WorkEstimate + totalWork SeatSeconds // initial plus final work +} + // queue is an array of requests with additional metadata required for // the FQScheduler type queue struct { @@ -97,8 +102,8 @@ type queue struct { seatsInUse int } -// queueSum tracks the sum of initial seats, final seats, and -// additional latency aggregated from all requests in a given queue +// queueSum tracks the sum of initial seats, max seats, and +// totalWork from all requests in a given queue type queueSum struct { // InitialSeatsSum is the sum of InitialSeats // associated with all requests in a given queue. @@ -108,9 +113,23 @@ type queueSum struct { // associated with all requests in a given queue. MaxSeatsSum int - // AdditionalSeatSecondsSum is sum of AdditionalSeatsSeconds - // associated with all requests in a given queue. - AdditionalSeatSecondsSum SeatSeconds + // TotalWorkSum is the sum of totalWork of the waiting requests + TotalWorkSum SeatSeconds +} + +func (req *request) totalWork() SeatSeconds { + return req.workEstimate.totalWork +} + +func (qs *queueSet) completeWorkEstimate(we *fcrequest.WorkEstimate) completedWorkEstimate { + return completedWorkEstimate{ + WorkEstimate: *we, + totalWork: qs.computeTotalWork(we), + } +} + +func (qs *queueSet) computeTotalWork(we *fcrequest.WorkEstimate) SeatSeconds { + return SeatsTimesDuration(float64(we.InitialSeats), qs.estimatedServiceDuration) + SeatsTimesDuration(float64(we.FinalSeats), we.AdditionalLatency) } // Enqueue enqueues a request into the queue and diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go index 52eb3fc04e1..56899194ed8 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go @@ -33,6 +33,8 @@ const ( maximumSeats = 10 ) +// WorkEstimate carries three of the four parameters that determine the work in a request. +// The fourth parameter is the duration of the initial phase of execution. type WorkEstimate struct { // InitialSeats is the number of seats occupied while the server is // executing this request. @@ -49,8 +51,8 @@ type WorkEstimate struct { AdditionalLatency time.Duration } -// MaxSeats returns the number of seats this request requires, it is the maximum -// of the two, WorkEstimate.InitialSeats and WorkEstimate.FinalSeats. +// MaxSeats returns the maximum number of seats the request occupies over the +// phases of being served. func (we *WorkEstimate) MaxSeats() int { if we.InitialSeats >= we.FinalSeats { return int(we.InitialSeats)