From 567becd5eedd9a1f3802f2b3b0b6b8efc445d2a1 Mon Sep 17 00:00:00 2001 From: yue9944882 <291271447@qq.com> Date: Fri, 22 May 2020 13:02:10 +0800 Subject: [PATCH] introduce apf debug endpoint --- staging/src/k8s.io/apiserver/pkg/server/BUILD | 2 + .../src/k8s.io/apiserver/pkg/server/config.go | 5 + .../apiserver/pkg/util/flowcontrol/BUILD | 6 + .../pkg/util/flowcontrol/apf_controller.go | 5 +- .../util/flowcontrol/apf_controller_debug.go | 277 ++++++++++++++++++ .../pkg/util/flowcontrol/apf_filter.go | 4 + .../pkg/util/flowcontrol/controller_test.go | 7 +- .../pkg/util/flowcontrol/debug/BUILD | 24 ++ .../pkg/util/flowcontrol/debug/dump.go | 47 +++ .../pkg/util/flowcontrol/fairqueuing/BUILD | 1 + .../util/flowcontrol/fairqueuing/interface.go | 10 +- .../flowcontrol/fairqueuing/queueset/BUILD | 2 + .../fairqueuing/queueset/queueset.go | 59 ++-- .../fairqueuing/queueset/queueset_test.go | 6 +- .../flowcontrol/fairqueuing/queueset/types.go | 35 ++- .../flowcontrol/fairqueuing/testing/BUILD | 1 + .../fairqueuing/testing/no-restraint.go | 7 +- vendor/modules.txt | 1 + 18 files changed, 467 insertions(+), 32 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go create mode 100644 staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug/BUILD create mode 100644 staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug/dump.go diff --git a/staging/src/k8s.io/apiserver/pkg/server/BUILD b/staging/src/k8s.io/apiserver/pkg/server/BUILD index 1f1a706a5fc..96fb73b6a2c 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/BUILD @@ -104,6 +104,7 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/openapi:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates:go_default_library", @@ -113,6 +114,7 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/routes:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/storage:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/openapi:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 930ad28403d..c1d1eb61fc4 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -53,6 +53,7 @@ import ( genericapifilters "k8s.io/apiserver/pkg/endpoints/filters" apiopenapi "k8s.io/apiserver/pkg/endpoints/openapi" apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/features" genericregistry "k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/server/dynamiccertificates" "k8s.io/apiserver/pkg/server/egressselector" @@ -60,6 +61,7 @@ import ( "k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/routes" serverstore "k8s.io/apiserver/pkg/server/storage" + "k8s.io/apiserver/pkg/util/feature" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" "k8s.io/client-go/informers" restclient "k8s.io/client-go/rest" @@ -708,6 +710,9 @@ func installAPI(s *GenericAPIServer, c *Config) { if c.EnableDiscovery { s.Handler.GoRestfulContainer.Add(s.DiscoveryGroupManager.WebService()) } + if feature.DefaultFeatureGate.Enabled(features.APIPriorityAndFairness) { + c.FlowControl.Install(s.Handler.NonGoRestfulMux) + } } func NewRequestInfoResolver(c *Config) *apirequest.RequestInfoFactory { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/BUILD index 378dd43e517..32a9d47dbbe 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/BUILD @@ -4,6 +4,7 @@ go_library( name = "go_default_library", srcs = [ "apf_controller.go", + "apf_controller_debug.go", "apf_filter.go", "formatting.go", "rule.go", @@ -16,14 +17,17 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/apihelpers:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library", @@ -52,6 +56,7 @@ filegroup( srcs = [ ":package-srcs", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:all-srcs", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/format:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:all-srcs", @@ -76,6 +81,7 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/format:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index e1c446576ea..577ae2eb1f6 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -641,14 +641,15 @@ func (cfgCtl *configController) startRequest(ctx context.Context, rd RequestDige numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues } + var flowDistinguisher string var hashValue uint64 if numQueues > 1 { - flowDistinguisher := computeFlowDistinguisher(rd, fs.Spec.DistinguisherMethod) + flowDistinguisher = computeFlowDistinguisher(rd, fs.Spec.DistinguisherMethod) hashValue = hashFlowID(fs.Name, flowDistinguisher) } startWaitingTime = time.Now() klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, fs.Name, fs.Spec.DistinguisherMethod, plName, numQueues) - req, idle := plState.queues.StartRequest(ctx, hashValue, fs.Name, rd.RequestInfo, rd.User) + req, idle := plState.queues.StartRequest(ctx, hashValue, flowDistinguisher, fs.Name, rd.RequestInfo, rd.User) if idle { cfgCtl.maybeReapLocked(plName, plState) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go new file mode 100644 index 00000000000..4a467b6d9f2 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go @@ -0,0 +1,277 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flowcontrol + +import ( + "fmt" + "io" + "net/http" + "strconv" + "strings" + "text/tabwriter" + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apiserver/pkg/server/mux" +) + +const ( + queryIncludeRequestDetails = "includeRequestDetails" +) + +func (cfgCtl *configController) Install(c *mux.PathRecorderMux) { + // TODO(yue9944882): handle "Accept" header properly + // debugging dumps a CSV content for three levels of granularity + // 1. row per priority-level + c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_priority_levels", cfgCtl.dumpPriorityLevels) + // 2. row per queue + c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_queues", cfgCtl.dumpQueues) + // 3. row per request + c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_requests", cfgCtl.dumpRequests) +} + +func (cfgCtl *configController) dumpPriorityLevels(w http.ResponseWriter, r *http.Request) { + cfgCtl.lock.Lock() + defer cfgCtl.lock.Unlock() + tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0) + columnHeaders := []string{ + "PriorityLevelName", // 1 + "ActiveQueues", // 2 + "IsIdle", // 3 + "IsQuiescing", // 4 + "WaitingRequests", // 5 + "ExecutingRequests", // 6 + } + tabPrint(tabWriter, rowForHeaders(columnHeaders)) + endline(tabWriter) + for _, plState := range cfgCtl.priorityLevelStates { + if plState.queues == nil { + tabPrint(tabWriter, row( + plState.pl.Name, // 1 + "", // 2 + "", // 3 + "", // 4 + "", // 5 + "", // 6 + )) + endline(tabWriter) + continue + } + queueSetDigest := plState.queues.Dump(false) + activeQueueNum := 0 + for _, q := range queueSetDigest.Queues { + if len(q.Requests) > 0 { + activeQueueNum++ + } + } + + tabPrint(tabWriter, rowForPriorityLevel( + plState.pl.Name, // 1 + activeQueueNum, // 2 + plState.queues.IsIdle(), // 3 + plState.quiescing, // 4 + queueSetDigest.Waiting, // 5 + queueSetDigest.Executing, // 6 + )) + endline(tabWriter) + } + runtime.HandleError(tabWriter.Flush()) +} + +func (cfgCtl *configController) dumpQueues(w http.ResponseWriter, r *http.Request) { + cfgCtl.lock.Lock() + defer cfgCtl.lock.Unlock() + tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0) + columnHeaders := []string{ + "PriorityLevelName", // 1 + "Index", // 2 + "PendingRequests", // 3 + "ExecutingRequests", // 4 + "VirtualStart", // 5 + } + tabPrint(tabWriter, rowForHeaders(columnHeaders)) + endline(tabWriter) + for _, plState := range cfgCtl.priorityLevelStates { + if plState.queues == nil { + tabPrint(tabWriter, row( + plState.pl.Name, // 1 + "", // 2 + "", // 3 + "", // 4 + "", // 5 + )) + endline(tabWriter) + continue + } + queueSetDigest := plState.queues.Dump(false) + for i, q := range queueSetDigest.Queues { + tabPrint(tabWriter, rowForQueue( + plState.pl.Name, // 1 + i, // 2 + len(q.Requests), // 3 + q.ExecutingRequests, // 4 + q.VirtualStart, // 5 + )) + endline(tabWriter) + } + } + runtime.HandleError(tabWriter.Flush()) +} + +func (cfgCtl *configController) dumpRequests(w http.ResponseWriter, r *http.Request) { + cfgCtl.lock.Lock() + defer cfgCtl.lock.Unlock() + + includeRequestDetails := len(r.URL.Query().Get(queryIncludeRequestDetails)) > 0 + + tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0) + tabPrint(tabWriter, rowForHeaders([]string{ + "PriorityLevelName", // 1 + "FlowSchemaName", // 2 + "QueueIndex", // 3 + "RequestIndexInQueue", // 4 + "FlowDistingsher", // 5 + "ArriveTime", // 6 + })) + if includeRequestDetails { + tabPrint(tabWriter, rowForHeaders([]string{ + "UserName", // 7 + "Verb", // 8 + "APIPath", // 9 + "Namespace", // 10 + "Name", // 11 + "APIVersion", // 12 + "Resource", // 13 + "SubResource", // 14 + })) + } + endline(tabWriter) + for _, plState := range cfgCtl.priorityLevelStates { + if plState.queues == nil { + tabPrint(tabWriter, row( + plState.pl.Name, // 1 + "", // 2 + "", // 3 + "", // 4 + "", // 5 + "", // 6 + )) + if includeRequestDetails { + tabPrint(tabWriter, row( + "", // 7 + "", // 8 + "", // 9 + "", // 10 + "", // 11 + "", // 12 + "", // 13 + "", // 14 + )) + } + endline(tabWriter) + continue + } + queueSetDigest := plState.queues.Dump(includeRequestDetails) + for iq, q := range queueSetDigest.Queues { + for ir, r := range q.Requests { + tabPrint(tabWriter, rowForRequest( + plState.pl.Name, // 1 + r.MatchedFlowSchema, // 2 + iq, // 3 + ir, // 4 + r.FlowDistinguisher, // 5 + r.ArriveTime, // 6 + )) + if includeRequestDetails { + tabPrint(tabWriter, rowForRequestDetails( + r.UserName, // 7 + r.RequestInfo.Verb, // 8 + r.RequestInfo.Path, // 9 + r.RequestInfo.Namespace, // 10 + r.RequestInfo.Name, // 11 + schema.GroupVersion{ + Group: r.RequestInfo.APIGroup, + Version: r.RequestInfo.APIVersion, + }.String(), // 12 + r.RequestInfo.Resource, // 13 + r.RequestInfo.Subresource, // 14 + )) + } + endline(tabWriter) + } + } + } + runtime.HandleError(tabWriter.Flush()) +} + +func tabPrint(w io.Writer, row string) { + _, err := fmt.Fprint(w, row) + runtime.HandleError(err) +} +func endline(w io.Writer) { + _, err := fmt.Fprint(w, "\n") + runtime.HandleError(err) +} + +func rowForHeaders(headers []string) string { + return row(headers...) +} + +func rowForPriorityLevel(plName string, activeQueues int, isIdle, isQuiescing bool, waitingRequests, executingRequests int) string { + return row( + plName, + strconv.Itoa(activeQueues), + strconv.FormatBool(isIdle), + strconv.FormatBool(isQuiescing), + strconv.Itoa(waitingRequests), + strconv.Itoa(executingRequests), + ) +} + +func rowForQueue(plName string, index, waitingRequests, executingRequests int, virtualStart float64) string { + return row( + plName, + strconv.Itoa(index), + strconv.Itoa(waitingRequests), + strconv.Itoa(executingRequests), + fmt.Sprintf("%.4f", virtualStart), + ) +} + +func rowForRequest(plName, fsName string, queueIndex, requestIndex int, flowDistinguisher string, arriveTime time.Time) string { + return row( + plName, + fsName, + strconv.Itoa(queueIndex), + strconv.Itoa(requestIndex), + flowDistinguisher, + arriveTime.UTC().Format(time.RFC3339Nano), + ) +} + +func rowForRequestDetails(username, verb, path, namespace, name, apiVersion, resource, subResource string) string { + return row( + username, + verb, + path, + ) +} + +func row(columns ...string) string { + return strings.Join(columns, ",\t") + ",\t" +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go index 79bc9faad1a..0ee68f2340f 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go @@ -22,6 +22,7 @@ import ( "time" "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apiserver/pkg/server/mux" "k8s.io/apiserver/pkg/util/flowcontrol/counter" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset" @@ -51,6 +52,9 @@ type Interface interface { // any needed changes to local behavior. This method ceases // activity and returns after the given channel is closed. Run(stopCh <-chan struct{}) error + + // Install installs debugging endpoints to the web-server. + Install(c *mux.PathRecorderMux) } // This request filter implements https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go index 84a5d72c5a8..faa8c74080f 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go @@ -28,6 +28,7 @@ import ( fcv1a1 "k8s.io/api/flowcontrol/v1alpha1" "k8s.io/apimachinery/pkg/util/sets" fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" + "k8s.io/apiserver/pkg/util/flowcontrol/debug" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format" "k8s.io/client-go/informers" @@ -95,6 +96,10 @@ func (cqs *ctlTestQueueSet) BeginConfigChange(qc fq.QueuingConfig) (fq.QueueSetC return ctlTestQueueSetCompleter{cqs.cts, cqs, qc}, nil } +func (cqs *ctlTestQueueSet) Dump(bool) debug.QueueSetDump { + return debug.QueueSetDump{} +} + func (cqc ctlTestQueueSetCompleter) Complete(dc fq.DispatchingConfig) fq.QueueSet { cqc.cts.lock.Lock() defer cqc.cts.lock.Unlock() @@ -115,7 +120,7 @@ func (cqs *ctlTestQueueSet) IsIdle() bool { return cqs.countActive == 0 } -func (cqs *ctlTestQueueSet) StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (req fq.Request, idle bool) { +func (cqs *ctlTestQueueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (req fq.Request, idle bool) { cqs.cts.lock.Lock() defer cqs.cts.lock.Unlock() cqs.countActive++ diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug/BUILD new file mode 100644 index 00000000000..2d7ebcd405e --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug/BUILD @@ -0,0 +1,24 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["dump.go"], + importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/debug", + importpath = "k8s.io/apiserver/pkg/util/flowcontrol/debug", + visibility = ["//visibility:public"], + deps = ["//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug/dump.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug/dump.go new file mode 100644 index 00000000000..d668d9fe7b2 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug/dump.go @@ -0,0 +1,47 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package debug + +import ( + "k8s.io/apiserver/pkg/endpoints/request" + "time" +) + +// QueueSetDump is an instant dump of queue-set. +type QueueSetDump struct { + Queues []QueueDump + Waiting int + Executing int +} + +// QueueDump is an instant dump of one queue in a queue-set. +type QueueDump struct { + Requests []RequestDump + VirtualStart float64 + ExecutingRequests int +} + +// RequestDump is an instant dump of one requests pending in the queue. +type RequestDump struct { + MatchedFlowSchema string + FlowDistinguisher string + ArriveTime time.Time + StartTime time.Time + // request details + UserName string + RequestInfo request.RequestInfo +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/BUILD index fcd44292fda..ee1de9ce75f 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/BUILD @@ -6,6 +6,7 @@ go_library( importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing", importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing", visibility = ["//visibility:public"], + deps = ["//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug:go_default_library"], ) filegroup( diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go index 5e573bf8f21..3ac03f78758 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go @@ -19,6 +19,8 @@ package fairqueuing import ( "context" "time" + + "k8s.io/apiserver/pkg/util/flowcontrol/debug" ) // QueueSetFactory is used to create QueueSet objects. Creation, like @@ -77,7 +79,13 @@ type QueueSet interface { // was idle at the moment of the return. Otherwise idle==false // and the client must call the Finish method of the Request // exactly once. - StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (req Request, idle bool) + StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (req Request, idle bool) + + // Dump saves and returns the instant internal state of the queue-set. + // Note that dumping process will stop the queue-set from proceeding + // any requests. + // For debugging only. + Dump(includeRequestDetails bool) debug.QueueSetDump } // Request represents the remainder of the handling of one request diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD index 7380fe60b80..75a78ece462 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD @@ -13,7 +13,9 @@ go_library( deps = [ "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 57ce829bcc5..316ca34794c 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apiserver/pkg/util/flowcontrol/counter" + "k8s.io/apiserver/pkg/util/flowcontrol/debug" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise" "k8s.io/apiserver/pkg/util/flowcontrol/metrics" @@ -221,7 +222,7 @@ const ( // executing at each point where there is a change in that quantity, // because the metrics --- and only the metrics --- track that // quantity per FlowSchema. -func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (fq.Request, bool) { +func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (fq.Request, bool) { qs.lockAndSyncTime() defer qs.lock.Unlock() var req *request @@ -235,7 +236,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, fsName s metrics.AddReject(qs.qCfg.Name, fsName, "concurrency-limit") return nil, qs.isIdleLocked() } - req = qs.dispatchSansQueueLocked(ctx, fsName, descr1, descr2) + req = qs.dispatchSansQueueLocked(ctx, flowDistinguisher, fsName, descr1, descr2) return req, false } @@ -246,7 +247,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, fsName s // 3) Reject current request if there is not enough concurrency shares and // we are at max queue length // 4) If not rejected, create a request and enqueue - req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, fsName, descr1, descr2) + req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, flowDistinguisher, fsName, descr1, descr2) // req == nil means that the request was rejected - no remaining // concurrency shares and at max queue length already if req == nil { @@ -398,7 +399,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 { // returns the enqueud request on a successful enqueue // returns nil in the case that there is no available concurrency or // the queuelengthlimit has been reached -func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) *request { +func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request { // Start with the shuffle sharding, to pick a queue. queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2) queue := qs.queues[queueIdx] @@ -410,14 +411,15 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte // Create a request and enqueue req := &request{ - qs: qs, - fsName: fsName, - ctx: ctx, - decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter), - arrivalTime: qs.clock.Now(), - queue: queue, - descr1: descr1, - descr2: descr2, + qs: qs, + fsName: fsName, + flowDistinguisher: flowDistinguisher, + ctx: ctx, + decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter), + arrivalTime: qs.clock.Now(), + queue: queue, + descr1: descr1, + descr2: descr2, } if ok := qs.rejectOrEnqueueLocked(req); !ok { return nil @@ -523,17 +525,18 @@ func (qs *queueSet) dispatchAsMuchAsPossibleLocked() { } } -func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, fsName string, descr1, descr2 interface{}) *request { +func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request { now := qs.clock.Now() req := &request{ - qs: qs, - fsName: fsName, - ctx: ctx, - startTime: now, - decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter), - arrivalTime: now, - descr1: descr1, - descr2: descr2, + qs: qs, + fsName: fsName, + flowDistinguisher: flowDistinguisher, + ctx: ctx, + startTime: now, + decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter), + arrivalTime: now, + descr1: descr1, + descr2: descr2, } req.decision.SetLocked(decisionExecute) qs.totRequestsExecuting++ @@ -709,3 +712,17 @@ func (qs *queueSet) preCreateOrUnblockGoroutine() { func (qs *queueSet) goroutineDoneOrBlocked() { qs.counter.Add(-1) } + +func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump { + qs.lock.Lock() + defer qs.lock.Unlock() + d := debug.QueueSetDump{ + Queues: make([]debug.QueueDump, len(qs.queues)), + Waiting: qs.totRequestsWaiting, + Executing: qs.totRequestsExecuting, + } + for i, q := range qs.queues { + d.Queues[i] = q.dump(includeRequestDetails) + } + return d +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 50bc2f7b4b8..e9c700efcc4 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -79,7 +79,7 @@ func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, go func(i, j int, uc uniformClient, igr test.Integrator) { for k := 0; k < uc.nCalls; k++ { ClockWait(clk, counter, uc.thinkDuration) - req, idle := qs.StartRequest(context.Background(), uc.hash, fsName, name, []int{i, j, k}) + req, idle := qs.StartRequest(context.Background(), uc.hash, "", fsName, name, []int{i, j, k}) t.Logf("%s: %d, %d, %d got req=%p, idle=%v", clk.Now().Format(nsTimeFmt), i, j, k, req, idle) if req == nil { atomic.AddUint64(&failedCount, 1) @@ -346,7 +346,7 @@ func TestContextCancel(t *testing.T) { qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1}) counter.Add(1) // account for the goroutine running this test ctx1 := context.Background() - req1, _ := qs.StartRequest(ctx1, 1, "fs1", "test", "one") + req1, _ := qs.StartRequest(ctx1, 1, "", "fs1", "test", "one") if req1 == nil { t.Error("Request rejected") return @@ -362,7 +362,7 @@ func TestContextCancel(t *testing.T) { counter.Add(1) cancel2() }() - req2, idle2a := qs.StartRequest(ctx2, 2, "fs2", "test", "two") + req2, idle2a := qs.StartRequest(ctx2, 2, "", "fs2", "test", "two") if idle2a { t.Error("2nd StartRequest returned idle") } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index 1facc701d9e..1bcb8cfb32c 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -20,15 +20,20 @@ import ( "context" "time" + genericrequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/util/flowcontrol/debug" "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise" ) // request is a temporary container for "requests" with additional // tracking fields required for the functionality FQScheduler type request struct { - qs *queueSet - fsName string - ctx context.Context + ctx context.Context + + qs *queueSet + + flowDistinguisher string + fsName string // The relevant queue. Is nil if this request did not go through // a queue. @@ -94,3 +99,27 @@ func (q *queue) GetVirtualFinish(J int, G float64) float64 { jg := float64(J+1) * float64(G) return jg + q.virtualStart } + +func (q *queue) dump(includeDetails bool) debug.QueueDump { + digest := make([]debug.RequestDump, len(q.requests)) + for i, r := range q.requests { + // dump requests. + digest[i].MatchedFlowSchema = r.fsName + digest[i].FlowDistinguisher = r.flowDistinguisher + digest[i].ArriveTime = r.arrivalTime + digest[i].StartTime = r.startTime + if includeDetails { + userInfo, _ := genericrequest.UserFrom(r.ctx) + digest[i].UserName = userInfo.GetName() + requestInfo, ok := genericrequest.RequestInfoFrom(r.ctx) + if ok { + digest[i].RequestInfo = *requestInfo + } + } + } + return debug.QueueDump{ + VirtualStart: q.virtualStart, + Requests: digest, + ExecutingRequests: q.requestsExecuting, + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/BUILD index 4fdeb893837..558001431b2 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/BUILD @@ -11,6 +11,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go index 14504f20179..72e7f5706ef 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go @@ -19,6 +19,7 @@ package testing import ( "context" + "k8s.io/apiserver/pkg/util/flowcontrol/debug" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" ) @@ -53,10 +54,14 @@ func (noRestraint) IsIdle() bool { return false } -func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (fq.Request, bool) { +func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (fq.Request, bool) { return noRestraintRequest{}, false } +func (noRestraint) Dump(bool) debug.QueueSetDump { + return debug.QueueSetDump{} +} + func (noRestraintRequest) Finish(execute func()) (idle bool) { execute() return false diff --git a/vendor/modules.txt b/vendor/modules.txt index 9209bcb18aa..c6805e66ab7 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1362,6 +1362,7 @@ k8s.io/apiserver/pkg/util/dryrun k8s.io/apiserver/pkg/util/feature k8s.io/apiserver/pkg/util/flowcontrol k8s.io/apiserver/pkg/util/flowcontrol/counter +k8s.io/apiserver/pkg/util/flowcontrol/debug k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise