Merge pull request #112393 from borgerli/apf-pl-dump

APF: two improvements when dumping priority levels
This commit is contained in:
Kubernetes Prow Robot 2023-02-16 19:33:48 -08:00 committed by GitHub
commit e55f2a9b54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 87 additions and 14 deletions

View File

@ -20,6 +20,7 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"sort"
"strconv" "strconv"
"strings" "strings"
"text/tabwriter" "text/tabwriter"
@ -50,16 +51,30 @@ func (cfgCtlr *configController) dumpPriorityLevels(w http.ResponseWriter, r *ht
defer cfgCtlr.lock.Unlock() defer cfgCtlr.lock.Unlock()
tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0) tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0)
columnHeaders := []string{ columnHeaders := []string{
"PriorityLevelName", // 1 "PriorityLevelName", // 1
"ActiveQueues", // 2 "ActiveQueues", // 2
"IsIdle", // 3 "IsIdle", // 3
"IsQuiescing", // 4 "IsQuiescing", // 4
"WaitingRequests", // 5 "WaitingRequests", // 5
"ExecutingRequests", // 6 "ExecutingRequests", // 6
"DispatchedRequests", // 7
"RejectedRequests", // 8
"TimedoutRequests", // 9
"CancelledRequests", // 10
} }
tabPrint(tabWriter, rowForHeaders(columnHeaders)) tabPrint(tabWriter, rowForHeaders(columnHeaders))
endLine(tabWriter) endLine(tabWriter)
for _, plState := range cfgCtlr.priorityLevelStates { plNames := make([]string, 0, len(cfgCtlr.priorityLevelStates))
for plName := range cfgCtlr.priorityLevelStates {
plNames = append(plNames, plName)
}
sort.Strings(plNames)
for i := range plNames {
plState, ok := cfgCtlr.priorityLevelStates[plNames[i]]
if !ok {
continue
}
if plState.queues == nil { if plState.queues == nil {
tabPrint(tabWriter, row( tabPrint(tabWriter, row(
plState.pl.Name, // 1 plState.pl.Name, // 1
@ -68,6 +83,10 @@ func (cfgCtlr *configController) dumpPriorityLevels(w http.ResponseWriter, r *ht
"<none>", // 4 "<none>", // 4
"<none>", // 5 "<none>", // 5
"<none>", // 6 "<none>", // 6
"<none>", // 7
"<none>", // 8
"<none>", // 9
"<none>", // 10
)) ))
endLine(tabWriter) endLine(tabWriter)
continue continue
@ -81,12 +100,16 @@ func (cfgCtlr *configController) dumpPriorityLevels(w http.ResponseWriter, r *ht
} }
tabPrint(tabWriter, rowForPriorityLevel( tabPrint(tabWriter, rowForPriorityLevel(
plState.pl.Name, // 1 plState.pl.Name, // 1
activeQueueNum, // 2 activeQueueNum, // 2
plState.queues.IsIdle(), // 3 plState.queues.IsIdle(), // 3
plState.quiescing, // 4 plState.quiescing, // 4
queueSetDigest.Waiting, // 5 queueSetDigest.Waiting, // 5
queueSetDigest.Executing, // 6 queueSetDigest.Executing, // 6
queueSetDigest.Dispatched, // 7
queueSetDigest.Rejected, // 8
queueSetDigest.Timedout, // 9
queueSetDigest.Cancelled, // 10
)) ))
endLine(tabWriter) endLine(tabWriter)
} }
@ -236,7 +259,8 @@ func rowForHeaders(headers []string) string {
return row(headers...) return row(headers...)
} }
func rowForPriorityLevel(plName string, activeQueues int, isIdle, isQuiescing bool, waitingRequests, executingRequests int) string { func rowForPriorityLevel(plName string, activeQueues int, isIdle, isQuiescing bool, waitingRequests, executingRequests int,
dispatchedReqeusts, rejectedRequests, timedoutRequests, cancelledRequests int) string {
return row( return row(
plName, plName,
strconv.Itoa(activeQueues), strconv.Itoa(activeQueues),
@ -244,6 +268,10 @@ func rowForPriorityLevel(plName string, activeQueues int, isIdle, isQuiescing bo
strconv.FormatBool(isQuiescing), strconv.FormatBool(isQuiescing),
strconv.Itoa(waitingRequests), strconv.Itoa(waitingRequests),
strconv.Itoa(executingRequests), strconv.Itoa(executingRequests),
strconv.Itoa(dispatchedReqeusts),
strconv.Itoa(rejectedRequests),
strconv.Itoa(timedoutRequests),
strconv.Itoa(cancelledRequests),
) )
} }

View File

@ -181,6 +181,7 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque
metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
} }
metrics.AddDispatch(ctx, pl.Name, fs.Name) metrics.AddDispatch(ctx, pl.Name, fs.Name)
fqs.OnRequestDispatched(req)
executed = true executed = true
startExecutionTime := time.Now() startExecutionTime := time.Now()
defer func() { defer func() {

View File

@ -30,6 +30,10 @@ type QueueSetDump struct {
Executing int Executing int
SeatsInUse int SeatsInUse int
SeatsWaiting int SeatsWaiting int
Dispatched int
Rejected int
Timedout int
Cancelled int
} }
// QueueDump is an instant dump of one queue in a queue-set. // QueueDump is an instant dump of one queue in a queue-set.

View File

@ -148,6 +148,22 @@ type queueSet struct {
// enqueues is the number of requests that have ever been enqueued // enqueues is the number of requests that have ever been enqueued
enqueues int enqueues int
// totRequestsDispatched is the total number of requests of this
// queueSet that have been dispatched.
totRequestsDispatched int
// totRequestsRejected is the total number of requests of this
// queueSet that have been rejected (including those timed out or cancelled).
totRequestsRejected int
// totRequestsTimedout is the total number of requests of this
// queueSet that have timed out.
totRequestsTimedout int
// totRequestsCancelled is the total number of requests of this
// queueSet that have been cancelled.
totRequestsCancelled int
} }
// NewQueueSetFactory creates a new QueueSetFactory object // NewQueueSetFactory creates a new QueueSetFactory object
@ -304,6 +320,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.Wo
if !qs.canAccommodateSeatsLocked(workEstimate.MaxSeats()) { if !qs.canAccommodateSeatsLocked(workEstimate.MaxSeats()) {
klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d seats are asked for, %d seats are in use (%d are executing) and the limit is %d", klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d seats are asked for, %d seats are in use (%d are executing) and the limit is %d",
qs.qCfg.Name, fsName, descr1, descr2, workEstimate, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) qs.qCfg.Name, fsName, descr1, descr2, workEstimate, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
qs.totRequestsRejected++
metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit") metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit")
return nil, qs.isIdleLocked() return nil, qs.isIdleLocked()
} }
@ -323,6 +340,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.Wo
// concurrency shares and at max queue length already // concurrency shares and at max queue length already
if req == nil { if req == nil {
klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v due to queue full", qs.qCfg.Name, fsName, descr1, descr2) klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v due to queue full", qs.qCfg.Name, fsName, descr1, descr2)
qs.totRequestsRejected++
metrics.AddReject(ctx, qs.qCfg.Name, fsName, "queue-full") metrics.AddReject(ctx, qs.qCfg.Name, fsName, "queue-full")
return nil, qs.isIdleLocked() return nil, qs.isIdleLocked()
} }
@ -400,6 +418,8 @@ func (req *request) wait() (bool, bool) {
switch decisionAny { switch decisionAny {
case decisionReject: case decisionReject:
klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2) klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2)
qs.totRequestsRejected++
qs.totRequestsTimedout++
metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "time-out") metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "time-out")
return false, qs.isIdleLocked() return false, qs.isIdleLocked()
case decisionCancel: case decisionCancel:
@ -418,6 +438,8 @@ func (req *request) wait() (bool, bool) {
defer qs.boundNextDispatchLocked(queue) defer qs.boundNextDispatchLocked(queue)
qs.totRequestsWaiting-- qs.totRequestsWaiting--
qs.totSeatsWaiting -= req.MaxSeats() qs.totSeatsWaiting -= req.MaxSeats()
qs.totRequestsRejected++
qs.totRequestsCancelled++
metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled") metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled")
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
req.NoteQueued(false) req.NoteQueued(false)
@ -1038,9 +1060,27 @@ func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump {
Executing: qs.totRequestsExecuting, Executing: qs.totRequestsExecuting,
SeatsInUse: qs.totSeatsInUse, SeatsInUse: qs.totSeatsInUse,
SeatsWaiting: qs.totSeatsWaiting, SeatsWaiting: qs.totSeatsWaiting,
Dispatched: qs.totRequestsDispatched,
Rejected: qs.totRequestsRejected,
Timedout: qs.totRequestsTimedout,
Cancelled: qs.totRequestsCancelled,
} }
for i, q := range qs.queues { for i, q := range qs.queues {
d.Queues[i] = q.dumpLocked(includeRequestDetails) d.Queues[i] = q.dumpLocked(includeRequestDetails)
} }
return d return d
} }
// OnRequestDispatched notes that r has been dispatched by bumping the
// dispatched-request counter of the queueSet the request belongs to.
// A value that is not this package's *request, or a request with no
// associated queueSet, is ignored.
func OnRequestDispatched(r fq.Request) {
	dispatched, isRequest := r.(*request)
	// Guard clause: nothing to count unless this is one of our requests
	// and it is attached to a queueSet.
	if !isRequest || dispatched.qs == nil {
		return
	}
	owner := dispatched.qs
	// The counter is updated while holding the queueSet's lock.
	owner.lock.Lock()
	defer owner.lock.Unlock()
	owner.totRequestsDispatched++
}