Add tracking and reporting of executing requests

Signed-off-by: Mike Spreitzer <mspreitz@us.ibm.com>
Mike Spreitzer 2023-06-30 22:55:35 -04:00
parent 2a91bd1dfd
commit a8a2fb317c
5 changed files with 219 additions and 118 deletions

View File

@@ -29,6 +29,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apiserver/pkg/server/mux"
+	"k8s.io/apiserver/pkg/util/flowcontrol/debug"
 )
 
 const (
@@ -154,25 +155,25 @@ func (cfgCtlr *configController) dumpRequests(w http.ResponseWriter, r *http.Req
 		"InitialSeats",      // 7
 		"FinalSeats",        // 8
 		"AdditionalLatency", // 9
+		"StartTime",         // 10
 	}))
 	if includeRequestDetails {
 		continueLine(tabWriter)
 		tabPrint(tabWriter, rowForHeaders([]string{
-			"UserName",    // 10
-			"Verb",        // 11
-			"APIPath",     // 12
-			"Namespace",   // 13
-			"Name",        // 14
-			"APIVersion",  // 15
-			"Resource",    // 16
-			"SubResource", // 17
+			"UserName",    // 11
+			"Verb",        // 12
+			"APIPath",     // 13
+			"Namespace",   // 14
+			"Name",        // 15
+			"APIVersion",  // 16
+			"Resource",    // 17
+			"SubResource", // 18
 		}))
 	}
 	endLine(tabWriter)
 	for _, plState := range cfgCtlr.priorityLevelStates {
 		queueSetDigest := plState.queues.Dump(includeRequestDetails)
-		for iq, q := range queueSetDigest.Queues {
-			for ir, r := range q.Requests {
+		dumpRequest := func(iq, ir int, r debug.RequestDump) {
 			tabPrint(tabWriter, row(
 				plState.pl.Name,     // 1
 				r.MatchedFlowSchema, // 2
@@ -183,25 +184,36 @@ func (cfgCtlr *configController) dumpRequests(w http.ResponseWriter, r *http.Req
 				strconv.Itoa(int(r.WorkEstimate.InitialSeats)), // 7
 				strconv.Itoa(int(r.WorkEstimate.FinalSeats)),   // 8
 				r.WorkEstimate.AdditionalLatency.String(),      // 9
+				r.StartTime.UTC().Format(time.RFC3339Nano),     // 10
 			))
 			if includeRequestDetails {
 				continueLine(tabWriter)
 				tabPrint(tabWriter, rowForRequestDetails(
-					r.UserName,              // 10
-					r.RequestInfo.Verb,      // 11
-					r.RequestInfo.Path,      // 12
-					r.RequestInfo.Namespace, // 13
-					r.RequestInfo.Name,      // 14
+					r.UserName,              // 11
+					r.RequestInfo.Verb,      // 12
+					r.RequestInfo.Path,      // 13
+					r.RequestInfo.Namespace, // 14
+					r.RequestInfo.Name,      // 15
 					schema.GroupVersion{
 						Group:   r.RequestInfo.APIGroup,
 						Version: r.RequestInfo.APIVersion,
-					}.String(), // 15
-					r.RequestInfo.Resource,    // 16
-					r.RequestInfo.Subresource, // 17
+					}.String(), // 16
+					r.RequestInfo.Resource,    // 17
+					r.RequestInfo.Subresource, // 18
 				))
 			}
 			endLine(tabWriter)
 		}
-	}
+		for iq, q := range queueSetDigest.Queues {
+			for ir, r := range q.Requests {
+				dumpRequest(iq, ir, r)
+			}
+			for _, r := range q.RequestsExecuting {
+				dumpRequest(iq, -1, r)
+			}
+		}
+		for _, r := range queueSetDigest.QueuelessExecutingRequests {
+			dumpRequest(-1, -1, r)
+		}
 	}
 	runtime.HandleError(tabWriter.Flush())
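
The heart of this file's change is a refactor: the row-printing body that used to live inside the nested queue/request loops becomes a dumpRequest closure, invoked for three populations (requests waiting in a queue, requests executing from a queue, and requests executing while there are no queues), with -1 standing in for an index that does not apply. Here is a minimal standalone sketch of that pattern, with toy data and names rather than the Kubernetes types:

package main

import "fmt"

func main() {
	waiting := [][]string{{"w0"}, {"w1a", "w1b"}} // per-queue waiting requests
	executing := [][]string{{"e0"}, {}}           // per-queue executing requests
	queueless := []string{"q0"}                   // executing with no queues configured

	// One closure, three call sites; -1 means "no queue" or "not waiting".
	dump := func(iq, ir int, name string) {
		fmt.Printf("queue=%2d index=%2d request=%s\n", iq, ir, name)
	}

	for iq, reqs := range waiting {
		for ir, r := range reqs {
			dump(iq, ir, r)
		}
		for _, r := range executing[iq] {
			dump(iq, -1, r)
		}
	}
	for _, r := range queueless {
		dump(-1, -1, r)
	}
}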

View File

@@ -26,6 +26,7 @@ import (
 // QueueSetDump is an instant dump of queue-set.
 type QueueSetDump struct {
 	Queues                     []QueueDump
+	QueuelessExecutingRequests []RequestDump
 	Waiting                    int
 	Executing                  int
 	SeatsInUse                 int
@@ -39,7 +40,8 @@ type QueueSetDump struct {
 // QueueDump is an instant dump of one queue in a queue-set.
 type QueueDump struct {
 	QueueSum          QueueSum
-	Requests          []RequestDump
+	Requests          []RequestDump // just the waiting ones
+	RequestsExecuting []RequestDump
 	NextDispatchR     string
 	ExecutingRequests int
 	SeatsInUse        int
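
With these two structs amended, a dump separates three request populations: waiting per queue (Requests), executing per queue (RequestsExecuting), and executing while the queue-set has no queues (QueuelessExecutingRequests). The following consumer sketch assumes only the debug types as patched above; countRequests is a hypothetical helper, not part of the package:

package main

import (
	"fmt"

	"k8s.io/apiserver/pkg/util/flowcontrol/debug"
)

// countRequests tallies the three request populations that a
// post-patch QueueSetDump can report.
func countRequests(d debug.QueueSetDump) (waiting, executing, queueless int) {
	for _, q := range d.Queues {
		waiting += len(q.Requests)            // waiting only, per the new comment
		executing += len(q.RequestsExecuting) // executing, reported per queue
	}
	queueless = len(d.QueuelessExecutingRequests) // executing with no queues configured
	return
}

func main() {
	var d debug.QueueSetDump // in practice obtained from QueueSet.Dump(includeRequestDetails)
	w, e, q := countRequests(d)
	fmt.Println(w, e, q)
}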

View File

@@ -24,6 +24,7 @@ import (
 	"sync"
 	"time"
 
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apiserver/pkg/util/flowcontrol/debug"
 	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
 	"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock"
@@ -138,6 +139,10 @@ type queueSet struct {
 	// from that queue.
 	totRequestsExecuting int
 
+	// requestsExecutingSet is the set of requests executing in the real world IF
+	// there are no queues; otherwise the requests are tracked in the queues.
+	requestsExecutingSet sets.Set[*request]
+
 	// totSeatsInUse is the number of total "seats" in use by all the
 	// request(s) that are currently executing in this queueset.
 	totSeatsInUse int
@@ -219,6 +224,7 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
 			qCfg:                 qsc.qCfg,
 			currentR:             0,
 			lastRealTime:         qsc.factory.clock.Now(),
+			requestsExecutingSet: sets.New[*request](),
 		}
 		qs.promiseFactory = qsc.factory.promiseFactoryFactory(qs)
 	}
@@ -230,7 +236,7 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
 func createQueues(n, baseIndex int) []*queue {
 	fqqueues := make([]*queue, n)
 	for i := 0; i < n; i++ {
-		fqqueues[i] = &queue{index: baseIndex + i, requests: newRequestFIFO()}
+		fqqueues[i] = &queue{index: baseIndex + i, requestsWaiting: newRequestFIFO(), requestsExecuting: sets.New[*request]()}
 	}
 	return fqqueues
 }
@@ -504,7 +510,7 @@ func (qs *queueSet) advanceEpoch(ctx context.Context, now time.Time, incrR fqreq
 	klog.InfoS("Advancing epoch", "QS", qs.qCfg.Name, "when", now.Format(nsTimeFmt), "oldR", oldR, "newR", qs.currentR, "incrR", incrR)
 	success := true
 	for qIdx, queue := range qs.queues {
-		if queue.requests.Length() == 0 && queue.requestsExecuting == 0 {
+		if queue.requestsWaiting.Length() == 0 && queue.requestsExecuting.Len() == 0 {
 			// Do not just decrement, the value could be quite outdated.
 			// It is safe to reset to zero in this case, because the next request
 			// will overwrite the zero with `qs.currentR`.
@@ -517,7 +523,7 @@ func (qs *queueSet) advanceEpoch(ctx context.Context, now time.Time, incrR fqreq
 			klog.ErrorS(errors.New("queue::nextDispatchR underflow"), "Underflow", "QS", qs.qCfg.Name, "queue", qIdx, "oldNextDispatchR", oldNextDispatchR, "newNextDispatchR", queue.nextDispatchR, "incrR", incrR)
 			success = false
 		}
-		queue.requests.Walk(func(req *request) bool {
+		queue.requestsWaiting.Walk(func(req *request) bool {
 			oldArrivalR := req.arrivalR
 			req.arrivalR -= rDecrement
 			if req.arrivalR > oldArrivalR {
@@ -538,8 +544,8 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
 	for _, queue := range qs.queues {
 		// here we want the sum of the maximum width of the requests in this queue since our
 		// goal is to find the maximum rate at which the queue could work.
-		seatsRequested += (queue.seatsInUse + queue.requests.QueueSum().MaxSeatsSum)
-		if queue.requests.Length() > 0 || queue.requestsExecuting > 0 {
+		seatsRequested += (queue.seatsInUse + queue.requestsWaiting.QueueSum().MaxSeatsSum)
+		if queue.requestsWaiting.Length() > 0 || queue.requestsExecuting.Len() > 0 {
 			activeQueues++
 		}
 	}
@@ -589,7 +595,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
 	if ok := qs.rejectOrEnqueueToBoundLocked(req); !ok {
 		return nil
 	}
-	metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, queue.requests.Length())
+	metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, queue.requestsWaiting.Length())
 	return req
 }
@@ -608,7 +614,7 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac
 	for i := 0; i < handSize; i++ {
 		queueIdx := hand[(offset+i)%handSize]
 		queue := qs.queues[queueIdx]
-		queueSum := queue.requests.QueueSum()
+		queueSum := queue.requestsWaiting.QueueSum()
 
 		// this is the total amount of work in seat-seconds for requests
 		// waiting in this queue, we will select the queue with the minimum.
@@ -621,7 +627,7 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac
 	}
 	if klogV := klog.V(6); klogV.Enabled() {
 		chosenQueue := qs.queues[bestQueueIdx]
-		klogV.Infof("QS(%s) at t=%s R=%v: For request %#+v %#+v chose queue %d, with sum: %#v & %d seats in use & nextDispatchR=%v", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.currentR, descr1, descr2, bestQueueIdx, chosenQueue.requests.QueueSum(), chosenQueue.seatsInUse, chosenQueue.nextDispatchR)
+		klogV.Infof("QS(%s) at t=%s R=%v: For request %#+v %#+v chose queue %d, with sum: %#v & %d seats in use & nextDispatchR=%v", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.currentR, descr1, descr2, bestQueueIdx, chosenQueue.requestsWaiting.QueueSum(), chosenQueue.seatsInUse, chosenQueue.nextDispatchR)
 	}
 	return bestQueueIdx
 }
@@ -632,7 +638,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, f
 	timeoutCount := 0
 	disqueueSeats := 0
 	now := qs.clock.Now()
-	reqs := queue.requests
+	reqs := queue.requestsWaiting
 	// reqs are sorted oldest -> newest
 	// can short circuit loop (break) if oldest requests are not timing out
 	// as newer requests also will not have timed out
@@ -669,7 +675,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, f
 // Otherwise enqueues and returns true.
 func (qs *queueSet) rejectOrEnqueueToBoundLocked(request *request) bool {
 	queue := request.queue
-	curQueueLength := queue.requests.Length()
+	curQueueLength := queue.requestsWaiting.Length()
 	// rejects the newly arrived request if resource criteria not met
 	if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit &&
 		curQueueLength >= qs.qCfg.QueueLengthLimit {
@@ -684,7 +690,7 @@ func (qs *queueSet) rejectOrEnqueueToBoundLocked(request *request) bool {
 func (qs *queueSet) enqueueToBoundLocked(request *request) {
 	queue := request.queue
 	now := qs.clock.Now()
-	if queue.requests.Length() == 0 && queue.requestsExecuting == 0 {
+	if queue.requestsWaiting.Length() == 0 && queue.requestsExecuting.Len() == 0 {
 		// the queue's start R is set to the virtual time.
 		queue.nextDispatchR = qs.currentR
 		klogV := klog.V(6)
@@ -692,7 +698,7 @@ func (qs *queueSet) enqueueToBoundLocked(request *request) {
 			klogV.Infof("QS(%s) at t=%s R=%v: initialized queue %d start R due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.nextDispatchR, queue.index, request.descr1, request.descr2)
 		}
 	}
-	request.removeFromQueueLocked = queue.requests.Enqueue(request)
+	request.removeFromQueueLocked = queue.requestsWaiting.Enqueue(request)
 	qs.totRequestsWaiting++
 	qs.totSeatsWaiting += request.MaxSeats()
 	metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1)
@@ -725,6 +731,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
 	}
 	qs.totRequestsExecuting++
 	qs.totSeatsInUse += req.MaxSeats()
+	qs.requestsExecutingSet = qs.requestsExecutingSet.Insert(req)
 	metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1)
 	metrics.AddSeatConcurrencyInUse(qs.qCfg.Name, fsName, req.MaxSeats())
 	qs.reqsGaugePair.RequestsExecuting.Add(1)
@@ -768,7 +775,7 @@ func (qs *queueSet) dispatchLocked() bool {
 	// problem because other overhead is also included.
 	qs.totRequestsExecuting++
 	qs.totSeatsInUse += request.MaxSeats()
-	queue.requestsExecuting++
+	queue.requestsExecuting = queue.requestsExecuting.Insert(request)
 	queue.seatsInUse += request.MaxSeats()
 	metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1)
 	metrics.AddSeatConcurrencyInUse(qs.qCfg.Name, request.fsName, request.MaxSeats())
@@ -779,7 +786,7 @@ func (qs *queueSet) dispatchLocked() bool {
 	if klogV.Enabled() {
 		klogV.Infof("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v work %v from queue %d with start R %v, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied",
 			qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.currentR, request.descr1, request.descr2,
-			request.workEstimate, queue.index, queue.nextDispatchR, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse)
+			request.workEstimate, queue.index, queue.nextDispatchR, queue.requestsWaiting.Length(), queue.requestsExecuting.Len(), queue.seatsInUse, qs.totSeatsInUse)
 	}
 	// When a request is dequeued for service -> qs.virtualStart += G * width
 	if request.totalWork() > rDecrement/100 { // A single increment should never be so big
@@ -834,7 +841,7 @@ func (qs *queueSet) findDispatchQueueToBoundLocked() (*queue, *request) {
 	for range qs.queues {
 		qs.robinIndex = (qs.robinIndex + 1) % nq
 		queue := qs.queues[qs.robinIndex]
-		oldestWaiting, _ := queue.requests.Peek()
+		oldestWaiting, _ := queue.requestsWaiting.Peek()
 		if oldestWaiting != nil {
 			sMin = ssMin(sMin, queue.nextDispatchR)
 			sMax = ssMax(sMax, queue.nextDispatchR)
@@ -851,7 +858,7 @@ func (qs *queueSet) findDispatchQueueToBoundLocked() (*queue, *request) {
 		}
 	}
 
-	oldestReqFromMinQueue, _ := minQueue.requests.Peek()
+	oldestReqFromMinQueue, _ := minQueue.requestsWaiting.Peek()
 	if oldestReqFromMinQueue == nil {
 		// This cannot happen
 		klog.ErrorS(errors.New("selected queue is empty"), "Impossible", "queueSet", qs.qCfg.Name)
@@ -955,7 +962,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
 	} else if r.queue != nil {
 		klogV.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished all use of %d seats, adjusted queue %d start R to %v due to service time %.9fs, queue will have %d requests with %#v waiting & %d requests occupying %d seats",
 			qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.MaxSeats(), r.queue.index,
-			r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requests.Length(), r.queue.requests.QueueSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
+			r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requestsWaiting.Length(), r.queue.requestsWaiting.QueueSum(), r.queue.requestsExecuting.Len(), r.queue.seatsInUse)
 	} else {
 		klogV.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished all use of %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse)
 	}
@@ -967,7 +974,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
 	} else if r.queue != nil {
 		klogV.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished main use of %d seats but lingering on %d seats for %v seconds, adjusted queue %d start R to %v due to service time %.9fs, queue will have %d requests with %#v waiting & %d requests occupying %d seats",
 			qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.workEstimate.FinalSeats, additionalLatency.Seconds(), r.queue.index,
-			r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requests.Length(), r.queue.requests.QueueSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
+			r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requestsWaiting.Length(), r.queue.requestsWaiting.QueueSum(), r.queue.requestsExecuting.Len(), r.queue.seatsInUse)
 	} else {
 		klogV.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished main use of %d seats but lingering on %d seats for %v seconds, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.workEstimate.FinalSeats, additionalLatency.Seconds(), qs.totRequestsExecuting, qs.totSeatsInUse)
 	}
@@ -984,7 +991,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
 	} else if r.queue != nil {
 		klogV.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished lingering on %d seats, queue %d will have %d requests with %#v waiting & %d requests occupying %d seats",
 			qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.FinalSeats, r.queue.index,
-			r.queue.requests.Length(), r.queue.requests.QueueSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
+			r.queue.requestsWaiting.Length(), r.queue.requestsWaiting.QueueSum(), r.queue.requestsExecuting.Len(), r.queue.seatsInUse)
 	} else {
 		klogV.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished lingering on %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.FinalSeats, qs.totRequestsExecuting, qs.totSeatsInUse)
 	}
@@ -994,12 +1001,14 @@ func (qs *queueSet) finishRequestLocked(r *request) {
 
 	if r.queue != nil {
 		// request has finished, remove from requests executing
-		r.queue.requestsExecuting--
+		r.queue.requestsExecuting = r.queue.requestsExecuting.Delete(r)
 
 		// When a request finishes being served, and the actual service time was S,
 		// the queue's start R is decremented by (G - S)*width.
 		r.queue.nextDispatchR -= fqrequest.SeatsTimesDuration(float64(r.InitialSeats()), qs.estimatedServiceDuration-actualServiceDuration)
 		qs.boundNextDispatchLocked(r.queue)
+	} else {
+		qs.requestsExecutingSet = qs.requestsExecutingSet.Delete(r)
 	}
 }
@@ -1011,7 +1020,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
 // The following hack addresses the first side of that inequity,
 // by insisting that dispatch in the virtual world not precede arrival.
 func (qs *queueSet) boundNextDispatchLocked(queue *queue) {
-	oldestReqFromMinQueue, _ := queue.requests.Peek()
+	oldestReqFromMinQueue, _ := queue.requestsWaiting.Peek()
 	if oldestReqFromMinQueue == nil {
 		return
 	}
@@ -1032,8 +1041,8 @@ func (qs *queueSet) removeQueueIfEmptyLocked(r *request) {
 	// If there are more queues than desired and this one has no
 	// requests then remove it
 	if len(qs.queues) > qs.qCfg.DesiredNumQueues &&
-		r.queue.requests.Length() == 0 &&
-		r.queue.requestsExecuting == 0 {
+		r.queue.requestsWaiting.Length() == 0 &&
+		r.queue.requestsExecuting.Len() == 0 {
 		qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.queue.index)
 
 		// decrement here to maintain the invariant that (qs.robinIndex+1) % numQueues
@@ -1059,6 +1068,7 @@ func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump {
 	defer qs.lock.Unlock()
 	d := debug.QueueSetDump{
 		Queues:                     make([]debug.QueueDump, len(qs.queues)),
+		QueuelessExecutingRequests: SetMapReduce(dumpRequest(includeRequestDetails), append1[debug.RequestDump])(qs.requestsExecutingSet),
 		Waiting:                    qs.totRequestsWaiting,
 		Executing:                  qs.totRequestsExecuting,
 		SeatsInUse:                 qs.totSeatsInUse,
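
The recurring change in this file replaces an integer counter (queue.requestsExecuting int, maintained with ++ and --) by sets.Set[*request], maintained with Insert and Delete, so that Dump can report the members rather than just a count; the queueless case gets the analogous requestsExecutingSet on the queueSet itself. A small sketch of the generic set API from k8s.io/apimachinery/pkg/util/sets that the patch leans on; the request type here is a stand-in:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

type request struct{ name string }

func main() {
	// The patch replaces integer counters with sets.Set[*request] so the
	// members, not just the count, are available to Dump.
	executing := sets.New[*request]()

	r1, r2 := &request{"a"}, &request{"b"}
	executing = executing.Insert(r1, r2) // dispatch: was executing++
	executing = executing.Delete(r1)     // finish: was executing--

	fmt.Println(executing.Len()) // 1, the same information the old counter held
	fmt.Println(executing.Has(r2)) // true, the extra information a dump can now use
}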

View File

@@ -30,9 +30,13 @@ import (
 	"testing"
 	"time"
 
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/utils/clock"
 
+	"k8s.io/apiserver/pkg/authentication/user"
+	genericrequest "k8s.io/apiserver/pkg/endpoints/request"
 	"k8s.io/apiserver/pkg/util/flowcontrol/counter"
+	"k8s.io/apiserver/pkg/util/flowcontrol/debug"
 	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
 	"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
 	test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
@@ -183,6 +187,7 @@ type uniformScenario struct {
 	expectedAverages            []float64
 	expectedEpochAdvances       int
 	seatDemandIntegratorSubject fq.Integrator
+	dontDump                    bool
 }
 
 func (us uniformScenario) exercise(t *testing.T) {
@@ -275,7 +280,10 @@ func (ust *uniformScenarioThread) callK(k int) {
 	maxWidth := float64(uint64max(ust.uc.initialSeats, ust.uc.finalSeats))
 	ust.uss.seatDemandIntegratorCheck.Add(maxWidth)
 	returnSeatDemand := func(time.Time) { ust.uss.seatDemandIntegratorCheck.Add(-maxWidth) }
-	req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.WorkEstimate{InitialSeats: ust.uc.initialSeats, FinalSeats: ust.uc.finalSeats, AdditionalLatency: ust.uc.padDuration}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil)
+	ctx := context.Background()
+	username := fmt.Sprintf("%d:%d:%d", ust.i, ust.j, k)
+	ctx = genericrequest.WithUser(ctx, &user.DefaultInfo{Name: username})
+	req, idle := ust.uss.qs.StartRequest(ctx, &fcrequest.WorkEstimate{InitialSeats: ust.uc.initialSeats, FinalSeats: ust.uc.finalSeats, AdditionalLatency: ust.uc.padDuration}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil)
 	ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle)
 	if req == nil {
 		atomic.AddUint64(&ust.uss.failedCount, 1)
@@ -286,6 +294,9 @@ func (ust *uniformScenarioThread) callK(k int) {
 	if idle {
 		ust.uss.t.Error("got request but QueueSet reported idle")
 	}
+	if (!ust.uss.dontDump) && k%100 == 0 {
+		insistRequestFromUser(ust.uss.t, ust.uss.qs, username)
+	}
 	var executed bool
 	var returnTime time.Time
 	idle2 := req.Finish(func() {
@@ -311,6 +322,26 @@ func (ust *uniformScenarioThread) callK(k int) {
 	}
 }
 
+func insistRequestFromUser(t *testing.T, qs fq.QueueSet, username string) {
+	qsd := qs.Dump(true)
+	goodRequest := func(rd debug.RequestDump) bool {
+		return rd.UserName == username
+	}
+	goodSliceOfRequests := SliceMapReduce(goodRequest, or)
+	if goodSliceOfRequests(qsd.QueuelessExecutingRequests) {
+		t.Logf("Found user %s among queueless requests", username)
+		return
+	}
+	goodQueueDump := func(qd debug.QueueDump) bool {
+		return goodSliceOfRequests(qd.Requests) || goodSliceOfRequests(qd.RequestsExecuting)
+	}
+	if SliceMapReduce(goodQueueDump, or)(qsd.Queues) {
+		t.Logf("Found user %s among queued requests", username)
+		return
+	}
+	t.Errorf("Failed to find request from user %s", username)
+}
+
 func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, margin float64) {
 	uss.clk.Run(&lim)
 	uss.clk.SetTime(lim)
@@ -491,6 +522,7 @@ func TestNoRestraint(t *testing.T) {
 			expectAllRequests:           true,
 			clk:                         clk,
 			counter:                     counter,
+			dontDump:                    true,
 		}.exercise(t)
 	})
 }
@@ -1268,13 +1300,14 @@ func TestFindDispatchQueueLocked(t *testing.T) {
 			queues: []*queue{
 				{
 					nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
-					requests: newFIFO(
+					requestsWaiting: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})},
 					),
+					requestsExecuting: sets.New[*request](),
 				},
 				{
 					nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second),
-					requests: newFIFO(
+					requestsWaiting: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})},
 					),
 				},
@@ -1291,9 +1324,10 @@ func TestFindDispatchQueueLocked(t *testing.T) {
 			queues: []*queue{
 				{
 					nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
-					requests: newFIFO(
+					requestsWaiting: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})},
 					),
+					requestsExecuting: sets.New[*request](),
 				},
 			},
 			attempts: 1,
@@ -1308,15 +1342,17 @@ func TestFindDispatchQueueLocked(t *testing.T) {
 			queues: []*queue{
 				{
 					nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
-					requests: newFIFO(
+					requestsWaiting: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 50})},
 					),
+					requestsExecuting: sets.New[*request](),
 				},
 				{
 					nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second),
-					requests: newFIFO(
+					requestsWaiting: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})},
 					),
+					requestsExecuting: sets.New[*request](),
 				},
 			},
 			attempts: 1,
@@ -1331,15 +1367,17 @@ func TestFindDispatchQueueLocked(t *testing.T) {
 			queues: []*queue{
 				{
 					nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
-					requests: newFIFO(
+					requestsWaiting: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 10})},
 					),
+					requestsExecuting: sets.New[*request](),
 				},
 				{
 					nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second),
-					requests: newFIFO(
+					requestsWaiting: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})},
 					),
+					requestsExecuting: sets.New[*request](),
 				},
 			},
 			attempts: 3,
@@ -1354,15 +1392,17 @@ func TestFindDispatchQueueLocked(t *testing.T) {
 			queues: []*queue{
 				{
 					nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
-					requests: newFIFO(
+					requestsWaiting: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 10})},
 					),
+					requestsExecuting: sets.New[*request](),
 				},
 				{
 					nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second),
-					requests: newFIFO(
+					requestsWaiting: newFIFO(
 						&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})},
 					),
+					requestsExecuting: sets.New[*request](),
 				},
 			},
 			beforeSelectQueueLocked: func(attempt int, qs *queueSet) {
@@ -1457,23 +1497,25 @@ func TestFinishRequestLocked(t *testing.T) {
 				seatDemandIntegrator: fq.NewNamedIntegrator(clk, "seatDemandSubject"),
 			}
 			queue := &queue{
-				requests: newRequestFIFO(),
+				requestsWaiting:   newRequestFIFO(),
+				requestsExecuting: sets.New[*request](),
 			}
 			r := &request{
 				qs:           qs,
 				queue:        queue,
 				workEstimate: qs.completeWorkEstimate(&test.workEstimate),
 			}
+			rOther := &request{qs: qs, queue: queue}
 
 			qs.totRequestsExecuting = 111
 			qs.totSeatsInUse = 222
-			queue.requestsExecuting = 11
+			queue.requestsExecuting = sets.New(r, rOther)
 			queue.seatsInUse = 22
 
 			var (
 				queuesetTotalRequestsExecutingExpected = qs.totRequestsExecuting - 1
 				queuesetTotalSeatsInUseExpected        = qs.totSeatsInUse - test.workEstimate.MaxSeats()
-				queueRequestsExecutingExpected         = queue.requestsExecuting - 1
+				queueRequestsExecutingExpected         = sets.New(rOther)
 				queueSeatsInUseExpected                = queue.seatsInUse - test.workEstimate.MaxSeats()
 			)
@@ -1488,8 +1530,8 @@ func TestFinishRequestLocked(t *testing.T) {
 			if queuesetTotalSeatsInUseExpected != qs.totSeatsInUse {
 				t.Errorf("Expected total seats in use: %d, but got: %d", queuesetTotalSeatsInUseExpected, qs.totSeatsInUse)
 			}
-			if queueRequestsExecutingExpected != queue.requestsExecuting {
-				t.Errorf("Expected requests executing for queue: %d, but got: %d", queueRequestsExecutingExpected, queue.requestsExecuting)
+			if !queueRequestsExecutingExpected.Equal(queue.requestsExecuting) {
+				t.Errorf("Expected requests executing for queue: %v, but got: %v", queueRequestsExecutingExpected, queue.requestsExecuting)
 			}
 			if queueSeatsInUseExpected != queue.seatsInUse {
 				t.Errorf("Expected seats in use for queue: %d, but got: %d", queueSeatsInUseExpected, queue.seatsInUse)
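
The identity plumbing in the test goes through the request context: callK stores a synthetic user in the ctx it passes to StartRequest, and dumpRequest later reads it back when includeRequestDetails is true. A minimal sketch of that round trip, using only the two real helpers involved; the literal user name is arbitrary:

package main

import (
	"context"
	"fmt"

	"k8s.io/apiserver/pkg/authentication/user"
	genericrequest "k8s.io/apiserver/pkg/endpoints/request"
)

func main() {
	// What callK does before StartRequest: attach a synthetic identity.
	ctx := genericrequest.WithUser(context.Background(), &user.DefaultInfo{Name: "1:2:3"})

	// What dumpRequest does later: read the identity back out of the context.
	if userInfo, ok := genericrequest.UserFrom(ctx); ok {
		fmt.Println(userInfo.GetName()) // prints "1:2:3", the UserName reported in the dump
	}
}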

View File

@@ -20,6 +20,7 @@ import (
 	"context"
 	"time"
 
+	"k8s.io/apimachinery/pkg/util/sets"
 	genericrequest "k8s.io/apiserver/pkg/endpoints/request"
 	"k8s.io/apiserver/pkg/util/flowcontrol/debug"
 	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
@@ -90,15 +91,15 @@ type completedWorkEstimate struct {
 // queue is a sequence of requests that have arrived but not yet finished
 // execution in both the real and virtual worlds.
 type queue struct {
-	// The requests not yet executing in the real world are stored in a FIFO list.
-	requests fifo
+	// The requestsWaiting not yet executing in the real world are stored in a FIFO list.
+	requestsWaiting fifo
 
 	// nextDispatchR is the R progress meter reading at
 	// which the next request will be dispatched in the virtual world.
 	nextDispatchR fcrequest.SeatSeconds
 
-	// requestsExecuting is the count in the real world.
-	requestsExecuting int
+	// requestsExecuting is the set of requests executing in the real world.
+	requestsExecuting sets.Set[*request]
 
 	// index is the position of this queue among those in its queueSet.
 	index int
@@ -145,28 +146,14 @@ func (qs *queueSet) computeFinalWork(we *fcrequest.WorkEstimate) fcrequest.SeatS
 }
 
 func (q *queue) dumpLocked(includeDetails bool) debug.QueueDump {
-	digest := make([]debug.RequestDump, q.requests.Length())
-	i := 0
-	q.requests.Walk(func(r *request) bool {
-		// dump requests.
-		digest[i].MatchedFlowSchema = r.fsName
-		digest[i].FlowDistinguisher = r.flowDistinguisher
-		digest[i].ArriveTime = r.arrivalTime
-		digest[i].StartTime = r.startTime
-		digest[i].WorkEstimate = r.workEstimate.WorkEstimate
-		if includeDetails {
-			userInfo, _ := genericrequest.UserFrom(r.ctx)
-			digest[i].UserName = userInfo.GetName()
-			requestInfo, ok := genericrequest.RequestInfoFrom(r.ctx)
-			if ok {
-				digest[i].RequestInfo = *requestInfo
-			}
-		}
-		i++
+	waitingDigest := make([]debug.RequestDump, 0, q.requestsWaiting.Length())
+	q.requestsWaiting.Walk(func(r *request) bool {
+		waitingDigest = append(waitingDigest, dumpRequest(includeDetails)(r))
 		return true
 	})
+	executingDigest := SetMapReduce(dumpRequest(includeDetails), append1[debug.RequestDump])(q.requestsExecuting)
 
-	sum := q.requests.QueueSum()
+	sum := q.requestsWaiting.QueueSum()
 	queueSum := debug.QueueSum{
 		InitialSeatsSum: sum.InitialSeatsSum,
 		MaxSeatsSum:     sum.MaxSeatsSum,
@@ -175,9 +162,57 @@ func (q *queue) dumpLocked(includeDetails bool) debug.QueueDump {
 	return debug.QueueDump{
 		NextDispatchR:     q.nextDispatchR.String(),
-		Requests:          digest,
-		ExecutingRequests: q.requestsExecuting,
+		Requests:          waitingDigest,
+		RequestsExecuting: executingDigest,
+		ExecutingRequests: q.requestsExecuting.Len(),
 		SeatsInUse:        q.seatsInUse,
 		QueueSum:          queueSum,
 	}
 }
+
+func dumpRequest(includeDetails bool) func(*request) debug.RequestDump {
+	return func(r *request) debug.RequestDump {
+		ans := debug.RequestDump{
+			MatchedFlowSchema: r.fsName,
+			FlowDistinguisher: r.flowDistinguisher,
+			ArriveTime:        r.arrivalTime,
+			StartTime:         r.startTime,
+			WorkEstimate:      r.workEstimate.WorkEstimate,
+		}
+		if includeDetails {
+			userInfo, _ := genericrequest.UserFrom(r.ctx)
+			ans.UserName = userInfo.GetName()
+			requestInfo, ok := genericrequest.RequestInfoFrom(r.ctx)
+			if ok {
+				ans.RequestInfo = *requestInfo
+			}
+		}
+		return ans
+	}
+}
+
+// SetMapReduce is map-reduce starting from a set type in the sets package.
+func SetMapReduce[Elt comparable, Result, Accumulator any](mapFn func(Elt) Result, reduceFn func(Accumulator, Result) Accumulator) func(map[Elt]sets.Empty) Accumulator {
+	return func(set map[Elt]sets.Empty) Accumulator {
+		var ans Accumulator
+		for elt := range set {
+			ans = reduceFn(ans, mapFn(elt))
+		}
+		return ans
+	}
+}
+
+// SliceMapReduce is map-reduce starting from a slice.
+func SliceMapReduce[Elt, Result, Accumulator any](mapFn func(Elt) Result, reduceFn func(Accumulator, Result) Accumulator) func([]Elt) Accumulator {
+	return func(slice []Elt) Accumulator {
+		var ans Accumulator
+		for _, elt := range slice {
+			ans = reduceFn(ans, mapFn(elt))
+		}
+		return ans
+	}
+}
+
+func or(x, y bool) bool { return x || y }
+
+func append1[Elt any](slice []Elt, next Elt) []Elt { return append(slice, next) }
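
These helpers carry all of the new aggregation: folding a set with append1 collects the debug.RequestDump slices in Dump and dumpLocked, while folding with a predicate and or answers "is any element good?" in insistRequestFromUser. A self-contained illustration follows, with the helper definitions copied from above and arbitrary string data:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

// Definitions copied from the patch so the sketch compiles on its own.
func SetMapReduce[Elt comparable, Result, Accumulator any](mapFn func(Elt) Result, reduceFn func(Accumulator, Result) Accumulator) func(map[Elt]sets.Empty) Accumulator {
	return func(set map[Elt]sets.Empty) Accumulator {
		var ans Accumulator
		for elt := range set {
			ans = reduceFn(ans, mapFn(elt))
		}
		return ans
	}
}

func SliceMapReduce[Elt, Result, Accumulator any](mapFn func(Elt) Result, reduceFn func(Accumulator, Result) Accumulator) func([]Elt) Accumulator {
	return func(slice []Elt) Accumulator {
		var ans Accumulator
		for _, elt := range slice {
			ans = reduceFn(ans, mapFn(elt))
		}
		return ans
	}
}

func or(x, y bool) bool { return x || y }

func append1[Elt any](slice []Elt, next Elt) []Elt { return append(slice, next) }

func main() {
	// Fold a set into a slice: the shape of how Dump collects RequestDump values.
	// A named Set[string] is assignable to map[string]sets.Empty, its underlying type.
	shouted := SetMapReduce(func(s string) string { return s + "!" }, append1[string])(sets.New("a", "b"))
	fmt.Println(shouted) // [a! b!] in some order

	// Fold a slice into a bool: how the test asks "any request from this user?".
	anyBob := SliceMapReduce(func(s string) bool { return s == "bob" }, or)([]string{"alice", "bob"})
	fmt.Println(anyBob) // true
}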