From 69f9bc181f155ded7c5d5cc0ca9f026a6b42f431 Mon Sep 17 00:00:00 2001 From: Abu Kashem Date: Mon, 29 Mar 2021 11:31:12 -0400 Subject: [PATCH] apf: use a list instead of slice for queueset --- .../fairqueuing/queueset/fifo_list.go | 102 ++++++++++ .../fairqueuing/queueset/fifo_list_test.go | 183 ++++++++++++++++++ .../fairqueuing/queueset/queueset.go | 75 +++---- .../flowcontrol/fairqueuing/queueset/types.go | 29 +-- 4 files changed, 341 insertions(+), 48 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go create mode 100644 staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go new file mode 100644 index 00000000000..78fb777f842 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go @@ -0,0 +1,102 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queueset + +import ( + "container/list" +) + +// removeFromFIFOFunc removes a designated element from the list. +// The complexity of the runtime cost is O(1) +// It returns the request removed from the list. +type removeFromFIFOFunc func() *request + +// walkFunc is called for each request in the list in the +// oldest -> newest order. +// ok: if walkFunc returns false then the iteration stops immediately. +type walkFunc func(*request) (ok bool) + +// Internal interface to abstract out the implementation details +// of the underlying list used to maintain the requests. +// +// Note that the FIFO list is not safe for concurrent use by multiple +// goroutines without additional locking or coordination. It rests with +// the user to ensure that the FIFO list is used with proper locking. +type fifo interface { + // Enqueue enqueues the specified request into the list and + // returns a removeFromFIFOFunc function that can be used to remove the + // request from the list + Enqueue(*request) removeFromFIFOFunc + + // Dequeue pulls out the oldest request from the list. + Dequeue() (*request, bool) + + // Length returns the number of requests in the list. + Length() int + + // Walk iterates through the list in order of oldest -> newest + // and executes the specified walkFunc for each request in that order. + // + // if the specified walkFunc returns false the Walk function + // stops the walk an returns immediately. + Walk(walkFunc) +} + +// the FIFO list implementation is not safe for concurrent use by multiple +// goroutines without additional locking or coordination. +type requestFIFO struct { + *list.List +} + +func newRequestFIFO() fifo { + return &requestFIFO{ + List: list.New(), + } +} + +func (l *requestFIFO) Length() int { + return l.Len() +} + +func (l *requestFIFO) Enqueue(req *request) removeFromFIFOFunc { + e := l.PushBack(req) + return func() *request { + l.Remove(e) + return req + } +} + +func (l *requestFIFO) Dequeue() (*request, bool) { + e := l.Front() + if e == nil { + return nil, false + } + defer l.Remove(e) + + request, ok := e.Value.(*request) + return request, ok +} + +func (l *requestFIFO) Walk(f walkFunc) { + for current := l.Front(); current != nil; current = current.Next() { + if r, ok := current.Value.(*request); ok { + if !f(r) { + return + } + } + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go new file mode 100644 index 00000000000..079f0d770ba --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go @@ -0,0 +1,183 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queueset + +import ( + "math/rand" + "testing" + "time" +) + +func TestFIFOWithEnqueueDequeueSingleRequest(t *testing.T) { + req := &request{} + + list := newRequestFIFO() + list.Enqueue(req) + + reqGot, ok := list.Dequeue() + if !ok { + t.Errorf("Expected true, but got: %t", ok) + } + if req != reqGot { + t.Errorf("Expected dequued request: (%p), but got: (%p)", req, reqGot) + } + if list.Length() != 0 { + t.Errorf("Expected length: %d, but got: %d)", 0, list.Length()) + } +} + +func TestFIFOWithEnqueueDequeueMultipleRequests(t *testing.T) { + arrival := []*request{{}, {}, {}, {}, {}, {}} + + list := newRequestFIFO() + for i := range arrival { + list.Enqueue(arrival[i]) + } + + dequeued := make([]*request, 0) + for list.Length() > 0 { + req, _ := list.Dequeue() + dequeued = append(dequeued, req) + } + + verifyOrder(t, arrival, dequeued) +} + +func TestFIFOWithEnqueueDequeueSomeRequestsRemainInQueue(t *testing.T) { + list := newRequestFIFO() + + arrival := []*request{{}, {}, {}, {}, {}, {}} + half := len(arrival) / 2 + for i := range arrival { + list.Enqueue(arrival[i]) + } + + dequeued := make([]*request, 0) + for i := 0; i < half; i++ { + req, _ := list.Dequeue() + dequeued = append(dequeued, req) + } + + verifyOrder(t, arrival[:half], dequeued) +} + +func TestFIFOWithRemoveMultipleRequestsInArrivalOrder(t *testing.T) { + list := newRequestFIFO() + + arrival := []*request{{}, {}, {}, {}, {}, {}} + removeFn := make([]removeFromFIFOFunc, 0) + for i := range arrival { + removeFn = append(removeFn, list.Enqueue(arrival[i])) + } + + dequeued := make([]*request, 0) + for _, f := range removeFn { + dequeued = append(dequeued, f()) + } + + if list.Length() != 0 { + t.Errorf("Expected length: %d, but got: %d)", 0, list.Length()) + } + + verifyOrder(t, arrival, dequeued) +} + +func TestFIFOWithRemoveMultipleRequestsInRandomOrder(t *testing.T) { + list := newRequestFIFO() + + arrival := []*request{{}, {}, {}, {}, {}, {}} + removeFn := make([]removeFromFIFOFunc, 0) + for i := range arrival { + removeFn = append(removeFn, list.Enqueue(arrival[i])) + } + + dequeued := make([]*request, 0) + r := rand.New(rand.NewSource(time.Now().UnixNano())) + randomIndices := r.Perm(len(removeFn)) + t.Logf("Random remove order: %v", randomIndices) + for i := range randomIndices { + dequeued = append(dequeued, removeFn[i]()) + } + + if list.Length() != 0 { + t.Errorf("Expected length: %d, but got: %d)", 0, list.Length()) + } + + verifyOrder(t, arrival, dequeued) +} + +func TestFIFOWithRemoveIsIdempotent(t *testing.T) { + list := newRequestFIFO() + + arrival := []*request{{}, {}, {}, {}} + removeFn := make([]removeFromFIFOFunc, 0) + for i := range arrival { + removeFn = append(removeFn, list.Enqueue(arrival[i])) + } + + // pick one request to be removed at random + r := rand.New(rand.NewSource(time.Now().UnixNano())) + randomIndex := r.Intn(len(removeFn)) + t.Logf("Random remove index: %d", randomIndex) + + // remove the request from the fifo twice, we expect it to be idempotent + removeFn[randomIndex]() + removeFn[randomIndex]() + + lengthExpected := len(arrival) - 1 + if lengthExpected != list.Length() { + t.Errorf("Expected length: %d, but got: %d)", lengthExpected, list.Length()) + } + + orderExpected := append(arrival[0:randomIndex], arrival[randomIndex+1:]...) + remainingRequests := walkAll(list) + verifyOrder(t, orderExpected, remainingRequests) +} + +func TestFIFOWithWalk(t *testing.T) { + list := newRequestFIFO() + + arrival := []*request{{}, {}, {}, {}, {}, {}} + for i := range arrival { + list.Enqueue(arrival[i]) + } + + visited := walkAll(list) + + verifyOrder(t, arrival, visited) +} + +func verifyOrder(t *testing.T, expected, actual []*request) { + if len(expected) != len(actual) { + t.Fatalf("Expected slice length: %d, but got: %d", len(expected), len(actual)) + } + for i := range expected { + if expected[i] != actual[i] { + t.Errorf("Dequeue order mismatch, expected request: (%p), but got: (%p)", expected[i], actual[i]) + } + } +} + +func walkAll(l fifo) []*request { + visited := make([]*request, 0) + l.Walk(func(req *request) bool { + visited = append(visited, req) + return true + }) + + return visited +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 8229ccbbb4b..d3d1a6b6245 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -165,7 +165,7 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet { func createQueues(n, baseIndex int) []*queue { fqqueues := make([]*queue, n) for i := 0; i < n; i++ { - fqqueues[i] = &queue{index: baseIndex + i, requests: make([]*request, 0)} + fqqueues[i] = &queue{index: baseIndex + i, requests: newRequestFIFO()} } return fqqueues } @@ -407,7 +407,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 { reqs := 0 for _, queue := range qs.queues { reqs += queue.requestsExecuting - if len(queue.requests) > 0 || queue.requestsExecuting > 0 { + if queue.requests.Length() > 0 || queue.requestsExecuting > 0 { activeQueues++ } } @@ -453,7 +453,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte if ok := qs.rejectOrEnqueueLocked(req); !ok { return nil } - metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, len(queue.requests)) + metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, queue.requests.Length()) return req } @@ -464,7 +464,7 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte bestQueueLen := int(math.MaxInt32) // the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`. qs.dealer.Deal(hashValue, func(queueIdx int) { - thisLen := len(qs.queues[queueIdx].requests) + thisLen := qs.queues[queueIdx].requests.Length() klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisLen) if thisLen < bestQueueLen { bestQueueIdx, bestQueueLen = queueIdx, thisLen @@ -477,7 +477,7 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte // removeTimedOutRequestsFromQueueLocked rejects old requests that have been enqueued // past the requestWaitLimit func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName string) { - timeoutIdx := -1 + timeoutCount := 0 now := qs.clock.Now() reqs := queue.requests // reqs are sorted oldest -> newest @@ -486,26 +486,31 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s // now - requestWaitLimit = waitLimit waitLimit := now.Add(-qs.qCfg.RequestWaitLimit) - for i, req := range reqs { + reqs.Walk(func(req *request) bool { if waitLimit.After(req.arrivalTime) { req.decision.SetLocked(decisionReject) - // get index for timed out requests - timeoutIdx = i + timeoutCount++ metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) req.NoteQueued(false) - } else { - break + + // we need to check if the next request has timed out. + return true } - } + + // since reqs are sorted oldest -> newest, we are done here. + return false + }) + // remove timed out requests from queue - if timeoutIdx != -1 { - // timeoutIdx + 1 to remove the last timeout req - removeIdx := timeoutIdx + 1 - // remove all the timeout requests - queue.requests = reqs[removeIdx:] + if timeoutCount > 0 { + // The number of requests we have timed out is timeoutCount, + // so, let's dequeue the exact number of requests for this queue. + for i := 0; i < timeoutCount; i++ { + queue.requests.Dequeue() + } // decrement the # of requestsEnqueued - qs.totRequestsWaiting -= removeIdx - qs.obsPair.RequestsWaiting.Add(float64(-removeIdx)) + qs.totRequestsWaiting -= timeoutCount + qs.obsPair.RequestsWaiting.Add(float64(-timeoutCount)) } } @@ -515,7 +520,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s // Otherwise enqueues and returns true. func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool { queue := request.queue - curQueueLength := len(queue.requests) + curQueueLength := queue.requests.Length() // rejects the newly arrived request if resource criteria not met if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit && curQueueLength >= qs.qCfg.QueueLengthLimit { @@ -530,7 +535,7 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool { func (qs *queueSet) enqueueLocked(request *request) { queue := request.queue now := qs.clock.Now() - if len(queue.requests) == 0 && queue.requestsExecuting == 0 { + if queue.requests.Length() == 0 && queue.requestsExecuting == 0 { // the queue’s virtual start time is set to the virtual time. queue.virtualStart = qs.virtualTime if klog.V(6).Enabled() { @@ -610,7 +615,9 @@ func (qs *queueSet) dispatchLocked() bool { qs.obsPair.RequestsWaiting.Add(-1) qs.obsPair.RequestsExecuting.Add(1) if klog.V(6).Enabled() { - klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting) + klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", + qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, + queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting) } // When a request is dequeued for service -> qs.virtualStart += G queue.virtualStart += qs.estimatedServiceTime @@ -629,19 +636,13 @@ func (qs *queueSet) cancelWait(req *request) { return } req.decision.SetLocked(decisionCancel) - queue := req.queue + // remove the request from the queue as it has timed out - for i := range queue.requests { - if req == queue.requests[i] { - // remove the request - queue.requests = append(queue.requests[:i], queue.requests[i+1:]...) - qs.totRequestsWaiting-- - metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) - req.NoteQueued(false) - qs.obsPair.RequestsWaiting.Add(-1) - break - } - } + req.removeFromQueueFn() + qs.totRequestsWaiting-- + metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) + req.NoteQueued(false) + qs.obsPair.RequestsWaiting.Add(-1) } // selectQueueLocked examines the queues in round robin order and @@ -655,7 +656,7 @@ func (qs *queueSet) selectQueueLocked() *queue { for range qs.queues { qs.robinIndex = (qs.robinIndex + 1) % nq queue := qs.queues[qs.robinIndex] - if len(queue.requests) != 0 { + if queue.requests.Length() != 0 { currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime) if currentVirtualFinish < minVirtualFinish { @@ -729,13 +730,15 @@ func (qs *queueSet) finishRequestLocked(r *request) { r.queue.requestsExecuting-- if klog.V(6).Enabled() { - klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, r.queue.virtualStart, S, len(r.queue.requests), r.queue.requestsExecuting) + klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", + qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, + r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting) } // If there are more queues than desired and this one has no // requests then remove it if len(qs.queues) > qs.qCfg.DesiredNumQueues && - len(r.queue.requests) == 0 && + r.queue.requests.Length() == 0 && r.queue.requestsExecuting == 0 { qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.queue.index) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index a720230600e..0345f9ed90a 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -61,12 +61,17 @@ type request struct { waitStarted bool queueNoteFn fq.QueueNoteFn + + // Removes this request from its queue. If the request is not put into a + // a queue it will be nil. + removeFromQueueFn removeFromFIFOFunc } // queue is an array of requests with additional metadata required for // the FQScheduler type queue struct { - requests []*request + // The requests are stored in a FIFO list. + requests fifo // virtualStart is the virtual time (virtual seconds since process // startup) when the oldest request in the queue (if there is any) @@ -77,19 +82,16 @@ type queue struct { index int } -// Enqueue enqueues a request into the queue +// Enqueue enqueues a request into the queue and +// sets the removeFromQueueFn of the request appropriately. func (q *queue) Enqueue(request *request) { - q.requests = append(q.requests, request) + request.removeFromQueueFn = q.requests.Enqueue(request) } // Dequeue dequeues a request from the queue func (q *queue) Dequeue() (*request, bool) { - if len(q.requests) == 0 { - return nil, false - } - request := q.requests[0] - q.requests = q.requests[1:] - return request, true + request, ok := q.requests.Dequeue() + return request, ok } // GetVirtualFinish returns the expected virtual finish time of the request at @@ -104,8 +106,9 @@ func (q *queue) GetVirtualFinish(J int, G float64) float64 { } func (q *queue) dump(includeDetails bool) debug.QueueDump { - digest := make([]debug.RequestDump, len(q.requests)) - for i, r := range q.requests { + digest := make([]debug.RequestDump, q.requests.Length()) + i := 0 + q.requests.Walk(func(r *request) bool { // dump requests. digest[i].MatchedFlowSchema = r.fsName digest[i].FlowDistinguisher = r.flowDistinguisher @@ -119,7 +122,9 @@ func (q *queue) dump(includeDetails bool) debug.QueueDump { digest[i].RequestInfo = *requestInfo } } - } + i++ + return true + }) return debug.QueueDump{ VirtualStart: q.virtualStart, Requests: digest,