Merge pull request #101484 from tkashem/apf-queueset-use-list

apf: use a list instead of slice for queueset
This commit is contained in:
Kubernetes Prow Robot 2021-04-30 04:25:58 -07:00 committed by GitHub
commit a108dc498b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 341 additions and 48 deletions

View File

@ -0,0 +1,102 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queueset
import (
"container/list"
)
// removeFromFIFOFunc removes a designated element from the list.
// The complexity of the runtime cost is O(1)
// It returns the request removed from the list.
type removeFromFIFOFunc func() *request
// walkFunc is called for each request in the list in the
// oldest -> newest order.
// ok: if walkFunc returns false then the iteration stops immediately.
type walkFunc func(*request) (ok bool)
// Internal interface to abstract out the implementation details
// of the underlying list used to maintain the requests.
//
// Note that the FIFO list is not safe for concurrent use by multiple
// goroutines without additional locking or coordination. It rests with
// the user to ensure that the FIFO list is used with proper locking.
type fifo interface {
// Enqueue enqueues the specified request into the list and
// returns a removeFromFIFOFunc function that can be used to remove the
// request from the list
Enqueue(*request) removeFromFIFOFunc
// Dequeue pulls out the oldest request from the list.
Dequeue() (*request, bool)
// Length returns the number of requests in the list.
Length() int
// Walk iterates through the list in order of oldest -> newest
// and executes the specified walkFunc for each request in that order.
//
// if the specified walkFunc returns false the Walk function
// stops the walk an returns immediately.
Walk(walkFunc)
}
// the FIFO list implementation is not safe for concurrent use by multiple
// goroutines without additional locking or coordination.
type requestFIFO struct {
*list.List
}
func newRequestFIFO() fifo {
return &requestFIFO{
List: list.New(),
}
}
func (l *requestFIFO) Length() int {
return l.Len()
}
func (l *requestFIFO) Enqueue(req *request) removeFromFIFOFunc {
e := l.PushBack(req)
return func() *request {
l.Remove(e)
return req
}
}
func (l *requestFIFO) Dequeue() (*request, bool) {
e := l.Front()
if e == nil {
return nil, false
}
defer l.Remove(e)
request, ok := e.Value.(*request)
return request, ok
}
func (l *requestFIFO) Walk(f walkFunc) {
for current := l.Front(); current != nil; current = current.Next() {
if r, ok := current.Value.(*request); ok {
if !f(r) {
return
}
}
}
}

View File

@ -0,0 +1,183 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queueset
import (
"math/rand"
"testing"
"time"
)
func TestFIFOWithEnqueueDequeueSingleRequest(t *testing.T) {
req := &request{}
list := newRequestFIFO()
list.Enqueue(req)
reqGot, ok := list.Dequeue()
if !ok {
t.Errorf("Expected true, but got: %t", ok)
}
if req != reqGot {
t.Errorf("Expected dequued request: (%p), but got: (%p)", req, reqGot)
}
if list.Length() != 0 {
t.Errorf("Expected length: %d, but got: %d)", 0, list.Length())
}
}
func TestFIFOWithEnqueueDequeueMultipleRequests(t *testing.T) {
arrival := []*request{{}, {}, {}, {}, {}, {}}
list := newRequestFIFO()
for i := range arrival {
list.Enqueue(arrival[i])
}
dequeued := make([]*request, 0)
for list.Length() > 0 {
req, _ := list.Dequeue()
dequeued = append(dequeued, req)
}
verifyOrder(t, arrival, dequeued)
}
func TestFIFOWithEnqueueDequeueSomeRequestsRemainInQueue(t *testing.T) {
list := newRequestFIFO()
arrival := []*request{{}, {}, {}, {}, {}, {}}
half := len(arrival) / 2
for i := range arrival {
list.Enqueue(arrival[i])
}
dequeued := make([]*request, 0)
for i := 0; i < half; i++ {
req, _ := list.Dequeue()
dequeued = append(dequeued, req)
}
verifyOrder(t, arrival[:half], dequeued)
}
func TestFIFOWithRemoveMultipleRequestsInArrivalOrder(t *testing.T) {
list := newRequestFIFO()
arrival := []*request{{}, {}, {}, {}, {}, {}}
removeFn := make([]removeFromFIFOFunc, 0)
for i := range arrival {
removeFn = append(removeFn, list.Enqueue(arrival[i]))
}
dequeued := make([]*request, 0)
for _, f := range removeFn {
dequeued = append(dequeued, f())
}
if list.Length() != 0 {
t.Errorf("Expected length: %d, but got: %d)", 0, list.Length())
}
verifyOrder(t, arrival, dequeued)
}
func TestFIFOWithRemoveMultipleRequestsInRandomOrder(t *testing.T) {
list := newRequestFIFO()
arrival := []*request{{}, {}, {}, {}, {}, {}}
removeFn := make([]removeFromFIFOFunc, 0)
for i := range arrival {
removeFn = append(removeFn, list.Enqueue(arrival[i]))
}
dequeued := make([]*request, 0)
r := rand.New(rand.NewSource(time.Now().UnixNano()))
randomIndices := r.Perm(len(removeFn))
t.Logf("Random remove order: %v", randomIndices)
for i := range randomIndices {
dequeued = append(dequeued, removeFn[i]())
}
if list.Length() != 0 {
t.Errorf("Expected length: %d, but got: %d)", 0, list.Length())
}
verifyOrder(t, arrival, dequeued)
}
func TestFIFOWithRemoveIsIdempotent(t *testing.T) {
list := newRequestFIFO()
arrival := []*request{{}, {}, {}, {}}
removeFn := make([]removeFromFIFOFunc, 0)
for i := range arrival {
removeFn = append(removeFn, list.Enqueue(arrival[i]))
}
// pick one request to be removed at random
r := rand.New(rand.NewSource(time.Now().UnixNano()))
randomIndex := r.Intn(len(removeFn))
t.Logf("Random remove index: %d", randomIndex)
// remove the request from the fifo twice, we expect it to be idempotent
removeFn[randomIndex]()
removeFn[randomIndex]()
lengthExpected := len(arrival) - 1
if lengthExpected != list.Length() {
t.Errorf("Expected length: %d, but got: %d)", lengthExpected, list.Length())
}
orderExpected := append(arrival[0:randomIndex], arrival[randomIndex+1:]...)
remainingRequests := walkAll(list)
verifyOrder(t, orderExpected, remainingRequests)
}
func TestFIFOWithWalk(t *testing.T) {
list := newRequestFIFO()
arrival := []*request{{}, {}, {}, {}, {}, {}}
for i := range arrival {
list.Enqueue(arrival[i])
}
visited := walkAll(list)
verifyOrder(t, arrival, visited)
}
func verifyOrder(t *testing.T, expected, actual []*request) {
if len(expected) != len(actual) {
t.Fatalf("Expected slice length: %d, but got: %d", len(expected), len(actual))
}
for i := range expected {
if expected[i] != actual[i] {
t.Errorf("Dequeue order mismatch, expected request: (%p), but got: (%p)", expected[i], actual[i])
}
}
}
func walkAll(l fifo) []*request {
visited := make([]*request, 0)
l.Walk(func(req *request) bool {
visited = append(visited, req)
return true
})
return visited
}

View File

@ -165,7 +165,7 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
func createQueues(n, baseIndex int) []*queue { func createQueues(n, baseIndex int) []*queue {
fqqueues := make([]*queue, n) fqqueues := make([]*queue, n)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
fqqueues[i] = &queue{index: baseIndex + i, requests: make([]*request, 0)} fqqueues[i] = &queue{index: baseIndex + i, requests: newRequestFIFO()}
} }
return fqqueues return fqqueues
} }
@ -407,7 +407,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
reqs := 0 reqs := 0
for _, queue := range qs.queues { for _, queue := range qs.queues {
reqs += queue.requestsExecuting reqs += queue.requestsExecuting
if len(queue.requests) > 0 || queue.requestsExecuting > 0 { if queue.requests.Length() > 0 || queue.requestsExecuting > 0 {
activeQueues++ activeQueues++
} }
} }
@ -453,7 +453,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
if ok := qs.rejectOrEnqueueLocked(req); !ok { if ok := qs.rejectOrEnqueueLocked(req); !ok {
return nil return nil
} }
metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, len(queue.requests)) metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, queue.requests.Length())
return req return req
} }
@ -464,7 +464,7 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte
bestQueueLen := int(math.MaxInt32) bestQueueLen := int(math.MaxInt32)
// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`. // the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`.
qs.dealer.Deal(hashValue, func(queueIdx int) { qs.dealer.Deal(hashValue, func(queueIdx int) {
thisLen := len(qs.queues[queueIdx].requests) thisLen := qs.queues[queueIdx].requests.Length()
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisLen) klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisLen)
if thisLen < bestQueueLen { if thisLen < bestQueueLen {
bestQueueIdx, bestQueueLen = queueIdx, thisLen bestQueueIdx, bestQueueLen = queueIdx, thisLen
@ -477,7 +477,7 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte
// removeTimedOutRequestsFromQueueLocked rejects old requests that have been enqueued // removeTimedOutRequestsFromQueueLocked rejects old requests that have been enqueued
// past the requestWaitLimit // past the requestWaitLimit
func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName string) { func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName string) {
timeoutIdx := -1 timeoutCount := 0
now := qs.clock.Now() now := qs.clock.Now()
reqs := queue.requests reqs := queue.requests
// reqs are sorted oldest -> newest // reqs are sorted oldest -> newest
@ -486,26 +486,31 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
// now - requestWaitLimit = waitLimit // now - requestWaitLimit = waitLimit
waitLimit := now.Add(-qs.qCfg.RequestWaitLimit) waitLimit := now.Add(-qs.qCfg.RequestWaitLimit)
for i, req := range reqs { reqs.Walk(func(req *request) bool {
if waitLimit.After(req.arrivalTime) { if waitLimit.After(req.arrivalTime) {
req.decision.SetLocked(decisionReject) req.decision.SetLocked(decisionReject)
// get index for timed out requests timeoutCount++
timeoutIdx = i
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
req.NoteQueued(false) req.NoteQueued(false)
} else {
break // we need to check if the next request has timed out.
return true
} }
}
// since reqs are sorted oldest -> newest, we are done here.
return false
})
// remove timed out requests from queue // remove timed out requests from queue
if timeoutIdx != -1 { if timeoutCount > 0 {
// timeoutIdx + 1 to remove the last timeout req // The number of requests we have timed out is timeoutCount,
removeIdx := timeoutIdx + 1 // so, let's dequeue the exact number of requests for this queue.
// remove all the timeout requests for i := 0; i < timeoutCount; i++ {
queue.requests = reqs[removeIdx:] queue.requests.Dequeue()
}
// decrement the # of requestsEnqueued // decrement the # of requestsEnqueued
qs.totRequestsWaiting -= removeIdx qs.totRequestsWaiting -= timeoutCount
qs.obsPair.RequestsWaiting.Add(float64(-removeIdx)) qs.obsPair.RequestsWaiting.Add(float64(-timeoutCount))
} }
} }
@ -515,7 +520,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
// Otherwise enqueues and returns true. // Otherwise enqueues and returns true.
func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool { func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
queue := request.queue queue := request.queue
curQueueLength := len(queue.requests) curQueueLength := queue.requests.Length()
// rejects the newly arrived request if resource criteria not met // rejects the newly arrived request if resource criteria not met
if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit && if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit &&
curQueueLength >= qs.qCfg.QueueLengthLimit { curQueueLength >= qs.qCfg.QueueLengthLimit {
@ -530,7 +535,7 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
func (qs *queueSet) enqueueLocked(request *request) { func (qs *queueSet) enqueueLocked(request *request) {
queue := request.queue queue := request.queue
now := qs.clock.Now() now := qs.clock.Now()
if len(queue.requests) == 0 && queue.requestsExecuting == 0 { if queue.requests.Length() == 0 && queue.requestsExecuting == 0 {
// the queues virtual start time is set to the virtual time. // the queues virtual start time is set to the virtual time.
queue.virtualStart = qs.virtualTime queue.virtualStart = qs.virtualTime
if klog.V(6).Enabled() { if klog.V(6).Enabled() {
@ -610,7 +615,9 @@ func (qs *queueSet) dispatchLocked() bool {
qs.obsPair.RequestsWaiting.Add(-1) qs.obsPair.RequestsWaiting.Add(-1)
qs.obsPair.RequestsExecuting.Add(1) qs.obsPair.RequestsExecuting.Add(1)
if klog.V(6).Enabled() { if klog.V(6).Enabled() {
klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting) klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing",
qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2,
queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting)
} }
// When a request is dequeued for service -> qs.virtualStart += G // When a request is dequeued for service -> qs.virtualStart += G
queue.virtualStart += qs.estimatedServiceTime queue.virtualStart += qs.estimatedServiceTime
@ -629,19 +636,13 @@ func (qs *queueSet) cancelWait(req *request) {
return return
} }
req.decision.SetLocked(decisionCancel) req.decision.SetLocked(decisionCancel)
queue := req.queue
// remove the request from the queue as it has timed out // remove the request from the queue as it has timed out
for i := range queue.requests { req.removeFromQueueFn()
if req == queue.requests[i] { qs.totRequestsWaiting--
// remove the request metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
queue.requests = append(queue.requests[:i], queue.requests[i+1:]...) req.NoteQueued(false)
qs.totRequestsWaiting-- qs.obsPair.RequestsWaiting.Add(-1)
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
req.NoteQueued(false)
qs.obsPair.RequestsWaiting.Add(-1)
break
}
}
} }
// selectQueueLocked examines the queues in round robin order and // selectQueueLocked examines the queues in round robin order and
@ -655,7 +656,7 @@ func (qs *queueSet) selectQueueLocked() *queue {
for range qs.queues { for range qs.queues {
qs.robinIndex = (qs.robinIndex + 1) % nq qs.robinIndex = (qs.robinIndex + 1) % nq
queue := qs.queues[qs.robinIndex] queue := qs.queues[qs.robinIndex]
if len(queue.requests) != 0 { if queue.requests.Length() != 0 {
currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime) currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime)
if currentVirtualFinish < minVirtualFinish { if currentVirtualFinish < minVirtualFinish {
@ -729,13 +730,15 @@ func (qs *queueSet) finishRequestLocked(r *request) {
r.queue.requestsExecuting-- r.queue.requestsExecuting--
if klog.V(6).Enabled() { if klog.V(6).Enabled() {
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, r.queue.virtualStart, S, len(r.queue.requests), r.queue.requestsExecuting) klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index,
r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting)
} }
// If there are more queues than desired and this one has no // If there are more queues than desired and this one has no
// requests then remove it // requests then remove it
if len(qs.queues) > qs.qCfg.DesiredNumQueues && if len(qs.queues) > qs.qCfg.DesiredNumQueues &&
len(r.queue.requests) == 0 && r.queue.requests.Length() == 0 &&
r.queue.requestsExecuting == 0 { r.queue.requestsExecuting == 0 {
qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.queue.index) qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.queue.index)

View File

@ -61,12 +61,17 @@ type request struct {
waitStarted bool waitStarted bool
queueNoteFn fq.QueueNoteFn queueNoteFn fq.QueueNoteFn
// Removes this request from its queue. If the request is not put into a
// a queue it will be nil.
removeFromQueueFn removeFromFIFOFunc
} }
// queue is an array of requests with additional metadata required for // queue is an array of requests with additional metadata required for
// the FQScheduler // the FQScheduler
type queue struct { type queue struct {
requests []*request // The requests are stored in a FIFO list.
requests fifo
// virtualStart is the virtual time (virtual seconds since process // virtualStart is the virtual time (virtual seconds since process
// startup) when the oldest request in the queue (if there is any) // startup) when the oldest request in the queue (if there is any)
@ -77,19 +82,16 @@ type queue struct {
index int index int
} }
// Enqueue enqueues a request into the queue // Enqueue enqueues a request into the queue and
// sets the removeFromQueueFn of the request appropriately.
func (q *queue) Enqueue(request *request) { func (q *queue) Enqueue(request *request) {
q.requests = append(q.requests, request) request.removeFromQueueFn = q.requests.Enqueue(request)
} }
// Dequeue dequeues a request from the queue // Dequeue dequeues a request from the queue
func (q *queue) Dequeue() (*request, bool) { func (q *queue) Dequeue() (*request, bool) {
if len(q.requests) == 0 { request, ok := q.requests.Dequeue()
return nil, false return request, ok
}
request := q.requests[0]
q.requests = q.requests[1:]
return request, true
} }
// GetVirtualFinish returns the expected virtual finish time of the request at // GetVirtualFinish returns the expected virtual finish time of the request at
@ -104,8 +106,9 @@ func (q *queue) GetVirtualFinish(J int, G float64) float64 {
} }
func (q *queue) dump(includeDetails bool) debug.QueueDump { func (q *queue) dump(includeDetails bool) debug.QueueDump {
digest := make([]debug.RequestDump, len(q.requests)) digest := make([]debug.RequestDump, q.requests.Length())
for i, r := range q.requests { i := 0
q.requests.Walk(func(r *request) bool {
// dump requests. // dump requests.
digest[i].MatchedFlowSchema = r.fsName digest[i].MatchedFlowSchema = r.fsName
digest[i].FlowDistinguisher = r.flowDistinguisher digest[i].FlowDistinguisher = r.flowDistinguisher
@ -119,7 +122,9 @@ func (q *queue) dump(includeDetails bool) debug.QueueDump {
digest[i].RequestInfo = *requestInfo digest[i].RequestInfo = *requestInfo
} }
} }
} i++
return true
})
return debug.QueueDump{ return debug.QueueDump{
VirtualStart: q.virtualStart, VirtualStart: q.virtualStart,
Requests: digest, Requests: digest,