mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
apf: use a list instead of slice for queueset
This commit is contained in:
parent
677210ba32
commit
69f9bc181f
@ -0,0 +1,102 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2021 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package queueset
|
||||||
|
|
||||||
|
import (
|
||||||
|
"container/list"
|
||||||
|
)
|
||||||
|
|
||||||
|
// removeFromFIFOFunc removes a designated element from the list.
|
||||||
|
// The complexity of the runtime cost is O(1)
|
||||||
|
// It returns the request removed from the list.
|
||||||
|
type removeFromFIFOFunc func() *request
|
||||||
|
|
||||||
|
// walkFunc is called for each request in the list in the
|
||||||
|
// oldest -> newest order.
|
||||||
|
// ok: if walkFunc returns false then the iteration stops immediately.
|
||||||
|
type walkFunc func(*request) (ok bool)
|
||||||
|
|
||||||
|
// Internal interface to abstract out the implementation details
|
||||||
|
// of the underlying list used to maintain the requests.
|
||||||
|
//
|
||||||
|
// Note that the FIFO list is not safe for concurrent use by multiple
|
||||||
|
// goroutines without additional locking or coordination. It rests with
|
||||||
|
// the user to ensure that the FIFO list is used with proper locking.
|
||||||
|
type fifo interface {
|
||||||
|
// Enqueue enqueues the specified request into the list and
|
||||||
|
// returns a removeFromFIFOFunc function that can be used to remove the
|
||||||
|
// request from the list
|
||||||
|
Enqueue(*request) removeFromFIFOFunc
|
||||||
|
|
||||||
|
// Dequeue pulls out the oldest request from the list.
|
||||||
|
Dequeue() (*request, bool)
|
||||||
|
|
||||||
|
// Length returns the number of requests in the list.
|
||||||
|
Length() int
|
||||||
|
|
||||||
|
// Walk iterates through the list in order of oldest -> newest
|
||||||
|
// and executes the specified walkFunc for each request in that order.
|
||||||
|
//
|
||||||
|
// if the specified walkFunc returns false the Walk function
|
||||||
|
// stops the walk an returns immediately.
|
||||||
|
Walk(walkFunc)
|
||||||
|
}
|
||||||
|
|
||||||
|
// the FIFO list implementation is not safe for concurrent use by multiple
|
||||||
|
// goroutines without additional locking or coordination.
|
||||||
|
type requestFIFO struct {
|
||||||
|
*list.List
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRequestFIFO() fifo {
|
||||||
|
return &requestFIFO{
|
||||||
|
List: list.New(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *requestFIFO) Length() int {
|
||||||
|
return l.Len()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *requestFIFO) Enqueue(req *request) removeFromFIFOFunc {
|
||||||
|
e := l.PushBack(req)
|
||||||
|
return func() *request {
|
||||||
|
l.Remove(e)
|
||||||
|
return req
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *requestFIFO) Dequeue() (*request, bool) {
|
||||||
|
e := l.Front()
|
||||||
|
if e == nil {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
defer l.Remove(e)
|
||||||
|
|
||||||
|
request, ok := e.Value.(*request)
|
||||||
|
return request, ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *requestFIFO) Walk(f walkFunc) {
|
||||||
|
for current := l.Front(); current != nil; current = current.Next() {
|
||||||
|
if r, ok := current.Value.(*request); ok {
|
||||||
|
if !f(r) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,183 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2021 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package queueset
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/rand"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestFIFOWithEnqueueDequeueSingleRequest(t *testing.T) {
|
||||||
|
req := &request{}
|
||||||
|
|
||||||
|
list := newRequestFIFO()
|
||||||
|
list.Enqueue(req)
|
||||||
|
|
||||||
|
reqGot, ok := list.Dequeue()
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("Expected true, but got: %t", ok)
|
||||||
|
}
|
||||||
|
if req != reqGot {
|
||||||
|
t.Errorf("Expected dequued request: (%p), but got: (%p)", req, reqGot)
|
||||||
|
}
|
||||||
|
if list.Length() != 0 {
|
||||||
|
t.Errorf("Expected length: %d, but got: %d)", 0, list.Length())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFIFOWithEnqueueDequeueMultipleRequests(t *testing.T) {
|
||||||
|
arrival := []*request{{}, {}, {}, {}, {}, {}}
|
||||||
|
|
||||||
|
list := newRequestFIFO()
|
||||||
|
for i := range arrival {
|
||||||
|
list.Enqueue(arrival[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
dequeued := make([]*request, 0)
|
||||||
|
for list.Length() > 0 {
|
||||||
|
req, _ := list.Dequeue()
|
||||||
|
dequeued = append(dequeued, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
verifyOrder(t, arrival, dequeued)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFIFOWithEnqueueDequeueSomeRequestsRemainInQueue(t *testing.T) {
|
||||||
|
list := newRequestFIFO()
|
||||||
|
|
||||||
|
arrival := []*request{{}, {}, {}, {}, {}, {}}
|
||||||
|
half := len(arrival) / 2
|
||||||
|
for i := range arrival {
|
||||||
|
list.Enqueue(arrival[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
dequeued := make([]*request, 0)
|
||||||
|
for i := 0; i < half; i++ {
|
||||||
|
req, _ := list.Dequeue()
|
||||||
|
dequeued = append(dequeued, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
verifyOrder(t, arrival[:half], dequeued)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFIFOWithRemoveMultipleRequestsInArrivalOrder(t *testing.T) {
|
||||||
|
list := newRequestFIFO()
|
||||||
|
|
||||||
|
arrival := []*request{{}, {}, {}, {}, {}, {}}
|
||||||
|
removeFn := make([]removeFromFIFOFunc, 0)
|
||||||
|
for i := range arrival {
|
||||||
|
removeFn = append(removeFn, list.Enqueue(arrival[i]))
|
||||||
|
}
|
||||||
|
|
||||||
|
dequeued := make([]*request, 0)
|
||||||
|
for _, f := range removeFn {
|
||||||
|
dequeued = append(dequeued, f())
|
||||||
|
}
|
||||||
|
|
||||||
|
if list.Length() != 0 {
|
||||||
|
t.Errorf("Expected length: %d, but got: %d)", 0, list.Length())
|
||||||
|
}
|
||||||
|
|
||||||
|
verifyOrder(t, arrival, dequeued)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFIFOWithRemoveMultipleRequestsInRandomOrder(t *testing.T) {
|
||||||
|
list := newRequestFIFO()
|
||||||
|
|
||||||
|
arrival := []*request{{}, {}, {}, {}, {}, {}}
|
||||||
|
removeFn := make([]removeFromFIFOFunc, 0)
|
||||||
|
for i := range arrival {
|
||||||
|
removeFn = append(removeFn, list.Enqueue(arrival[i]))
|
||||||
|
}
|
||||||
|
|
||||||
|
dequeued := make([]*request, 0)
|
||||||
|
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
randomIndices := r.Perm(len(removeFn))
|
||||||
|
t.Logf("Random remove order: %v", randomIndices)
|
||||||
|
for i := range randomIndices {
|
||||||
|
dequeued = append(dequeued, removeFn[i]())
|
||||||
|
}
|
||||||
|
|
||||||
|
if list.Length() != 0 {
|
||||||
|
t.Errorf("Expected length: %d, but got: %d)", 0, list.Length())
|
||||||
|
}
|
||||||
|
|
||||||
|
verifyOrder(t, arrival, dequeued)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFIFOWithRemoveIsIdempotent(t *testing.T) {
|
||||||
|
list := newRequestFIFO()
|
||||||
|
|
||||||
|
arrival := []*request{{}, {}, {}, {}}
|
||||||
|
removeFn := make([]removeFromFIFOFunc, 0)
|
||||||
|
for i := range arrival {
|
||||||
|
removeFn = append(removeFn, list.Enqueue(arrival[i]))
|
||||||
|
}
|
||||||
|
|
||||||
|
// pick one request to be removed at random
|
||||||
|
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
randomIndex := r.Intn(len(removeFn))
|
||||||
|
t.Logf("Random remove index: %d", randomIndex)
|
||||||
|
|
||||||
|
// remove the request from the fifo twice, we expect it to be idempotent
|
||||||
|
removeFn[randomIndex]()
|
||||||
|
removeFn[randomIndex]()
|
||||||
|
|
||||||
|
lengthExpected := len(arrival) - 1
|
||||||
|
if lengthExpected != list.Length() {
|
||||||
|
t.Errorf("Expected length: %d, but got: %d)", lengthExpected, list.Length())
|
||||||
|
}
|
||||||
|
|
||||||
|
orderExpected := append(arrival[0:randomIndex], arrival[randomIndex+1:]...)
|
||||||
|
remainingRequests := walkAll(list)
|
||||||
|
verifyOrder(t, orderExpected, remainingRequests)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFIFOWithWalk(t *testing.T) {
|
||||||
|
list := newRequestFIFO()
|
||||||
|
|
||||||
|
arrival := []*request{{}, {}, {}, {}, {}, {}}
|
||||||
|
for i := range arrival {
|
||||||
|
list.Enqueue(arrival[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
visited := walkAll(list)
|
||||||
|
|
||||||
|
verifyOrder(t, arrival, visited)
|
||||||
|
}
|
||||||
|
|
||||||
|
func verifyOrder(t *testing.T, expected, actual []*request) {
|
||||||
|
if len(expected) != len(actual) {
|
||||||
|
t.Fatalf("Expected slice length: %d, but got: %d", len(expected), len(actual))
|
||||||
|
}
|
||||||
|
for i := range expected {
|
||||||
|
if expected[i] != actual[i] {
|
||||||
|
t.Errorf("Dequeue order mismatch, expected request: (%p), but got: (%p)", expected[i], actual[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func walkAll(l fifo) []*request {
|
||||||
|
visited := make([]*request, 0)
|
||||||
|
l.Walk(func(req *request) bool {
|
||||||
|
visited = append(visited, req)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
return visited
|
||||||
|
}
|
@ -165,7 +165,7 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
|
|||||||
func createQueues(n, baseIndex int) []*queue {
|
func createQueues(n, baseIndex int) []*queue {
|
||||||
fqqueues := make([]*queue, n)
|
fqqueues := make([]*queue, n)
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
fqqueues[i] = &queue{index: baseIndex + i, requests: make([]*request, 0)}
|
fqqueues[i] = &queue{index: baseIndex + i, requests: newRequestFIFO()}
|
||||||
}
|
}
|
||||||
return fqqueues
|
return fqqueues
|
||||||
}
|
}
|
||||||
@ -407,7 +407,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
|
|||||||
reqs := 0
|
reqs := 0
|
||||||
for _, queue := range qs.queues {
|
for _, queue := range qs.queues {
|
||||||
reqs += queue.requestsExecuting
|
reqs += queue.requestsExecuting
|
||||||
if len(queue.requests) > 0 || queue.requestsExecuting > 0 {
|
if queue.requests.Length() > 0 || queue.requestsExecuting > 0 {
|
||||||
activeQueues++
|
activeQueues++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -453,7 +453,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
|
|||||||
if ok := qs.rejectOrEnqueueLocked(req); !ok {
|
if ok := qs.rejectOrEnqueueLocked(req); !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, len(queue.requests))
|
metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, queue.requests.Length())
|
||||||
return req
|
return req
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -464,7 +464,7 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte
|
|||||||
bestQueueLen := int(math.MaxInt32)
|
bestQueueLen := int(math.MaxInt32)
|
||||||
// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`.
|
// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`.
|
||||||
qs.dealer.Deal(hashValue, func(queueIdx int) {
|
qs.dealer.Deal(hashValue, func(queueIdx int) {
|
||||||
thisLen := len(qs.queues[queueIdx].requests)
|
thisLen := qs.queues[queueIdx].requests.Length()
|
||||||
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisLen)
|
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisLen)
|
||||||
if thisLen < bestQueueLen {
|
if thisLen < bestQueueLen {
|
||||||
bestQueueIdx, bestQueueLen = queueIdx, thisLen
|
bestQueueIdx, bestQueueLen = queueIdx, thisLen
|
||||||
@ -477,7 +477,7 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte
|
|||||||
// removeTimedOutRequestsFromQueueLocked rejects old requests that have been enqueued
|
// removeTimedOutRequestsFromQueueLocked rejects old requests that have been enqueued
|
||||||
// past the requestWaitLimit
|
// past the requestWaitLimit
|
||||||
func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName string) {
|
func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName string) {
|
||||||
timeoutIdx := -1
|
timeoutCount := 0
|
||||||
now := qs.clock.Now()
|
now := qs.clock.Now()
|
||||||
reqs := queue.requests
|
reqs := queue.requests
|
||||||
// reqs are sorted oldest -> newest
|
// reqs are sorted oldest -> newest
|
||||||
@ -486,26 +486,31 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
|
|||||||
|
|
||||||
// now - requestWaitLimit = waitLimit
|
// now - requestWaitLimit = waitLimit
|
||||||
waitLimit := now.Add(-qs.qCfg.RequestWaitLimit)
|
waitLimit := now.Add(-qs.qCfg.RequestWaitLimit)
|
||||||
for i, req := range reqs {
|
reqs.Walk(func(req *request) bool {
|
||||||
if waitLimit.After(req.arrivalTime) {
|
if waitLimit.After(req.arrivalTime) {
|
||||||
req.decision.SetLocked(decisionReject)
|
req.decision.SetLocked(decisionReject)
|
||||||
// get index for timed out requests
|
timeoutCount++
|
||||||
timeoutIdx = i
|
|
||||||
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
|
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
|
||||||
req.NoteQueued(false)
|
req.NoteQueued(false)
|
||||||
} else {
|
|
||||||
break
|
// we need to check if the next request has timed out.
|
||||||
}
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// since reqs are sorted oldest -> newest, we are done here.
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
|
||||||
// remove timed out requests from queue
|
// remove timed out requests from queue
|
||||||
if timeoutIdx != -1 {
|
if timeoutCount > 0 {
|
||||||
// timeoutIdx + 1 to remove the last timeout req
|
// The number of requests we have timed out is timeoutCount,
|
||||||
removeIdx := timeoutIdx + 1
|
// so, let's dequeue the exact number of requests for this queue.
|
||||||
// remove all the timeout requests
|
for i := 0; i < timeoutCount; i++ {
|
||||||
queue.requests = reqs[removeIdx:]
|
queue.requests.Dequeue()
|
||||||
|
}
|
||||||
// decrement the # of requestsEnqueued
|
// decrement the # of requestsEnqueued
|
||||||
qs.totRequestsWaiting -= removeIdx
|
qs.totRequestsWaiting -= timeoutCount
|
||||||
qs.obsPair.RequestsWaiting.Add(float64(-removeIdx))
|
qs.obsPair.RequestsWaiting.Add(float64(-timeoutCount))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -515,7 +520,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
|
|||||||
// Otherwise enqueues and returns true.
|
// Otherwise enqueues and returns true.
|
||||||
func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
|
func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
|
||||||
queue := request.queue
|
queue := request.queue
|
||||||
curQueueLength := len(queue.requests)
|
curQueueLength := queue.requests.Length()
|
||||||
// rejects the newly arrived request if resource criteria not met
|
// rejects the newly arrived request if resource criteria not met
|
||||||
if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit &&
|
if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit &&
|
||||||
curQueueLength >= qs.qCfg.QueueLengthLimit {
|
curQueueLength >= qs.qCfg.QueueLengthLimit {
|
||||||
@ -530,7 +535,7 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
|
|||||||
func (qs *queueSet) enqueueLocked(request *request) {
|
func (qs *queueSet) enqueueLocked(request *request) {
|
||||||
queue := request.queue
|
queue := request.queue
|
||||||
now := qs.clock.Now()
|
now := qs.clock.Now()
|
||||||
if len(queue.requests) == 0 && queue.requestsExecuting == 0 {
|
if queue.requests.Length() == 0 && queue.requestsExecuting == 0 {
|
||||||
// the queue’s virtual start time is set to the virtual time.
|
// the queue’s virtual start time is set to the virtual time.
|
||||||
queue.virtualStart = qs.virtualTime
|
queue.virtualStart = qs.virtualTime
|
||||||
if klog.V(6).Enabled() {
|
if klog.V(6).Enabled() {
|
||||||
@ -610,7 +615,9 @@ func (qs *queueSet) dispatchLocked() bool {
|
|||||||
qs.obsPair.RequestsWaiting.Add(-1)
|
qs.obsPair.RequestsWaiting.Add(-1)
|
||||||
qs.obsPair.RequestsExecuting.Add(1)
|
qs.obsPair.RequestsExecuting.Add(1)
|
||||||
if klog.V(6).Enabled() {
|
if klog.V(6).Enabled() {
|
||||||
klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting)
|
klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing",
|
||||||
|
qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2,
|
||||||
|
queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting)
|
||||||
}
|
}
|
||||||
// When a request is dequeued for service -> qs.virtualStart += G
|
// When a request is dequeued for service -> qs.virtualStart += G
|
||||||
queue.virtualStart += qs.estimatedServiceTime
|
queue.virtualStart += qs.estimatedServiceTime
|
||||||
@ -629,19 +636,13 @@ func (qs *queueSet) cancelWait(req *request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
req.decision.SetLocked(decisionCancel)
|
req.decision.SetLocked(decisionCancel)
|
||||||
queue := req.queue
|
|
||||||
// remove the request from the queue as it has timed out
|
// remove the request from the queue as it has timed out
|
||||||
for i := range queue.requests {
|
req.removeFromQueueFn()
|
||||||
if req == queue.requests[i] {
|
|
||||||
// remove the request
|
|
||||||
queue.requests = append(queue.requests[:i], queue.requests[i+1:]...)
|
|
||||||
qs.totRequestsWaiting--
|
qs.totRequestsWaiting--
|
||||||
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
|
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
|
||||||
req.NoteQueued(false)
|
req.NoteQueued(false)
|
||||||
qs.obsPair.RequestsWaiting.Add(-1)
|
qs.obsPair.RequestsWaiting.Add(-1)
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// selectQueueLocked examines the queues in round robin order and
|
// selectQueueLocked examines the queues in round robin order and
|
||||||
@ -655,7 +656,7 @@ func (qs *queueSet) selectQueueLocked() *queue {
|
|||||||
for range qs.queues {
|
for range qs.queues {
|
||||||
qs.robinIndex = (qs.robinIndex + 1) % nq
|
qs.robinIndex = (qs.robinIndex + 1) % nq
|
||||||
queue := qs.queues[qs.robinIndex]
|
queue := qs.queues[qs.robinIndex]
|
||||||
if len(queue.requests) != 0 {
|
if queue.requests.Length() != 0 {
|
||||||
|
|
||||||
currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime)
|
currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime)
|
||||||
if currentVirtualFinish < minVirtualFinish {
|
if currentVirtualFinish < minVirtualFinish {
|
||||||
@ -729,13 +730,15 @@ func (qs *queueSet) finishRequestLocked(r *request) {
|
|||||||
r.queue.requestsExecuting--
|
r.queue.requestsExecuting--
|
||||||
|
|
||||||
if klog.V(6).Enabled() {
|
if klog.V(6).Enabled() {
|
||||||
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, r.queue.virtualStart, S, len(r.queue.requests), r.queue.requestsExecuting)
|
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing",
|
||||||
|
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index,
|
||||||
|
r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If there are more queues than desired and this one has no
|
// If there are more queues than desired and this one has no
|
||||||
// requests then remove it
|
// requests then remove it
|
||||||
if len(qs.queues) > qs.qCfg.DesiredNumQueues &&
|
if len(qs.queues) > qs.qCfg.DesiredNumQueues &&
|
||||||
len(r.queue.requests) == 0 &&
|
r.queue.requests.Length() == 0 &&
|
||||||
r.queue.requestsExecuting == 0 {
|
r.queue.requestsExecuting == 0 {
|
||||||
qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.queue.index)
|
qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.queue.index)
|
||||||
|
|
||||||
|
@ -61,12 +61,17 @@ type request struct {
|
|||||||
waitStarted bool
|
waitStarted bool
|
||||||
|
|
||||||
queueNoteFn fq.QueueNoteFn
|
queueNoteFn fq.QueueNoteFn
|
||||||
|
|
||||||
|
// Removes this request from its queue. If the request is not put into a
|
||||||
|
// a queue it will be nil.
|
||||||
|
removeFromQueueFn removeFromFIFOFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// queue is an array of requests with additional metadata required for
|
// queue is an array of requests with additional metadata required for
|
||||||
// the FQScheduler
|
// the FQScheduler
|
||||||
type queue struct {
|
type queue struct {
|
||||||
requests []*request
|
// The requests are stored in a FIFO list.
|
||||||
|
requests fifo
|
||||||
|
|
||||||
// virtualStart is the virtual time (virtual seconds since process
|
// virtualStart is the virtual time (virtual seconds since process
|
||||||
// startup) when the oldest request in the queue (if there is any)
|
// startup) when the oldest request in the queue (if there is any)
|
||||||
@ -77,19 +82,16 @@ type queue struct {
|
|||||||
index int
|
index int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enqueue enqueues a request into the queue
|
// Enqueue enqueues a request into the queue and
|
||||||
|
// sets the removeFromQueueFn of the request appropriately.
|
||||||
func (q *queue) Enqueue(request *request) {
|
func (q *queue) Enqueue(request *request) {
|
||||||
q.requests = append(q.requests, request)
|
request.removeFromQueueFn = q.requests.Enqueue(request)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dequeue dequeues a request from the queue
|
// Dequeue dequeues a request from the queue
|
||||||
func (q *queue) Dequeue() (*request, bool) {
|
func (q *queue) Dequeue() (*request, bool) {
|
||||||
if len(q.requests) == 0 {
|
request, ok := q.requests.Dequeue()
|
||||||
return nil, false
|
return request, ok
|
||||||
}
|
|
||||||
request := q.requests[0]
|
|
||||||
q.requests = q.requests[1:]
|
|
||||||
return request, true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetVirtualFinish returns the expected virtual finish time of the request at
|
// GetVirtualFinish returns the expected virtual finish time of the request at
|
||||||
@ -104,8 +106,9 @@ func (q *queue) GetVirtualFinish(J int, G float64) float64 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (q *queue) dump(includeDetails bool) debug.QueueDump {
|
func (q *queue) dump(includeDetails bool) debug.QueueDump {
|
||||||
digest := make([]debug.RequestDump, len(q.requests))
|
digest := make([]debug.RequestDump, q.requests.Length())
|
||||||
for i, r := range q.requests {
|
i := 0
|
||||||
|
q.requests.Walk(func(r *request) bool {
|
||||||
// dump requests.
|
// dump requests.
|
||||||
digest[i].MatchedFlowSchema = r.fsName
|
digest[i].MatchedFlowSchema = r.fsName
|
||||||
digest[i].FlowDistinguisher = r.flowDistinguisher
|
digest[i].FlowDistinguisher = r.flowDistinguisher
|
||||||
@ -119,7 +122,9 @@ func (q *queue) dump(includeDetails bool) debug.QueueDump {
|
|||||||
digest[i].RequestInfo = *requestInfo
|
digest[i].RequestInfo = *requestInfo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
i++
|
||||||
|
return true
|
||||||
|
})
|
||||||
return debug.QueueDump{
|
return debug.QueueDump{
|
||||||
VirtualStart: q.virtualStart,
|
VirtualStart: q.virtualStart,
|
||||||
Requests: digest,
|
Requests: digest,
|
||||||
|
Loading…
Reference in New Issue
Block a user