Merge pull request #119009 from MikeSpreitzer/track-executing-requests

Track executing requests
Kubernetes Prow Robot 2023-07-18 13:44:18 -07:00 committed by GitHub
commit 31d662e58e
5 changed files with 219 additions and 118 deletions

View File

@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/server/mux"
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
)
const (
@ -154,54 +155,65 @@ func (cfgCtlr *configController) dumpRequests(w http.ResponseWriter, r *http.Req
"InitialSeats", // 7
"FinalSeats", // 8
"AdditionalLatency", // 9
"StartTime", // 10
}))
if includeRequestDetails {
continueLine(tabWriter)
tabPrint(tabWriter, rowForHeaders([]string{
"UserName", // 10
"Verb", // 11
"APIPath", // 12
"Namespace", // 13
"Name", // 14
"APIVersion", // 15
"Resource", // 16
"SubResource", // 17
"UserName", // 11
"Verb", // 12
"APIPath", // 13
"Namespace", // 14
"Name", // 15
"APIVersion", // 16
"Resource", // 17
"SubResource", // 18
}))
}
endLine(tabWriter)
for _, plState := range cfgCtlr.priorityLevelStates {
queueSetDigest := plState.queues.Dump(includeRequestDetails)
dumpRequest := func(iq, ir int, r debug.RequestDump) {
tabPrint(tabWriter, row(
plState.pl.Name, // 1
r.MatchedFlowSchema, // 2
strconv.Itoa(iq), // 3
strconv.Itoa(ir), // 4
r.FlowDistinguisher, // 5
r.ArriveTime.UTC().Format(time.RFC3339Nano), // 6
strconv.Itoa(int(r.WorkEstimate.InitialSeats)), // 7
strconv.Itoa(int(r.WorkEstimate.FinalSeats)), // 8
r.WorkEstimate.AdditionalLatency.String(), // 9
r.StartTime.UTC().Format(time.RFC3339Nano), // 10
))
if includeRequestDetails {
continueLine(tabWriter)
tabPrint(tabWriter, rowForRequestDetails(
r.UserName, // 11
r.RequestInfo.Verb, // 12
r.RequestInfo.Path, // 13
r.RequestInfo.Namespace, // 14
r.RequestInfo.Name, // 15
schema.GroupVersion{
Group: r.RequestInfo.APIGroup,
Version: r.RequestInfo.APIVersion,
}.String(), // 16
r.RequestInfo.Resource, // 17
r.RequestInfo.Subresource, // 18
))
}
endLine(tabWriter)
}
for iq, q := range queueSetDigest.Queues {
for ir, r := range q.Requests {
tabPrint(tabWriter, row(
plState.pl.Name, // 1
r.MatchedFlowSchema, // 2
strconv.Itoa(iq), // 3
strconv.Itoa(ir), // 4
r.FlowDistinguisher, // 5
r.ArriveTime.UTC().Format(time.RFC3339Nano), // 6
strconv.Itoa(int(r.WorkEstimate.InitialSeats)), // 7
strconv.Itoa(int(r.WorkEstimate.FinalSeats)), // 8
r.WorkEstimate.AdditionalLatency.String(), // 9
))
if includeRequestDetails {
continueLine(tabWriter)
tabPrint(tabWriter, rowForRequestDetails(
r.UserName, // 10
r.RequestInfo.Verb, // 11
r.RequestInfo.Path, // 12
r.RequestInfo.Namespace, // 13
r.RequestInfo.Name, // 14
schema.GroupVersion{
Group: r.RequestInfo.APIGroup,
Version: r.RequestInfo.APIVersion,
}.String(), // 15
r.RequestInfo.Resource, // 16
r.RequestInfo.Subresource, // 17
))
}
endLine(tabWriter)
dumpRequest(iq, ir, r)
}
for _, r := range q.RequestsExecuting {
dumpRequest(iq, -1, r)
}
}
for _, r := range queueSetDigest.QueuelessExecutingRequests {
dumpRequest(-1, -1, r)
}
}
runtime.HandleError(tabWriter.Flush())
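
With this change the handler emits three kinds of rows: requests waiting in a queue (dumpRequest(iq, ir, r)), requests executing after dispatch from a queue (dumpRequest(iq, -1, r)), and requests executing in a queue-set that has no queues (dumpRequest(-1, -1, r)). A reader can recover that classification from columns 3 and 4; a minimal sketch, where classifyRow is a hypothetical helper and not part of this change:

func classifyRow(queueIndex, requestIndex int) string {
	switch {
	case queueIndex < 0:
		return "executing, queue-set has no queues" // row from dumpRequest(-1, -1, r)
	case requestIndex < 0:
		return "executing, dispatched from a queue" // row from dumpRequest(iq, -1, r)
	default:
		return "waiting in a queue" // row from dumpRequest(iq, ir, r)
	}
}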

View File

@ -25,21 +25,23 @@ import (
// QueueSetDump is an instant dump of a queue-set.
type QueueSetDump struct {
Queues []QueueDump
Waiting int
Executing int
SeatsInUse int
SeatsWaiting int
Dispatched int
Rejected int
Timedout int
Cancelled int
Queues []QueueDump
QueuelessExecutingRequests []RequestDump
Waiting int
Executing int
SeatsInUse int
SeatsWaiting int
Dispatched int
Rejected int
Timedout int
Cancelled int
}
// QueueDump is an instant dump of one queue in a queue-set.
type QueueDump struct {
QueueSum QueueSum
Requests []RequestDump
Requests []RequestDump // just the waiting ones
RequestsExecuting []RequestDump
NextDispatchR string
ExecutingRequests int
SeatsInUse int
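
Taken together, executing requests now appear in two places in a dump: per queue in RequestsExecuting, and at the queue-set level in QueuelessExecutingRequests when the queue-set runs without queues. A minimal consumer sketch inside the debug package (sumExecuting is a hypothetical helper, not part of this change):

func sumExecuting(d QueueSetDump) int {
	total := len(d.QueuelessExecutingRequests) // executing requests that never queued
	for _, q := range d.Queues {
		total += len(q.RequestsExecuting) // executing requests dispatched from this queue
	}
	return total // in a consistent dump this should equal d.Executing
}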

View File

@ -24,6 +24,7 @@ import (
"sync"
"time"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock"
@ -138,6 +139,10 @@ type queueSet struct {
// from that queue.
totRequestsExecuting int
// requestsExecutingSet is the set of requests executing in the real world IF
// there are no queues; otherwise the requests are tracked in the queues.
requestsExecutingSet sets.Set[*request]
// totSeatsInUse is the number of total "seats" in use by all the
// request(s) that are currently executing in this queueset.
totSeatsInUse int
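
The comment above implies a bookkeeping invariant: every executing request is tracked either in requestsExecutingSet (when the queue-set has no queues) or in exactly one queue's requestsExecuting set, so the tracked counts sum to totRequestsExecuting. A hypothetical consistency check, assuming qs.lock is held (checkExecutingInvariantLocked is illustrative, not part of this change):

func (qs *queueSet) checkExecutingInvariantLocked() bool {
	tracked := qs.requestsExecutingSet.Len()
	for _, q := range qs.queues {
		tracked += q.requestsExecuting.Len()
	}
	return tracked == qs.totRequestsExecuting
}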
@ -219,6 +224,7 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
qCfg: qsc.qCfg,
currentR: 0,
lastRealTime: qsc.factory.clock.Now(),
requestsExecutingSet: sets.New[*request](),
}
qs.promiseFactory = qsc.factory.promiseFactoryFactory(qs)
}
@ -230,7 +236,7 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
func createQueues(n, baseIndex int) []*queue {
fqqueues := make([]*queue, n)
for i := 0; i < n; i++ {
fqqueues[i] = &queue{index: baseIndex + i, requests: newRequestFIFO()}
fqqueues[i] = &queue{index: baseIndex + i, requestsWaiting: newRequestFIFO(), requestsExecuting: sets.New[*request]()}
}
return fqqueues
}
@ -504,7 +510,7 @@ func (qs *queueSet) advanceEpoch(ctx context.Context, now time.Time, incrR fqreq
klog.InfoS("Advancing epoch", "QS", qs.qCfg.Name, "when", now.Format(nsTimeFmt), "oldR", oldR, "newR", qs.currentR, "incrR", incrR)
success := true
for qIdx, queue := range qs.queues {
if queue.requests.Length() == 0 && queue.requestsExecuting == 0 {
if queue.requestsWaiting.Length() == 0 && queue.requestsExecuting.Len() == 0 {
// Do not just decrement, the value could be quite outdated.
// It is safe to reset to zero in this case, because the next request
// will overwrite the zero with `qs.currentR`.
@ -517,7 +523,7 @@ func (qs *queueSet) advanceEpoch(ctx context.Context, now time.Time, incrR fqreq
klog.ErrorS(errors.New("queue::nextDispatchR underflow"), "Underflow", "QS", qs.qCfg.Name, "queue", qIdx, "oldNextDispatchR", oldNextDispatchR, "newNextDispatchR", queue.nextDispatchR, "incrR", incrR)
success = false
}
queue.requests.Walk(func(req *request) bool {
queue.requestsWaiting.Walk(func(req *request) bool {
oldArrivalR := req.arrivalR
req.arrivalR -= rDecrement
if req.arrivalR > oldArrivalR {
@ -538,8 +544,8 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
for _, queue := range qs.queues {
// here we want the sum of the maximum width of the requests in this queue since our
// goal is to find the maximum rate at which the queue could work.
seatsRequested += (queue.seatsInUse + queue.requests.QueueSum().MaxSeatsSum)
if queue.requests.Length() > 0 || queue.requestsExecuting > 0 {
seatsRequested += (queue.seatsInUse + queue.requestsWaiting.QueueSum().MaxSeatsSum)
if queue.requestsWaiting.Length() > 0 || queue.requestsExecuting.Len() > 0 {
activeQueues++
}
}
@ -589,7 +595,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
if ok := qs.rejectOrEnqueueToBoundLocked(req); !ok {
return nil
}
metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, queue.requests.Length())
metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, queue.requestsWaiting.Length())
return req
}
@ -608,7 +614,7 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac
for i := 0; i < handSize; i++ {
queueIdx := hand[(offset+i)%handSize]
queue := qs.queues[queueIdx]
queueSum := queue.requests.QueueSum()
queueSum := queue.requestsWaiting.QueueSum()
// this is the total amount of work in seat-seconds for requests
// waiting in this queue, we will select the queue with the minimum.
@ -621,7 +627,7 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac
}
if klogV := klog.V(6); klogV.Enabled() {
chosenQueue := qs.queues[bestQueueIdx]
klogV.Infof("QS(%s) at t=%s R=%v: For request %#+v %#+v chose queue %d, with sum: %#v & %d seats in use & nextDispatchR=%v", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.currentR, descr1, descr2, bestQueueIdx, chosenQueue.requests.QueueSum(), chosenQueue.seatsInUse, chosenQueue.nextDispatchR)
klogV.Infof("QS(%s) at t=%s R=%v: For request %#+v %#+v chose queue %d, with sum: %#v & %d seats in use & nextDispatchR=%v", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.currentR, descr1, descr2, bestQueueIdx, chosenQueue.requestsWaiting.QueueSum(), chosenQueue.seatsInUse, chosenQueue.nextDispatchR)
}
return bestQueueIdx
}
@ -632,7 +638,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, f
timeoutCount := 0
disqueueSeats := 0
now := qs.clock.Now()
reqs := queue.requests
reqs := queue.requestsWaiting
// reqs are sorted oldest -> newest
// can short circuit loop (break) if oldest requests are not timing out
// as newer requests also will not have timed out
@ -669,7 +675,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, f
// Otherwise enqueues and returns true.
func (qs *queueSet) rejectOrEnqueueToBoundLocked(request *request) bool {
queue := request.queue
curQueueLength := queue.requests.Length()
curQueueLength := queue.requestsWaiting.Length()
// rejects the newly arrived request if resource criteria not met
if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit &&
curQueueLength >= qs.qCfg.QueueLengthLimit {
@ -684,7 +690,7 @@ func (qs *queueSet) rejectOrEnqueueToBoundLocked(request *request) bool {
func (qs *queueSet) enqueueToBoundLocked(request *request) {
queue := request.queue
now := qs.clock.Now()
if queue.requests.Length() == 0 && queue.requestsExecuting == 0 {
if queue.requestsWaiting.Length() == 0 && queue.requestsExecuting.Len() == 0 {
// the queue's start R is set to the virtual time.
queue.nextDispatchR = qs.currentR
klogV := klog.V(6)
@ -692,7 +698,7 @@ func (qs *queueSet) enqueueToBoundLocked(request *request) {
klogV.Infof("QS(%s) at t=%s R=%v: initialized queue %d start R due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.nextDispatchR, queue.index, request.descr1, request.descr2)
}
}
request.removeFromQueueLocked = queue.requests.Enqueue(request)
request.removeFromQueueLocked = queue.requestsWaiting.Enqueue(request)
qs.totRequestsWaiting++
qs.totSeatsWaiting += request.MaxSeats()
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1)
@ -725,6 +731,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
}
qs.totRequestsExecuting++
qs.totSeatsInUse += req.MaxSeats()
qs.requestsExecutingSet = qs.requestsExecutingSet.Insert(req)
metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1)
metrics.AddSeatConcurrencyInUse(qs.qCfg.Name, fsName, req.MaxSeats())
qs.reqsGaugePair.RequestsExecuting.Add(1)
@ -768,7 +775,7 @@ func (qs *queueSet) dispatchLocked() bool {
// problem because other overhead is also included.
qs.totRequestsExecuting++
qs.totSeatsInUse += request.MaxSeats()
queue.requestsExecuting++
queue.requestsExecuting = queue.requestsExecuting.Insert(request)
queue.seatsInUse += request.MaxSeats()
metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1)
metrics.AddSeatConcurrencyInUse(qs.qCfg.Name, request.fsName, request.MaxSeats())
@ -779,7 +786,7 @@ func (qs *queueSet) dispatchLocked() bool {
if klogV.Enabled() {
klogV.Infof("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v work %v from queue %d with start R %v, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied",
qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.currentR, request.descr1, request.descr2,
request.workEstimate, queue.index, queue.nextDispatchR, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse)
request.workEstimate, queue.index, queue.nextDispatchR, queue.requestsWaiting.Length(), queue.requestsExecuting.Len(), queue.seatsInUse, qs.totSeatsInUse)
}
// When a request is dequeued for service -> qs.virtualStart += G * width
if request.totalWork() > rDecrement/100 { // A single increment should never be so big
@ -834,7 +841,7 @@ func (qs *queueSet) findDispatchQueueToBoundLocked() (*queue, *request) {
for range qs.queues {
qs.robinIndex = (qs.robinIndex + 1) % nq
queue := qs.queues[qs.robinIndex]
oldestWaiting, _ := queue.requests.Peek()
oldestWaiting, _ := queue.requestsWaiting.Peek()
if oldestWaiting != nil {
sMin = ssMin(sMin, queue.nextDispatchR)
sMax = ssMax(sMax, queue.nextDispatchR)
@ -851,7 +858,7 @@ func (qs *queueSet) findDispatchQueueToBoundLocked() (*queue, *request) {
}
}
oldestReqFromMinQueue, _ := minQueue.requests.Peek()
oldestReqFromMinQueue, _ := minQueue.requestsWaiting.Peek()
if oldestReqFromMinQueue == nil {
// This cannot happen
klog.ErrorS(errors.New("selected queue is empty"), "Impossible", "queueSet", qs.qCfg.Name)
@ -955,7 +962,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
} else if r.queue != nil {
klogV.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished all use of %d seats, adjusted queue %d start R to %v due to service time %.9fs, queue will have %d requests with %#v waiting & %d requests occupying %d seats",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.MaxSeats(), r.queue.index,
r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requests.Length(), r.queue.requests.QueueSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requestsWaiting.Length(), r.queue.requestsWaiting.QueueSum(), r.queue.requestsExecuting.Len(), r.queue.seatsInUse)
} else {
klogV.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished all use of %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse)
}
@ -967,7 +974,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
} else if r.queue != nil {
klogV.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished main use of %d seats but lingering on %d seats for %v seconds, adjusted queue %d start R to %v due to service time %.9fs, queue will have %d requests with %#v waiting & %d requests occupying %d seats",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.workEstimate.FinalSeats, additionalLatency.Seconds(), r.queue.index,
r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requests.Length(), r.queue.requests.QueueSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requestsWaiting.Length(), r.queue.requestsWaiting.QueueSum(), r.queue.requestsExecuting.Len(), r.queue.seatsInUse)
} else {
klogV.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished main use of %d seats but lingering on %d seats for %v seconds, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.workEstimate.FinalSeats, additionalLatency.Seconds(), qs.totRequestsExecuting, qs.totSeatsInUse)
}
@ -984,7 +991,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
} else if r.queue != nil {
klogV.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished lingering on %d seats, queue %d will have %d requests with %#v waiting & %d requests occupying %d seats",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.FinalSeats, r.queue.index,
r.queue.requests.Length(), r.queue.requests.QueueSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
r.queue.requestsWaiting.Length(), r.queue.requestsWaiting.QueueSum(), r.queue.requestsExecuting.Len(), r.queue.seatsInUse)
} else {
klogV.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished lingering on %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.FinalSeats, qs.totRequestsExecuting, qs.totSeatsInUse)
}
@ -994,12 +1001,14 @@ func (qs *queueSet) finishRequestLocked(r *request) {
if r.queue != nil {
// request has finished, remove from requests executing
r.queue.requestsExecuting--
r.queue.requestsExecuting = r.queue.requestsExecuting.Delete(r)
// When a request finishes being served, and the actual service time was S,
// the queue's start R is decremented by (G - S)*width.
r.queue.nextDispatchR -= fqrequest.SeatsTimesDuration(float64(r.InitialSeats()), qs.estimatedServiceDuration-actualServiceDuration)
qs.boundNextDispatchLocked(r.queue)
} else {
qs.requestsExecutingSet = qs.requestsExecutingSet.Delete(r)
}
}
@ -1011,7 +1020,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
// The following hack addresses the first side of that inequity,
// by insisting that dispatch in the virtual world not precede arrival.
func (qs *queueSet) boundNextDispatchLocked(queue *queue) {
oldestReqFromMinQueue, _ := queue.requests.Peek()
oldestReqFromMinQueue, _ := queue.requestsWaiting.Peek()
if oldestReqFromMinQueue == nil {
return
}
@ -1032,8 +1041,8 @@ func (qs *queueSet) removeQueueIfEmptyLocked(r *request) {
// If there are more queues than desired and this one has no
// requests then remove it
if len(qs.queues) > qs.qCfg.DesiredNumQueues &&
r.queue.requests.Length() == 0 &&
r.queue.requestsExecuting == 0 {
r.queue.requestsWaiting.Length() == 0 &&
r.queue.requestsExecuting.Len() == 0 {
qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.queue.index)
// decrement here to maintain the invariant that (qs.robinIndex+1) % numQueues
@ -1058,15 +1067,16 @@ func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump {
qs.lock.Lock()
defer qs.lock.Unlock()
d := debug.QueueSetDump{
Queues: make([]debug.QueueDump, len(qs.queues)),
Waiting: qs.totRequestsWaiting,
Executing: qs.totRequestsExecuting,
SeatsInUse: qs.totSeatsInUse,
SeatsWaiting: qs.totSeatsWaiting,
Dispatched: qs.totRequestsDispatched,
Rejected: qs.totRequestsRejected,
Timedout: qs.totRequestsTimedout,
Cancelled: qs.totRequestsCancelled,
Queues: make([]debug.QueueDump, len(qs.queues)),
QueuelessExecutingRequests: SetMapReduce(dumpRequest(includeRequestDetails), append1[debug.RequestDump])(qs.requestsExecutingSet),
Waiting: qs.totRequestsWaiting,
Executing: qs.totRequestsExecuting,
SeatsInUse: qs.totSeatsInUse,
SeatsWaiting: qs.totSeatsWaiting,
Dispatched: qs.totRequestsDispatched,
Rejected: qs.totRequestsRejected,
Timedout: qs.totRequestsTimedout,
Cancelled: qs.totRequestsCancelled,
}
for i, q := range qs.queues {
d.Queues[i] = q.dumpLocked(includeRequestDetails)

View File

@ -30,9 +30,13 @@ import (
"testing"
"time"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/clock"
"k8s.io/apiserver/pkg/authentication/user"
genericrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
@ -183,6 +187,7 @@ type uniformScenario struct {
expectedAverages []float64
expectedEpochAdvances int
seatDemandIntegratorSubject fq.Integrator
dontDump bool
}
func (us uniformScenario) exercise(t *testing.T) {
@ -275,7 +280,10 @@ func (ust *uniformScenarioThread) callK(k int) {
maxWidth := float64(uint64max(ust.uc.initialSeats, ust.uc.finalSeats))
ust.uss.seatDemandIntegratorCheck.Add(maxWidth)
returnSeatDemand := func(time.Time) { ust.uss.seatDemandIntegratorCheck.Add(-maxWidth) }
req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.WorkEstimate{InitialSeats: ust.uc.initialSeats, FinalSeats: ust.uc.finalSeats, AdditionalLatency: ust.uc.padDuration}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil)
ctx := context.Background()
username := fmt.Sprintf("%d:%d:%d", ust.i, ust.j, k)
ctx = genericrequest.WithUser(ctx, &user.DefaultInfo{Name: username})
req, idle := ust.uss.qs.StartRequest(ctx, &fcrequest.WorkEstimate{InitialSeats: ust.uc.initialSeats, FinalSeats: ust.uc.finalSeats, AdditionalLatency: ust.uc.padDuration}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil)
ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle)
if req == nil {
atomic.AddUint64(&ust.uss.failedCount, 1)
@ -286,6 +294,9 @@ func (ust *uniformScenarioThread) callK(k int) {
if idle {
ust.uss.t.Error("got request but QueueSet reported idle")
}
if (!ust.uss.dontDump) && k%100 == 0 {
insistRequestFromUser(ust.uss.t, ust.uss.qs, username)
}
var executed bool
var returnTime time.Time
idle2 := req.Finish(func() {
@ -311,6 +322,26 @@ func (ust *uniformScenarioThread) callK(k int) {
}
}
func insistRequestFromUser(t *testing.T, qs fq.QueueSet, username string) {
qsd := qs.Dump(true)
goodRequest := func(rd debug.RequestDump) bool {
return rd.UserName == username
}
goodSliceOfRequests := SliceMapReduce(goodRequest, or)
if goodSliceOfRequests(qsd.QueuelessExecutingRequests) {
t.Logf("Found user %s among queueless requests", username)
return
}
goodQueueDump := func(qd debug.QueueDump) bool {
return goodSliceOfRequests(qd.Requests) || goodSliceOfRequests(qd.RequestsExecuting)
}
if SliceMapReduce(goodQueueDump, or)(qsd.Queues) {
t.Logf("Found user %s among queued requests", username)
return
}
t.Errorf("Failed to find request from user %s", username)
}
func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, margin float64) {
uss.clk.Run(&lim)
uss.clk.SetTime(lim)
@ -491,6 +522,7 @@ func TestNoRestraint(t *testing.T) {
expectAllRequests: true,
clk: clk,
counter: counter,
dontDump: true,
}.exercise(t)
})
}
@ -1268,13 +1300,14 @@ func TestFindDispatchQueueLocked(t *testing.T) {
queues: []*queue{
{
nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO(
requestsWaiting: newFIFO(
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})},
),
requestsExecuting: sets.New[*request](),
},
{
nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second),
requests: newFIFO(
requestsWaiting: newFIFO(
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})},
),
},
@ -1291,9 +1324,10 @@ func TestFindDispatchQueueLocked(t *testing.T) {
queues: []*queue{
{
nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO(
requestsWaiting: newFIFO(
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})},
),
requestsExecuting: sets.New[*request](),
},
},
attempts: 1,
@ -1308,15 +1342,17 @@ func TestFindDispatchQueueLocked(t *testing.T) {
queues: []*queue{
{
nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO(
requestsWaiting: newFIFO(
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 50})},
),
requestsExecuting: sets.New[*request](),
},
{
nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second),
requests: newFIFO(
requestsWaiting: newFIFO(
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})},
),
requestsExecuting: sets.New[*request](),
},
},
attempts: 1,
@ -1331,15 +1367,17 @@ func TestFindDispatchQueueLocked(t *testing.T) {
queues: []*queue{
{
nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO(
requestsWaiting: newFIFO(
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 10})},
),
requestsExecuting: sets.New[*request](),
},
{
nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second),
requests: newFIFO(
requestsWaiting: newFIFO(
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})},
),
requestsExecuting: sets.New[*request](),
},
},
attempts: 3,
@ -1354,15 +1392,17 @@ func TestFindDispatchQueueLocked(t *testing.T) {
queues: []*queue{
{
nextDispatchR: fcrequest.SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO(
requestsWaiting: newFIFO(
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 10})},
),
requestsExecuting: sets.New[*request](),
},
{
nextDispatchR: fcrequest.SeatsTimesDuration(1, 100*time.Second),
requests: newFIFO(
requestsWaiting: newFIFO(
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})},
),
requestsExecuting: sets.New[*request](),
},
},
beforeSelectQueueLocked: func(attempt int, qs *queueSet) {
@ -1457,23 +1497,25 @@ func TestFinishRequestLocked(t *testing.T) {
seatDemandIntegrator: fq.NewNamedIntegrator(clk, "seatDemandSubject"),
}
queue := &queue{
requests: newRequestFIFO(),
requestsWaiting: newRequestFIFO(),
requestsExecuting: sets.New[*request](),
}
r := &request{
qs: qs,
queue: queue,
workEstimate: qs.completeWorkEstimate(&test.workEstimate),
}
rOther := &request{qs: qs, queue: queue}
qs.totRequestsExecuting = 111
qs.totSeatsInUse = 222
queue.requestsExecuting = 11
queue.requestsExecuting = sets.New(r, rOther)
queue.seatsInUse = 22
var (
queuesetTotalRequestsExecutingExpected = qs.totRequestsExecuting - 1
queuesetTotalSeatsInUseExpected = qs.totSeatsInUse - test.workEstimate.MaxSeats()
queueRequestsExecutingExpected = queue.requestsExecuting - 1
queueRequestsExecutingExpected = sets.New(rOther)
queueSeatsInUseExpected = queue.seatsInUse - test.workEstimate.MaxSeats()
)
@ -1488,8 +1530,8 @@ func TestFinishRequestLocked(t *testing.T) {
if queuesetTotalSeatsInUseExpected != qs.totSeatsInUse {
t.Errorf("Expected total seats in use: %d, but got: %d", queuesetTotalSeatsInUseExpected, qs.totSeatsInUse)
}
if queueRequestsExecutingExpected != queue.requestsExecuting {
t.Errorf("Expected requests executing for queue: %d, but got: %d", queueRequestsExecutingExpected, queue.requestsExecuting)
if !queueRequestsExecutingExpected.Equal(queue.requestsExecuting) {
t.Errorf("Expected requests executing for queue: %v, but got: %v", queueRequestsExecutingExpected, queue.requestsExecuting)
}
if queueSeatsInUseExpected != queue.seatsInUse {
t.Errorf("Expected seats in use for queue: %d, but got: %d", queueSeatsInUseExpected, queue.seatsInUse)

View File

@ -20,6 +20,7 @@ import (
"context"
"time"
"k8s.io/apimachinery/pkg/util/sets"
genericrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
@ -90,15 +91,15 @@ type completedWorkEstimate struct {
// queue is a sequence of requests that have arrived but not yet finished
// execution in both the real and virtual worlds.
type queue struct {
// The requests not yet executing in the real world are stored in a FIFO list.
requests fifo
// requestsWaiting holds the requests not yet executing in the real world, stored in a FIFO list.
requestsWaiting fifo
// nextDispatchR is the R progress meter reading at
// which the next request will be dispatched in the virtual world.
nextDispatchR fcrequest.SeatSeconds
// requestsExecuting is the count in the real world.
requestsExecuting int
// requestsExecuting is the set of requests executing in the real world.
requestsExecuting sets.Set[*request]
// index is the position of this queue among those in its queueSet.
index int
@ -145,28 +146,14 @@ func (qs *queueSet) computeFinalWork(we *fcrequest.WorkEstimate) fcrequest.SeatS
}
func (q *queue) dumpLocked(includeDetails bool) debug.QueueDump {
digest := make([]debug.RequestDump, q.requests.Length())
i := 0
q.requests.Walk(func(r *request) bool {
// dump requests.
digest[i].MatchedFlowSchema = r.fsName
digest[i].FlowDistinguisher = r.flowDistinguisher
digest[i].ArriveTime = r.arrivalTime
digest[i].StartTime = r.startTime
digest[i].WorkEstimate = r.workEstimate.WorkEstimate
if includeDetails {
userInfo, _ := genericrequest.UserFrom(r.ctx)
digest[i].UserName = userInfo.GetName()
requestInfo, ok := genericrequest.RequestInfoFrom(r.ctx)
if ok {
digest[i].RequestInfo = *requestInfo
}
}
i++
waitingDigest := make([]debug.RequestDump, 0, q.requestsWaiting.Length())
q.requestsWaiting.Walk(func(r *request) bool {
waitingDigest = append(waitingDigest, dumpRequest(includeDetails)(r))
return true
})
executingDigest := SetMapReduce(dumpRequest(includeDetails), append1[debug.RequestDump])(q.requestsExecuting)
sum := q.requests.QueueSum()
sum := q.requestsWaiting.QueueSum()
queueSum := debug.QueueSum{
InitialSeatsSum: sum.InitialSeatsSum,
MaxSeatsSum: sum.MaxSeatsSum,
@ -175,9 +162,57 @@ func (q *queue) dumpLocked(includeDetails bool) debug.QueueDump {
return debug.QueueDump{
NextDispatchR: q.nextDispatchR.String(),
Requests: digest,
ExecutingRequests: q.requestsExecuting,
Requests: waitingDigest,
RequestsExecuting: executingDigest,
ExecutingRequests: q.requestsExecuting.Len(),
SeatsInUse: q.seatsInUse,
QueueSum: queueSum,
}
}
func dumpRequest(includeDetails bool) func(*request) debug.RequestDump {
return func(r *request) debug.RequestDump {
ans := debug.RequestDump{
MatchedFlowSchema: r.fsName,
FlowDistinguisher: r.flowDistinguisher,
ArriveTime: r.arrivalTime,
StartTime: r.startTime,
WorkEstimate: r.workEstimate.WorkEstimate,
}
if includeDetails {
userInfo, _ := genericrequest.UserFrom(r.ctx)
ans.UserName = userInfo.GetName()
requestInfo, ok := genericrequest.RequestInfoFrom(r.ctx)
if ok {
ans.RequestInfo = *requestInfo
}
}
return ans
}
}
// SetMapReduce is map-reduce starting from a set type in the sets package.
func SetMapReduce[Elt comparable, Result, Accumulator any](mapFn func(Elt) Result, reduceFn func(Accumulator, Result) Accumulator) func(map[Elt]sets.Empty) Accumulator {
return func(set map[Elt]sets.Empty) Accumulator {
var ans Accumulator
for elt := range set {
ans = reduceFn(ans, mapFn(elt))
}
return ans
}
}
// SliceMapReduce is map-reduce starting from a slice.
func SliceMapReduce[Elt, Result, Accumulator any](mapFn func(Elt) Result, reduceFn func(Accumulator, Result) Accumulator) func([]Elt) Accumulator {
return func(slice []Elt) Accumulator {
var ans Accumulator
for _, elt := range slice {
ans = reduceFn(ans, mapFn(elt))
}
return ans
}
}
func or(x, y bool) bool { return x || y }
func append1[Elt any](slice []Elt, next Elt) []Elt { return append(slice, next) }
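
As a usage sketch of these combinators outside the dumping code (the values below are illustrative): SliceMapReduce pairs a per-element map with a fold over the mapped results, so the or helper above turns any predicate into an "any element matches" test, and addition turns a length map into a total.

// anyPositive reports whether any element of a slice is positive.
anyPositive := SliceMapReduce(func(n int) bool { return n > 0 }, or)
_ = anyPositive([]int{-3, 0, 7}) // true

// sumLengths folds per-string lengths with addition.
sumLengths := SliceMapReduce(
	func(s string) int { return len(s) },
	func(acc, n int) int { return acc + n },
)
_ = sumLengths([]string{"alpha", "beta"}) // 9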