From 2f7456076e0cb29a95d86cd0f54c34a04b4722ab Mon Sep 17 00:00:00 2001 From: Abu Kashem Date: Mon, 14 Jun 2021 09:52:00 -0400 Subject: [PATCH] apf: always include seats in virtual time --- .../fairqueuing/queueset/queueset.go | 7 +-- .../flowcontrol/fairqueuing/queueset/types.go | 28 ++++++--- .../fairqueuing/queueset/types_test.go | 62 +++++++++++++++++++ 3 files changed, 85 insertions(+), 12 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index cf975f304bf..d28d984469a 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -674,8 +674,7 @@ func (qs *queueSet) selectQueueLocked() *queue { qs.robinIndex = (qs.robinIndex + 1) % nq queue := qs.queues[qs.robinIndex] if queue.requests.Length() != 0 { - - currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime) + currentVirtualFinish := queue.GetNextFinish(qs.estimatedServiceTime) if currentVirtualFinish < minVirtualFinish { minVirtualFinish = currentVirtualFinish minQueue = queue @@ -742,8 +741,8 @@ func (qs *queueSet) finishRequestLocked(r *request) { S := now.Sub(r.startTime).Seconds() // When a request finishes being served, and the actual service time was S, - // the queue’s virtual start time is decremented by G - S. - r.queue.virtualStart -= (qs.estimatedServiceTime * float64(r.Seats())) - S + // the queue’s virtual start time is decremented by (G - S)*width. + r.queue.virtualStart -= (qs.estimatedServiceTime - S) * float64(r.Seats()) // request has finished, remove from requests executing r.queue.requestsExecuting-- diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index d074b0bfdc7..c7cea0a004f 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -101,15 +101,27 @@ func (q *queue) Dequeue() (*request, bool) { return request, ok } -// GetVirtualFinish returns the expected virtual finish time of the request at -// index J in the queue with estimated finish time G -func (q *queue) GetVirtualFinish(J int, G float64) float64 { - // The virtual finish time of request number J in the queue - // (counting from J=1 for the head) is J * G + (virtual start time). +// GetNextFinish returns the expected virtual finish time of the +// oldest request in the queue with estimated duration G +func (q *queue) GetNextFinish(G float64) float64 { + // TODO: if we decide to generalize this function to return virtual finish time + // for the Nth oldest request waiting in the queue, we need to carefully + // evaluate and potentially improve the performance here. + var oldestReq *request + q.requests.Walk(func(r *request) bool { + oldestReq = r + return false + }) + if oldestReq == nil { + // we should never be here, since the caller should ensure + // that this queue has request(s) waiting to be served before + // calling this function. + return q.virtualStart + } - // counting from J=1 for the head (eg: queue.requests[0] -> J=1) - J+1 - jg := float64(J+1) * float64(G) - return jg + q.virtualStart + // the estimated service time of the oldest request is (G * request width) + estimatedServiceTime := float64(G) * float64(oldestReq.Seats()) + return q.virtualStart + estimatedServiceTime } func (q *queue) dump(includeDetails bool) debug.QueueDump { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types_test.go new file mode 100644 index 00000000000..ffcf7ca2e15 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types_test.go @@ -0,0 +1,62 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queueset + +import ( + "testing" +) + +func TestGetNextFinish(t *testing.T) { + var virtualStart float64 = 100 + var G float64 = 60 + tests := []struct { + name string + requests []*request + virtualFinishExpected float64 + }{ + { + name: "for the oldest request", + requests: []*request{ + {width: 5}, + {width: 6}, + {width: 7}, + }, + virtualFinishExpected: virtualStart + (5 * G), + }, + { + name: "queue does not have any request waiting", + requests: []*request{}, + virtualFinishExpected: virtualStart, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + requests := newRequestFIFO() + for i := range test.requests { + requests.Enqueue(test.requests[i]) + } + + q := &queue{requests: requests} + q.virtualStart = virtualStart + virtualFinishGot := q.GetNextFinish(G) + if test.virtualFinishExpected != virtualFinishGot { + t.Errorf("Expected virtual finish time: %.9fs, but got: %.9fs", test.virtualFinishExpected, virtualFinishGot) + } + }) + } +}