Merge pull request #104806 from MikeSpreitzer/set-g-to-3ms

Change execution duration guess from 1 minute to 3 milliseconds
This commit is contained in:
Kubernetes Prow Robot 2021-09-08 10:05:20 -07:00 committed by GitHub
commit 559808670a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 45 additions and 50 deletions

View File

@ -45,6 +45,9 @@ type fifo interface {
// Dequeue pulls out the oldest request from the list.
Dequeue() (*request, bool)
// Peek returns the oldest request without removing it.
Peek() (*request, bool)
// Length returns the number of requests in the list.
Length() int
@ -97,18 +100,28 @@ func (l *requestFIFO) Enqueue(req *request) removeFromFIFOFunc {
}
func (l *requestFIFO) Dequeue() (*request, bool) {
return l.getFirst(true)
}
func (l *requestFIFO) Peek() (*request, bool) {
return l.getFirst(false)
}
func (l *requestFIFO) getFirst(remove bool) (*request, bool) {
e := l.Front()
if e == nil {
return nil, false
}
defer func() {
l.Remove(e)
e.Value = nil
}()
if remove {
defer func() {
l.Remove(e)
e.Value = nil
}()
}
request, ok := e.Value.(*request)
if ok {
if remove && ok {
l.seatsSum -= request.Seats()
}
return request, ok

View File

@ -76,9 +76,9 @@ type queueSetCompleter struct {
// not end in "Locked" either acquires the lock or does not care about
// locking.
type queueSet struct {
clock eventclock.Interface
estimatedServiceTime float64
obsPair metrics.TimedObserverPair
clock eventclock.Interface
estimatedServiceSeconds float64
obsPair metrics.TimedObserverPair
promiseFactory promiseFactory
@ -170,12 +170,12 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
qs := qsc.theSet
if qs == nil {
qs = &queueSet{
clock: qsc.factory.clock,
estimatedServiceTime: 60,
obsPair: qsc.obsPair,
qCfg: qsc.qCfg,
virtualTime: 0,
lastRealTime: qsc.factory.clock.Now(),
clock: qsc.factory.clock,
estimatedServiceSeconds: 0.003,
obsPair: qsc.obsPair,
qCfg: qsc.qCfg,
virtualTime: 0,
lastRealTime: qsc.factory.clock.Now(),
}
qs.promiseFactory = qsc.factory.promiseFactoryFactory(qs)
}
@ -642,8 +642,8 @@ func (qs *queueSet) dispatchLocked() bool {
qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2,
request.workEstimate, queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse)
}
// When a request is dequeued for service -> qs.virtualStart += G
queue.virtualStart += qs.estimatedServiceTime * float64(request.Seats())
// When a request is dequeued for service -> qs.virtualStart += G * width
queue.virtualStart += qs.estimatedServiceSeconds * float64(request.Seats())
request.decision.Set(decisionExecute)
return ok
}
@ -686,29 +686,14 @@ func (qs *queueSet) selectQueueLocked() *queue {
for range qs.queues {
qs.robinIndex = (qs.robinIndex + 1) % nq
queue := qs.queues[qs.robinIndex]
if queue.requests.Length() != 0 {
oldestWaiting, _ := queue.requests.Peek()
if oldestWaiting != nil {
sMin = math.Min(sMin, queue.virtualStart)
sMax = math.Max(sMax, queue.virtualStart)
estimatedWorkInProgress := qs.estimatedServiceTime * float64(queue.seatsInUse)
estimatedWorkInProgress := qs.estimatedServiceSeconds * float64(queue.seatsInUse)
dsMin = math.Min(dsMin, queue.virtualStart-estimatedWorkInProgress)
dsMax = math.Max(dsMax, queue.virtualStart-estimatedWorkInProgress)
// the finish R of the oldest request is:
// start R + G
// we are not taking the width of the request into account when
// we calculate the virtual finish time of the request because
// it can starve requests with smaller wdith in other queues.
//
// so let's draw an example of the starving scenario:
// - G=60 (estimated service time in seconds)
// - concurrency limit=2
// - we have two queues, q1 and q2
// - q1 has an infinite supply of requests with width W=1
// - q2 has one request waiting in the queue with width W=2
// - start R for both q1 and q2 are at t0
// - requests complete really fast, S=1ms on q1
// in this scenario we will execute roughly 60,000 requests
// from q1 before we pick the request from q2.
currentVirtualFinish := queue.virtualStart + qs.estimatedServiceTime
currentVirtualFinish := queue.virtualStart + qs.estimatedServiceSeconds*float64(oldestWaiting.Seats())
klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish)
if currentVirtualFinish < minVirtualFinish {
minVirtualFinish = currentVirtualFinish
@ -718,12 +703,7 @@ func (qs *queueSet) selectQueueLocked() *queue {
}
}
// TODO: add a method to fifo that lets us peek at the oldest request
var oldestReqFromMinQueue *request
minQueue.requests.Walk(func(r *request) bool {
oldestReqFromMinQueue = r
return false
})
oldestReqFromMinQueue, _ := minQueue.requests.Peek()
if oldestReqFromMinQueue == nil {
// This cannot happen
klog.ErrorS(errors.New("selected queue is empty"), "Impossible", "queueSet", qs.qCfg.Name)
@ -755,7 +735,7 @@ func (qs *queueSet) selectQueueLocked() *queue {
// queue here. if the last start R (excluded estimated cost)
// falls behind the global virtual time, we update the latest virtual
// start by: <latest global virtual time> + <previously estimated cost>
previouslyEstimatedServiceTime := float64(minQueue.seatsInUse) * qs.estimatedServiceTime
previouslyEstimatedServiceTime := float64(minQueue.seatsInUse) * qs.estimatedServiceSeconds
if qs.virtualTime > minQueue.virtualStart-previouslyEstimatedServiceTime {
// per-queue virtual time should not fall behind the global
minQueue.virtualStart = qs.virtualTime + previouslyEstimatedServiceTime
@ -853,7 +833,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
// When a request finishes being served, and the actual service time was S,
// the queues start R is decremented by (G - S)*width.
r.queue.virtualStart -= (qs.estimatedServiceTime - S) * float64(r.Seats())
r.queue.virtualStart -= (qs.estimatedServiceSeconds - S) * float64(r.Seats())
}
}

View File

@ -1034,7 +1034,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
}
func TestSelectQueueLocked(t *testing.T) {
var G float64 = 60
var G float64 = 0.003
tests := []struct {
name string
robinIndex int
@ -1087,7 +1087,7 @@ func TestSelectQueueLocked(t *testing.T) {
robinIndexExpected: []int{0},
},
{
name: "width > 1, seats are available for request with the least finish time, queue is picked",
name: "width > 1, seats are available for request with the least finish R, queue is picked",
concurrencyLimit: 50,
totSeatsInUse: 25,
robinIndex: -1,
@ -1110,7 +1110,7 @@ func TestSelectQueueLocked(t *testing.T) {
robinIndexExpected: []int{1},
},
{
name: "width > 1, seats are not available for request with the least finish time, queue is not picked",
name: "width > 1, seats are not available for request with the least finish R, queue is not picked",
concurrencyLimit: 50,
totSeatsInUse: 26,
robinIndex: -1,
@ -1165,9 +1165,10 @@ func TestSelectQueueLocked(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
qs := &queueSet{
estimatedServiceTime: G,
robinIndex: test.robinIndex,
totSeatsInUse: test.totSeatsInUse,
estimatedServiceSeconds: G,
robinIndex: test.robinIndex,
totSeatsInUse: test.totSeatsInUse,
qCfg: fq.QueuingConfig{Name: "TestSelectQueueLocked/" + test.name},
dCfg: fq.DispatchingConfig{
ConcurrencyLimit: test.concurrencyLimit,
},

View File

@ -76,13 +76,14 @@ type request struct {
// queue is an array of requests with additional metadata required for
// the FQScheduler
type queue struct {
// The requests are stored in a FIFO list.
// The requests not yet executing in the real world are stored in a FIFO list.
requests fifo
// virtualStart is the "virtual time" (R progress meter reading) at
// which the next request will be dispatched in the virtual world.
virtualStart float64
// requestsExecuting is the count in the real world
requestsExecuting int
index int