From 20b5308c6c1ab367aee4ee1cc0437be5a61d6138 Mon Sep 17 00:00:00 2001 From: Li Bo Date: Tue, 6 Sep 2022 21:59:50 +0800 Subject: [PATCH 1/3] sort by PriorityLevel Configuration name when dumping priority levels --- .../pkg/util/flowcontrol/apf_controller_debug.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go index 91c49a4beda..4834c30586f 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go @@ -20,6 +20,7 @@ import ( "fmt" "io" "net/http" + "sort" "strconv" "strings" "text/tabwriter" @@ -59,7 +60,19 @@ func (cfgCtlr *configController) dumpPriorityLevels(w http.ResponseWriter, r *ht } tabPrint(tabWriter, rowForHeaders(columnHeaders)) endLine(tabWriter) - for _, plState := range cfgCtlr.priorityLevelStates { + plNames := make([]string, len(cfgCtlr.priorityLevelStates)) + i := 0 + for plName := range cfgCtlr.priorityLevelStates { + plNames[i] = plName + i++ + } + sort.Strings(plNames) + for i := range plNames { + plState, ok := cfgCtlr.priorityLevelStates[plNames[i]] + if !ok { + continue + } + if plState.queues == nil { tabPrint(tabWriter, row( plState.pl.Name, // 1 From c0bb425d8f3de4bd3468db9ba83962b777c6331b Mon Sep 17 00:00:00 2001 From: Li Bo Date: Tue, 6 Sep 2022 20:06:04 +0800 Subject: [PATCH 2/3] enhance priority-level dumping by adding total requests of dispatched,timed out and rejected --- .../util/flowcontrol/apf_controller_debug.go | 43 +++++++++++++------ .../pkg/util/flowcontrol/apf_filter.go | 1 + .../pkg/util/flowcontrol/debug/dump.go | 4 ++ .../fairqueuing/queueset/queueset.go | 40 +++++++++++++++++ 4 files changed, 75 insertions(+), 13 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go index 4834c30586f..b4ccfed4093 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go @@ -51,12 +51,16 @@ func (cfgCtlr *configController) dumpPriorityLevels(w http.ResponseWriter, r *ht defer cfgCtlr.lock.Unlock() tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0) columnHeaders := []string{ - "PriorityLevelName", // 1 - "ActiveQueues", // 2 - "IsIdle", // 3 - "IsQuiescing", // 4 - "WaitingRequests", // 5 - "ExecutingRequests", // 6 + "PriorityLevelName", // 1 + "ActiveQueues", // 2 + "IsIdle", // 3 + "IsQuiescing", // 4 + "WaitingRequests", // 5 + "ExecutingRequests", // 6 + "DispatchedRequests", // 7 + "RejectedRequests", // 8 + "TimedoutRequests", // 9 + "CancelledRequests", // 10 } tabPrint(tabWriter, rowForHeaders(columnHeaders)) endLine(tabWriter) @@ -81,6 +85,10 @@ func (cfgCtlr *configController) dumpPriorityLevels(w http.ResponseWriter, r *ht "", // 4 "", // 5 "", // 6 + "", // 7 + "", // 8 + "", // 9 + "", // 10 )) endLine(tabWriter) continue @@ -94,12 +102,16 @@ func (cfgCtlr *configController) dumpPriorityLevels(w http.ResponseWriter, r *ht } tabPrint(tabWriter, rowForPriorityLevel( - plState.pl.Name, // 1 - activeQueueNum, // 2 - plState.queues.IsIdle(), // 3 - plState.quiescing, // 4 - queueSetDigest.Waiting, // 5 - queueSetDigest.Executing, // 6 + plState.pl.Name, // 1 + activeQueueNum, // 2 + plState.queues.IsIdle(), // 3 + plState.quiescing, // 4 + queueSetDigest.Waiting, // 5 + queueSetDigest.Executing, // 6 + queueSetDigest.Dispatched, // 7 + queueSetDigest.Rejected, // 8 + queueSetDigest.Timedout, // 9 + queueSetDigest.Cancelled, // 10 )) endLine(tabWriter) } @@ -249,7 +261,8 @@ func rowForHeaders(headers []string) string { return row(headers...) } -func rowForPriorityLevel(plName string, activeQueues int, isIdle, isQuiescing bool, waitingRequests, executingRequests int) string { +func rowForPriorityLevel(plName string, activeQueues int, isIdle, isQuiescing bool, waitingRequests, executingRequests int, + dispatchedReqeusts, rejectedRequests, timedoutRequests, cancelledRequests int) string { return row( plName, strconv.Itoa(activeQueues), @@ -257,6 +270,10 @@ func rowForPriorityLevel(plName string, activeQueues int, isIdle, isQuiescing bo strconv.FormatBool(isQuiescing), strconv.Itoa(waitingRequests), strconv.Itoa(executingRequests), + strconv.Itoa(dispatchedReqeusts), + strconv.Itoa(rejectedRequests), + strconv.Itoa(timedoutRequests), + strconv.Itoa(cancelledRequests), ) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go index 037ac0db154..823e1cf34a2 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go @@ -181,6 +181,7 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) } metrics.AddDispatch(ctx, pl.Name, fs.Name) + fqs.OnRequestDispatched(ctx, req) executed = true startExecutionTime := time.Now() defer func() { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug/dump.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug/dump.go index 439d48c45ab..f2945b613f9 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug/dump.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug/dump.go @@ -30,6 +30,10 @@ type QueueSetDump struct { Executing int SeatsInUse int SeatsWaiting int + Dispatched int + Rejected int + Timedout int + Cancelled int } // QueueDump is an instant dump of one queue in a queue-set. diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 860e09cf8cc..8671b509840 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -148,6 +148,22 @@ type queueSet struct { // enqueues is the number of requests that have ever been enqueued enqueues int + + // totRequestsDispatched is the total number of requests of this + // queueSet that have been processed. + totRequestsDispatched int + + // totRequestsRejected is the total number of requests of this + // queueSet that have been rejected. + totRequestsRejected int + + // totRequestsTimedout is the total number of requests of this + // queueSet that have been timeouted. + totRequestsTimedout int + + // totRequestsCancelled is the total number of requests of this + // queueSet that have been cancelled. + totRequestsCancelled int } // NewQueueSetFactory creates a new QueueSetFactory object @@ -304,6 +320,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.Wo if !qs.canAccommodateSeatsLocked(workEstimate.MaxSeats()) { klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d seats are asked for, %d seats are in use (%d are executing) and the limit is %d", qs.qCfg.Name, fsName, descr1, descr2, workEstimate, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) + qs.totRequestsRejected++ metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit") return nil, qs.isIdleLocked() } @@ -323,6 +340,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.Wo // concurrency shares and at max queue length already if req == nil { klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v due to queue full", qs.qCfg.Name, fsName, descr1, descr2) + qs.totRequestsRejected++ metrics.AddReject(ctx, qs.qCfg.Name, fsName, "queue-full") return nil, qs.isIdleLocked() } @@ -400,6 +418,8 @@ func (req *request) wait() (bool, bool) { switch decisionAny { case decisionReject: klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2) + qs.totRequestsRejected++ + qs.totRequestsTimedout++ metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "time-out") return false, qs.isIdleLocked() case decisionCancel: @@ -418,6 +438,8 @@ func (req *request) wait() (bool, bool) { defer qs.boundNextDispatchLocked(queue) qs.totRequestsWaiting-- qs.totSeatsWaiting -= req.MaxSeats() + qs.totRequestsRejected++ + qs.totRequestsCancelled++ metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled") metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) req.NoteQueued(false) @@ -1038,9 +1060,27 @@ func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump { Executing: qs.totRequestsExecuting, SeatsInUse: qs.totSeatsInUse, SeatsWaiting: qs.totSeatsWaiting, + Dispatched: qs.totRequestsDispatched, + Rejected: qs.totRequestsRejected, + Timedout: qs.totRequestsTimedout, + Cancelled: qs.totRequestsCancelled, } for i, q := range qs.queues { d.Queues[i] = q.dumpLocked(includeRequestDetails) } return d } + +func OnRequestDispatched(ctx context.Context, r fq.Request) { + req, ok := r.(*request) + if !ok { + return + } + + qs := req.qs + if qs != nil { + qs.lockAndSyncTime(ctx) + defer qs.lock.Unlock() + qs.totRequestsDispatched++ + } +} From 2d98d2412a153154396c7089fbe355a56e8a2329 Mon Sep 17 00:00:00 2001 From: Li Bo Date: Tue, 14 Feb 2023 15:10:54 +0800 Subject: [PATCH 3/3] refine code --- .../apiserver/pkg/util/flowcontrol/apf_controller_debug.go | 6 ++---- .../src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go | 2 +- .../pkg/util/flowcontrol/fairqueuing/queueset/queueset.go | 4 ++-- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go index b4ccfed4093..0b9bc02f927 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go @@ -64,11 +64,9 @@ func (cfgCtlr *configController) dumpPriorityLevels(w http.ResponseWriter, r *ht } tabPrint(tabWriter, rowForHeaders(columnHeaders)) endLine(tabWriter) - plNames := make([]string, len(cfgCtlr.priorityLevelStates)) - i := 0 + plNames := make([]string, 0, len(cfgCtlr.priorityLevelStates)) for plName := range cfgCtlr.priorityLevelStates { - plNames[i] = plName - i++ + plNames = append(plNames, plName) } sort.Strings(plNames) for i := range plNames { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go index 823e1cf34a2..f93e6a828ee 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go @@ -181,7 +181,7 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) } metrics.AddDispatch(ctx, pl.Name, fs.Name) - fqs.OnRequestDispatched(ctx, req) + fqs.OnRequestDispatched(req) executed = true startExecutionTime := time.Now() defer func() { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 8671b509840..71470d1b9db 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -1071,7 +1071,7 @@ func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump { return d } -func OnRequestDispatched(ctx context.Context, r fq.Request) { +func OnRequestDispatched(r fq.Request) { req, ok := r.(*request) if !ok { return @@ -1079,7 +1079,7 @@ func OnRequestDispatched(ctx context.Context, r fq.Request) { qs := req.qs if qs != nil { - qs.lockAndSyncTime(ctx) + qs.lock.Lock() defer qs.lock.Unlock() qs.totRequestsDispatched++ }