Merge pull request #102843 from tkashem/apf-fix-102299

apf: always include seats in virtual time
This commit is contained in:
Kubernetes Prow Robot 2021-06-15 17:04:06 -07:00 committed by GitHub
commit 9e373ac590
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 85 additions and 12 deletions

View File

@ -674,8 +674,7 @@ func (qs *queueSet) selectQueueLocked() *queue {
qs.robinIndex = (qs.robinIndex + 1) % nq qs.robinIndex = (qs.robinIndex + 1) % nq
queue := qs.queues[qs.robinIndex] queue := qs.queues[qs.robinIndex]
if queue.requests.Length() != 0 { if queue.requests.Length() != 0 {
currentVirtualFinish := queue.GetNextFinish(qs.estimatedServiceTime)
currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime)
if currentVirtualFinish < minVirtualFinish { if currentVirtualFinish < minVirtualFinish {
minVirtualFinish = currentVirtualFinish minVirtualFinish = currentVirtualFinish
minQueue = queue minQueue = queue
@ -742,8 +741,8 @@ func (qs *queueSet) finishRequestLocked(r *request) {
S := now.Sub(r.startTime).Seconds() S := now.Sub(r.startTime).Seconds()
// When a request finishes being served, and the actual service time was S, // When a request finishes being served, and the actual service time was S,
// the queues virtual start time is decremented by G - S. // the queues virtual start time is decremented by (G - S)*width.
r.queue.virtualStart -= (qs.estimatedServiceTime * float64(r.Seats())) - S r.queue.virtualStart -= (qs.estimatedServiceTime - S) * float64(r.Seats())
// request has finished, remove from requests executing // request has finished, remove from requests executing
r.queue.requestsExecuting-- r.queue.requestsExecuting--

View File

@ -101,15 +101,27 @@ func (q *queue) Dequeue() (*request, bool) {
return request, ok return request, ok
} }
// GetVirtualFinish returns the expected virtual finish time of the request at // GetNextFinish returns the expected virtual finish time of the
// index J in the queue with estimated finish time G // oldest request in the queue with estimated duration G
func (q *queue) GetVirtualFinish(J int, G float64) float64 { func (q *queue) GetNextFinish(G float64) float64 {
// The virtual finish time of request number J in the queue // TODO: if we decide to generalize this function to return virtual finish time
// (counting from J=1 for the head) is J * G + (virtual start time). // for the Nth oldest request waiting in the queue, we need to carefully
// evaluate and potentially improve the performance here.
var oldestReq *request
q.requests.Walk(func(r *request) bool {
oldestReq = r
return false
})
if oldestReq == nil {
// we should never be here, since the caller should ensure
// that this queue has request(s) waiting to be served before
// calling this function.
return q.virtualStart
}
// counting from J=1 for the head (eg: queue.requests[0] -> J=1) - J+1 // the estimated service time of the oldest request is (G * request width)
jg := float64(J+1) * float64(G) estimatedServiceTime := float64(G) * float64(oldestReq.Seats())
return jg + q.virtualStart return q.virtualStart + estimatedServiceTime
} }
func (q *queue) dump(includeDetails bool) debug.QueueDump { func (q *queue) dump(includeDetails bool) debug.QueueDump {

View File

@ -0,0 +1,62 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queueset
import (
"testing"
)
func TestGetNextFinish(t *testing.T) {
var virtualStart float64 = 100
var G float64 = 60
tests := []struct {
name string
requests []*request
virtualFinishExpected float64
}{
{
name: "for the oldest request",
requests: []*request{
{width: 5},
{width: 6},
{width: 7},
},
virtualFinishExpected: virtualStart + (5 * G),
},
{
name: "queue does not have any request waiting",
requests: []*request{},
virtualFinishExpected: virtualStart,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
requests := newRequestFIFO()
for i := range test.requests {
requests.Enqueue(test.requests[i])
}
q := &queue{requests: requests}
q.virtualStart = virtualStart
virtualFinishGot := q.GetNextFinish(G)
if test.virtualFinishExpected != virtualFinishGot {
t.Errorf("Expected virtual finish time: %.9fs, but got: %.9fs", test.virtualFinishExpected, virtualFinishGot)
}
})
}
}