Merge pull request #105559 from MikeSpreitzer/refactor-work

Calculate the work in each request just once
This commit is contained in:
Kubernetes Prow Robot 2021-10-08 01:10:58 -07:00 committed by GitHub
commit 0e260a027b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 67 additions and 53 deletions

View File

@ -140,11 +140,11 @@ func (l *requestFIFO) Walk(f walkFunc) {
func addToQueueSum(sum *queueSum, req *request) { func addToQueueSum(sum *queueSum, req *request) {
sum.InitialSeatsSum += req.InitialSeats() sum.InitialSeatsSum += req.InitialSeats()
sum.MaxSeatsSum += req.MaxSeats() sum.MaxSeatsSum += req.MaxSeats()
sum.AdditionalSeatSecondsSum += req.AdditionalSeatSeconds() sum.TotalWorkSum += req.totalWork()
} }
func deductFromQueueSum(sum *queueSum, req *request) { func deductFromQueueSum(sum *queueSum, req *request) {
sum.InitialSeatsSum -= req.InitialSeats() sum.InitialSeatsSum -= req.InitialSeats()
sum.MaxSeatsSum -= req.MaxSeats() sum.MaxSeatsSum -= req.MaxSeats()
sum.AdditionalSeatSecondsSum -= req.AdditionalSeatSeconds() sum.TotalWorkSum -= req.totalWork()
} }

View File

@ -153,12 +153,13 @@ func TestFIFOWithRemoveIsIdempotent(t *testing.T) {
} }
func TestFIFOQueueWorkEstimate(t *testing.T) { func TestFIFOQueueWorkEstimate(t *testing.T) {
qs := &queueSet{estimatedServiceDuration: time.Second}
list := newRequestFIFO() list := newRequestFIFO()
update := func(we *queueSum, req *request, multiplier int) { update := func(we *queueSum, req *request, multiplier int) {
we.InitialSeatsSum += multiplier * req.InitialSeats() we.InitialSeatsSum += multiplier * req.InitialSeats()
we.MaxSeatsSum += multiplier * req.MaxSeats() we.MaxSeatsSum += multiplier * req.MaxSeats()
we.AdditionalSeatSecondsSum += SeatSeconds(multiplier) * req.AdditionalSeatSeconds() we.TotalWorkSum += SeatSeconds(multiplier) * req.totalWork()
} }
assert := func(t *testing.T, want, got *queueSum) { assert := func(t *testing.T, want, got *queueSum) {
@ -168,11 +169,11 @@ func TestFIFOQueueWorkEstimate(t *testing.T) {
} }
newRequest := func(initialSeats, finalSeats uint, additionalLatency time.Duration) *request { newRequest := func(initialSeats, finalSeats uint, additionalLatency time.Duration) *request {
return &request{workEstimate: fcrequest.WorkEstimate{ return &request{workEstimate: qs.completeWorkEstimate(&fcrequest.WorkEstimate{
InitialSeats: initialSeats, InitialSeats: initialSeats,
FinalSeats: finalSeats, FinalSeats: finalSeats,
AdditionalLatency: additionalLatency, AdditionalLatency: additionalLatency,
}} })}
} }
arrival := []*request{ arrival := []*request{
newRequest(1, 3, time.Second), newRequest(1, 3, time.Second),

View File

@ -325,18 +325,6 @@ func (req *request) InitialSeats() int {
return int(req.workEstimate.InitialSeats) return int(req.workEstimate.InitialSeats)
} }
// AdditionalSeatSeconds returns the amount of work in SeatSeconds produced by
// the final seats and the additional latency associated with a request.
func (req *request) AdditionalSeatSeconds() SeatSeconds {
return SeatsTimesDuration(float64(req.workEstimate.FinalSeats), req.workEstimate.AdditionalLatency)
}
// InitialSeatSeconds returns the amount of work in SeatSeconds projected
// by the initial seats for a given estimated service duration.
func (req *request) InitialSeatSeconds(estimatedServiceDuration time.Duration) SeatSeconds {
return SeatsTimesDuration(float64(req.workEstimate.InitialSeats), estimatedServiceDuration)
}
func (req *request) NoteQueued(inQueue bool) { func (req *request) NoteQueued(inQueue bool) {
if req.queueNoteFn != nil { if req.queueNoteFn != nil {
req.queueNoteFn(inQueue) req.queueNoteFn(inQueue)
@ -488,7 +476,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
descr1: descr1, descr1: descr1,
descr2: descr2, descr2: descr2,
queueNoteFn: queueNoteFn, queueNoteFn: queueNoteFn,
workEstimate: *workEstimate, workEstimate: qs.completeWorkEstimate(workEstimate),
} }
if ok := qs.rejectOrEnqueueLocked(req); !ok { if ok := qs.rejectOrEnqueueLocked(req); !ok {
return nil return nil
@ -516,7 +504,7 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac
// this is the total amount of work in seat-seconds for requests // this is the total amount of work in seat-seconds for requests
// waiting in this queue, we will select the queue with the minimum. // waiting in this queue, we will select the queue with the minimum.
thisQueueSeatSeconds := SeatsTimesDuration(float64(queueSum.InitialSeatsSum), qs.estimatedServiceDuration) + queueSum.AdditionalSeatSecondsSum thisQueueSeatSeconds := queueSum.TotalWorkSum
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d with sum: %#v and %d seats in use, nextDispatchR=%v", qs.qCfg.Name, descr1, descr2, queueIdx, queueSum, queue.seatsInUse, queue.nextDispatchR) klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d with sum: %#v and %d seats in use, nextDispatchR=%v", qs.qCfg.Name, descr1, descr2, queueIdx, queueSum, queue.seatsInUse, queue.nextDispatchR)
if thisQueueSeatSeconds < minQueueSeatSeconds { if thisQueueSeatSeconds < minQueueSeatSeconds {
minQueueSeatSeconds = thisQueueSeatSeconds minQueueSeatSeconds = thisQueueSeatSeconds
@ -634,7 +622,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
arrivalR: qs.currentR, arrivalR: qs.currentR,
descr1: descr1, descr1: descr1,
descr2: descr2, descr2: descr2,
workEstimate: *workEstimate, workEstimate: qs.completeWorkEstimate(workEstimate),
} }
qs.totRequestsExecuting++ qs.totRequestsExecuting++
qs.totSeatsInUse += req.MaxSeats() qs.totSeatsInUse += req.MaxSeats()
@ -683,7 +671,7 @@ func (qs *queueSet) dispatchLocked() bool {
request.workEstimate, queue.index, queue.nextDispatchR, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse) request.workEstimate, queue.index, queue.nextDispatchR, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse)
} }
// When a request is dequeued for service -> qs.virtualStart += G * width // When a request is dequeued for service -> qs.virtualStart += G * width
queue.nextDispatchR += request.InitialSeatSeconds(qs.estimatedServiceDuration) + request.AdditionalSeatSeconds() queue.nextDispatchR += request.totalWork()
qs.boundNextDispatch(queue) qs.boundNextDispatch(queue)
request.decision.Set(decisionExecute) request.decision.Set(decisionExecute)
return ok return ok
@ -734,7 +722,7 @@ func (qs *queueSet) findDispatchQueueLocked() *queue {
estimatedWorkInProgress := SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration) estimatedWorkInProgress := SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration)
dsMin = ssMin(dsMin, queue.nextDispatchR-estimatedWorkInProgress) dsMin = ssMin(dsMin, queue.nextDispatchR-estimatedWorkInProgress)
dsMax = ssMax(dsMax, queue.nextDispatchR-estimatedWorkInProgress) dsMax = ssMax(dsMax, queue.nextDispatchR-estimatedWorkInProgress)
currentVirtualFinish := queue.nextDispatchR + oldestWaiting.InitialSeatSeconds(qs.estimatedServiceDuration) + oldestWaiting.AdditionalSeatSeconds() currentVirtualFinish := queue.nextDispatchR + oldestWaiting.totalWork()
klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish) klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish)
if currentVirtualFinish < minVirtualFinish { if currentVirtualFinish < minVirtualFinish {
minVirtualFinish = currentVirtualFinish minVirtualFinish = currentVirtualFinish

View File

@ -1114,6 +1114,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
func TestFindDispatchQueueLocked(t *testing.T) { func TestFindDispatchQueueLocked(t *testing.T) {
const G = 3 * time.Millisecond const G = 3 * time.Millisecond
qs0 := &queueSet{estimatedServiceDuration: G}
tests := []struct { tests := []struct {
name string name string
robinIndex int robinIndex int
@ -1134,13 +1135,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
{ {
nextDispatchR: SeatsTimesDuration(1, 200*time.Second), nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1}}, &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})},
), ),
}, },
{ {
nextDispatchR: SeatsTimesDuration(1, 100*time.Second), nextDispatchR: SeatsTimesDuration(1, 100*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1}}, &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})},
), ),
}, },
}, },
@ -1157,7 +1158,7 @@ func TestFindDispatchQueueLocked(t *testing.T) {
{ {
nextDispatchR: SeatsTimesDuration(1, 200*time.Second), nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1}}, &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})},
), ),
}, },
}, },
@ -1174,13 +1175,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
{ {
nextDispatchR: SeatsTimesDuration(1, 200*time.Second), nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 50}}, &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 50})},
), ),
}, },
{ {
nextDispatchR: SeatsTimesDuration(1, 100*time.Second), nextDispatchR: SeatsTimesDuration(1, 100*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 25}}, &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})},
), ),
}, },
}, },
@ -1197,13 +1198,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
{ {
nextDispatchR: SeatsTimesDuration(1, 200*time.Second), nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 10}}, &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 10})},
), ),
}, },
{ {
nextDispatchR: SeatsTimesDuration(1, 100*time.Second), nextDispatchR: SeatsTimesDuration(1, 100*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 25}}, &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})},
), ),
}, },
}, },
@ -1220,13 +1221,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
{ {
nextDispatchR: SeatsTimesDuration(1, 200*time.Second), nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 10}}, &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 10})},
), ),
}, },
{ {
nextDispatchR: SeatsTimesDuration(1, 100*time.Second), nextDispatchR: SeatsTimesDuration(1, 100*time.Second),
requests: newFIFO( requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 25}}, &request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})},
), ),
}, },
}, },
@ -1309,8 +1310,9 @@ func TestFinishRequestLocked(t *testing.T) {
now := time.Now() now := time.Now()
clk, _ := testeventclock.NewFake(now, 0, nil) clk, _ := testeventclock.NewFake(now, 0, nil)
qs := &queueSet{ qs := &queueSet{
clock: clk, clock: clk,
obsPair: newObserverPair(clk), estimatedServiceDuration: time.Second,
obsPair: newObserverPair(clk),
} }
queue := &queue{ queue := &queue{
requests: newRequestFIFO(), requests: newRequestFIFO(),
@ -1318,7 +1320,7 @@ func TestFinishRequestLocked(t *testing.T) {
r := &request{ r := &request{
qs: qs, qs: qs,
queue: queue, queue: queue,
workEstimate: test.workEstimate, workEstimate: qs.completeWorkEstimate(&test.workEstimate),
} }
qs.totRequestsExecuting = 111 qs.totRequestsExecuting = 111
@ -1355,6 +1357,7 @@ func TestFinishRequestLocked(t *testing.T) {
} }
func TestRequestSeats(t *testing.T) { func TestRequestSeats(t *testing.T) {
qs := &queueSet{estimatedServiceDuration: time.Second}
tests := []struct { tests := []struct {
name string name string
request *request request *request
@ -1362,17 +1365,17 @@ func TestRequestSeats(t *testing.T) {
}{ }{
{ {
name: "", name: "",
request: &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 3, FinalSeats: 3}}, request: &request{workEstimate: qs.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 3, FinalSeats: 3})},
expected: 3, expected: 3,
}, },
{ {
name: "", name: "",
request: &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1, FinalSeats: 3}}, request: &request{workEstimate: qs.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1, FinalSeats: 3})},
expected: 3, expected: 3,
}, },
{ {
name: "", name: "",
request: &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 3, FinalSeats: 1}}, request: &request{workEstimate: qs.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 3, FinalSeats: 1})},
expected: 3, expected: 3,
}, },
} }
@ -1387,19 +1390,20 @@ func TestRequestSeats(t *testing.T) {
} }
} }
func TestRequestAdditionalSeatSeconds(t *testing.T) { func TestRequestWork(t *testing.T) {
qs := &queueSet{estimatedServiceDuration: 2 * time.Second}
request := &request{ request := &request{
workEstimate: fcrequest.WorkEstimate{ workEstimate: qs.completeWorkEstimate(&fcrequest.WorkEstimate{
InitialSeats: 3, InitialSeats: 3,
FinalSeats: 5, FinalSeats: 50,
AdditionalLatency: 3 * time.Second, AdditionalLatency: 70 * time.Second,
}, }),
} }
got := request.AdditionalSeatSeconds() got := request.totalWork()
want := SeatsTimesDuration(5, 3*time.Second) want := SeatsTimesDuration(3, 2*time.Second) + SeatsTimesDuration(50, 70*time.Second)
if want != got { if want != got {
t.Errorf("Expected AdditionalSeatSeconds: %v, but got: %v", want, got) t.Errorf("Expected totalWork: %v, but got: %v", want, got)
} }
} }

View File

@ -47,7 +47,7 @@ type request struct {
startTime time.Time startTime time.Time
// estimated amount of work of the request // estimated amount of work of the request
workEstimate fcrequest.WorkEstimate workEstimate completedWorkEstimate
// decision gets set to a `requestDecision` indicating what to do // decision gets set to a `requestDecision` indicating what to do
// with this request. It gets set exactly once, when the request // with this request. It gets set exactly once, when the request
@ -78,6 +78,11 @@ type request struct {
removeFromQueueFn removeFromFIFOFunc removeFromQueueFn removeFromFIFOFunc
} }
type completedWorkEstimate struct {
fcrequest.WorkEstimate
totalWork SeatSeconds // initial plus final work
}
// queue is an array of requests with additional metadata required for // queue is an array of requests with additional metadata required for
// the FQScheduler // the FQScheduler
type queue struct { type queue struct {
@ -97,8 +102,8 @@ type queue struct {
seatsInUse int seatsInUse int
} }
// queueSum tracks the sum of initial seats, final seats, and // queueSum tracks the sum of initial seats, max seats, and
// additional latency aggregated from all requests in a given queue // totalWork from all requests in a given queue
type queueSum struct { type queueSum struct {
// InitialSeatsSum is the sum of InitialSeats // InitialSeatsSum is the sum of InitialSeats
// associated with all requests in a given queue. // associated with all requests in a given queue.
@ -108,9 +113,23 @@ type queueSum struct {
// associated with all requests in a given queue. // associated with all requests in a given queue.
MaxSeatsSum int MaxSeatsSum int
// AdditionalSeatSecondsSum is sum of AdditionalSeatsSeconds // TotalWorkSum is the sum of totalWork of the waiting requests
// associated with all requests in a given queue. TotalWorkSum SeatSeconds
AdditionalSeatSecondsSum SeatSeconds }
func (req *request) totalWork() SeatSeconds {
return req.workEstimate.totalWork
}
func (qs *queueSet) completeWorkEstimate(we *fcrequest.WorkEstimate) completedWorkEstimate {
return completedWorkEstimate{
WorkEstimate: *we,
totalWork: qs.computeTotalWork(we),
}
}
func (qs *queueSet) computeTotalWork(we *fcrequest.WorkEstimate) SeatSeconds {
return SeatsTimesDuration(float64(we.InitialSeats), qs.estimatedServiceDuration) + SeatsTimesDuration(float64(we.FinalSeats), we.AdditionalLatency)
} }
// Enqueue enqueues a request into the queue and // Enqueue enqueues a request into the queue and

View File

@ -33,6 +33,8 @@ const (
maximumSeats = 10 maximumSeats = 10
) )
// WorkEstimate carries three of the four parameters that determine the work in a request.
// The fourth parameter is the duration of the initial phase of execution.
type WorkEstimate struct { type WorkEstimate struct {
// InitialSeats is the number of seats occupied while the server is // InitialSeats is the number of seats occupied while the server is
// executing this request. // executing this request.
@ -49,8 +51,8 @@ type WorkEstimate struct {
AdditionalLatency time.Duration AdditionalLatency time.Duration
} }
// MaxSeats returns the number of seats this request requires, it is the maximum // MaxSeats returns the maximum number of seats the request occupies over the
// of the two, WorkEstimate.InitialSeats and WorkEstimate.FinalSeats. // phases of being served.
func (we *WorkEstimate) MaxSeats() int { func (we *WorkEstimate) MaxSeats() int {
if we.InitialSeats >= we.FinalSeats { if we.InitialSeats >= we.FinalSeats {
return int(we.InitialSeats) return int(we.InitialSeats)