mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #90967 from yue9944882/feat/apf-debug
Introducing APF debugging endpoint w/ three levels of granularity
This commit is contained in:
commit
eab28c7ab4
@ -104,6 +104,7 @@ go_library(
|
|||||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/openapi:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/endpoints/openapi:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates:go_default_library",
|
||||||
@ -113,6 +114,7 @@ go_library(
|
|||||||
"//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/server/routes:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/server/routes:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/server/storage:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/server/storage:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/openapi:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/openapi:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||||
|
@ -53,6 +53,7 @@ import (
|
|||||||
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
|
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
|
||||||
apiopenapi "k8s.io/apiserver/pkg/endpoints/openapi"
|
apiopenapi "k8s.io/apiserver/pkg/endpoints/openapi"
|
||||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
|
"k8s.io/apiserver/pkg/features"
|
||||||
genericregistry "k8s.io/apiserver/pkg/registry/generic"
|
genericregistry "k8s.io/apiserver/pkg/registry/generic"
|
||||||
"k8s.io/apiserver/pkg/server/dynamiccertificates"
|
"k8s.io/apiserver/pkg/server/dynamiccertificates"
|
||||||
"k8s.io/apiserver/pkg/server/egressselector"
|
"k8s.io/apiserver/pkg/server/egressselector"
|
||||||
@ -60,6 +61,7 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/server/healthz"
|
"k8s.io/apiserver/pkg/server/healthz"
|
||||||
"k8s.io/apiserver/pkg/server/routes"
|
"k8s.io/apiserver/pkg/server/routes"
|
||||||
serverstore "k8s.io/apiserver/pkg/server/storage"
|
serverstore "k8s.io/apiserver/pkg/server/storage"
|
||||||
|
"k8s.io/apiserver/pkg/util/feature"
|
||||||
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
restclient "k8s.io/client-go/rest"
|
restclient "k8s.io/client-go/rest"
|
||||||
@ -709,6 +711,9 @@ func installAPI(s *GenericAPIServer, c *Config) {
|
|||||||
if c.EnableDiscovery {
|
if c.EnableDiscovery {
|
||||||
s.Handler.GoRestfulContainer.Add(s.DiscoveryGroupManager.WebService())
|
s.Handler.GoRestfulContainer.Add(s.DiscoveryGroupManager.WebService())
|
||||||
}
|
}
|
||||||
|
if feature.DefaultFeatureGate.Enabled(features.APIPriorityAndFairness) {
|
||||||
|
c.FlowControl.Install(s.Handler.NonGoRestfulMux)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRequestInfoResolver(c *Config) *apirequest.RequestInfoFactory {
|
func NewRequestInfoResolver(c *Config) *apirequest.RequestInfoFactory {
|
||||||
|
@ -4,6 +4,7 @@ go_library(
|
|||||||
name = "go_default_library",
|
name = "go_default_library",
|
||||||
srcs = [
|
srcs = [
|
||||||
"apf_controller.go",
|
"apf_controller.go",
|
||||||
|
"apf_controller_debug.go",
|
||||||
"apf_filter.go",
|
"apf_filter.go",
|
||||||
"formatting.go",
|
"formatting.go",
|
||||||
"rule.go",
|
"rule.go",
|
||||||
@ -16,14 +17,17 @@ go_library(
|
|||||||
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/apihelpers:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/apihelpers:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
|
||||||
@ -52,6 +56,7 @@ filegroup(
|
|||||||
srcs = [
|
srcs = [
|
||||||
":package-srcs",
|
":package-srcs",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:all-srcs",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:all-srcs",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug:all-srcs",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:all-srcs",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:all-srcs",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/format:all-srcs",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/format:all-srcs",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:all-srcs",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:all-srcs",
|
||||||
@ -76,6 +81,7 @@ go_test(
|
|||||||
"//staging/src/k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/format:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/format:go_default_library",
|
||||||
|
@ -641,14 +641,15 @@ func (cfgCtl *configController) startRequest(ctx context.Context, rd RequestDige
|
|||||||
numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
|
numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
|
||||||
|
|
||||||
}
|
}
|
||||||
|
var flowDistinguisher string
|
||||||
var hashValue uint64
|
var hashValue uint64
|
||||||
if numQueues > 1 {
|
if numQueues > 1 {
|
||||||
flowDistinguisher := computeFlowDistinguisher(rd, fs.Spec.DistinguisherMethod)
|
flowDistinguisher = computeFlowDistinguisher(rd, fs.Spec.DistinguisherMethod)
|
||||||
hashValue = hashFlowID(fs.Name, flowDistinguisher)
|
hashValue = hashFlowID(fs.Name, flowDistinguisher)
|
||||||
}
|
}
|
||||||
startWaitingTime = time.Now()
|
startWaitingTime = time.Now()
|
||||||
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, fs.Name, fs.Spec.DistinguisherMethod, plName, numQueues)
|
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, fs.Name, fs.Spec.DistinguisherMethod, plName, numQueues)
|
||||||
req, idle := plState.queues.StartRequest(ctx, hashValue, fs.Name, rd.RequestInfo, rd.User)
|
req, idle := plState.queues.StartRequest(ctx, hashValue, flowDistinguisher, fs.Name, rd.RequestInfo, rd.User)
|
||||||
if idle {
|
if idle {
|
||||||
cfgCtl.maybeReapLocked(plName, plState)
|
cfgCtl.maybeReapLocked(plName, plState)
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,277 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2019 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package flowcontrol
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"text/tabwriter"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
"k8s.io/apimachinery/pkg/util/runtime"
|
||||||
|
"k8s.io/apiserver/pkg/server/mux"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
queryIncludeRequestDetails = "includeRequestDetails"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (cfgCtl *configController) Install(c *mux.PathRecorderMux) {
|
||||||
|
// TODO(yue9944882): handle "Accept" header properly
|
||||||
|
// debugging dumps a CSV content for three levels of granularity
|
||||||
|
// 1. row per priority-level
|
||||||
|
c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_priority_levels", cfgCtl.dumpPriorityLevels)
|
||||||
|
// 2. row per queue
|
||||||
|
c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_queues", cfgCtl.dumpQueues)
|
||||||
|
// 3. row per request
|
||||||
|
c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_requests", cfgCtl.dumpRequests)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cfgCtl *configController) dumpPriorityLevels(w http.ResponseWriter, r *http.Request) {
|
||||||
|
cfgCtl.lock.Lock()
|
||||||
|
defer cfgCtl.lock.Unlock()
|
||||||
|
tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0)
|
||||||
|
columnHeaders := []string{
|
||||||
|
"PriorityLevelName", // 1
|
||||||
|
"ActiveQueues", // 2
|
||||||
|
"IsIdle", // 3
|
||||||
|
"IsQuiescing", // 4
|
||||||
|
"WaitingRequests", // 5
|
||||||
|
"ExecutingRequests", // 6
|
||||||
|
}
|
||||||
|
tabPrint(tabWriter, rowForHeaders(columnHeaders))
|
||||||
|
endline(tabWriter)
|
||||||
|
for _, plState := range cfgCtl.priorityLevelStates {
|
||||||
|
if plState.queues == nil {
|
||||||
|
tabPrint(tabWriter, row(
|
||||||
|
plState.pl.Name, // 1
|
||||||
|
"<none>", // 2
|
||||||
|
"<none>", // 3
|
||||||
|
"<none>", // 4
|
||||||
|
"<none>", // 5
|
||||||
|
"<none>", // 6
|
||||||
|
))
|
||||||
|
endline(tabWriter)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
queueSetDigest := plState.queues.Dump(false)
|
||||||
|
activeQueueNum := 0
|
||||||
|
for _, q := range queueSetDigest.Queues {
|
||||||
|
if len(q.Requests) > 0 {
|
||||||
|
activeQueueNum++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tabPrint(tabWriter, rowForPriorityLevel(
|
||||||
|
plState.pl.Name, // 1
|
||||||
|
activeQueueNum, // 2
|
||||||
|
plState.queues.IsIdle(), // 3
|
||||||
|
plState.quiescing, // 4
|
||||||
|
queueSetDigest.Waiting, // 5
|
||||||
|
queueSetDigest.Executing, // 6
|
||||||
|
))
|
||||||
|
endline(tabWriter)
|
||||||
|
}
|
||||||
|
runtime.HandleError(tabWriter.Flush())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cfgCtl *configController) dumpQueues(w http.ResponseWriter, r *http.Request) {
|
||||||
|
cfgCtl.lock.Lock()
|
||||||
|
defer cfgCtl.lock.Unlock()
|
||||||
|
tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0)
|
||||||
|
columnHeaders := []string{
|
||||||
|
"PriorityLevelName", // 1
|
||||||
|
"Index", // 2
|
||||||
|
"PendingRequests", // 3
|
||||||
|
"ExecutingRequests", // 4
|
||||||
|
"VirtualStart", // 5
|
||||||
|
}
|
||||||
|
tabPrint(tabWriter, rowForHeaders(columnHeaders))
|
||||||
|
endline(tabWriter)
|
||||||
|
for _, plState := range cfgCtl.priorityLevelStates {
|
||||||
|
if plState.queues == nil {
|
||||||
|
tabPrint(tabWriter, row(
|
||||||
|
plState.pl.Name, // 1
|
||||||
|
"<none>", // 2
|
||||||
|
"<none>", // 3
|
||||||
|
"<none>", // 4
|
||||||
|
"<none>", // 5
|
||||||
|
))
|
||||||
|
endline(tabWriter)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
queueSetDigest := plState.queues.Dump(false)
|
||||||
|
for i, q := range queueSetDigest.Queues {
|
||||||
|
tabPrint(tabWriter, rowForQueue(
|
||||||
|
plState.pl.Name, // 1
|
||||||
|
i, // 2
|
||||||
|
len(q.Requests), // 3
|
||||||
|
q.ExecutingRequests, // 4
|
||||||
|
q.VirtualStart, // 5
|
||||||
|
))
|
||||||
|
endline(tabWriter)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
runtime.HandleError(tabWriter.Flush())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cfgCtl *configController) dumpRequests(w http.ResponseWriter, r *http.Request) {
|
||||||
|
cfgCtl.lock.Lock()
|
||||||
|
defer cfgCtl.lock.Unlock()
|
||||||
|
|
||||||
|
includeRequestDetails := len(r.URL.Query().Get(queryIncludeRequestDetails)) > 0
|
||||||
|
|
||||||
|
tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0)
|
||||||
|
tabPrint(tabWriter, rowForHeaders([]string{
|
||||||
|
"PriorityLevelName", // 1
|
||||||
|
"FlowSchemaName", // 2
|
||||||
|
"QueueIndex", // 3
|
||||||
|
"RequestIndexInQueue", // 4
|
||||||
|
"FlowDistingsher", // 5
|
||||||
|
"ArriveTime", // 6
|
||||||
|
}))
|
||||||
|
if includeRequestDetails {
|
||||||
|
tabPrint(tabWriter, rowForHeaders([]string{
|
||||||
|
"UserName", // 7
|
||||||
|
"Verb", // 8
|
||||||
|
"APIPath", // 9
|
||||||
|
"Namespace", // 10
|
||||||
|
"Name", // 11
|
||||||
|
"APIVersion", // 12
|
||||||
|
"Resource", // 13
|
||||||
|
"SubResource", // 14
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
endline(tabWriter)
|
||||||
|
for _, plState := range cfgCtl.priorityLevelStates {
|
||||||
|
if plState.queues == nil {
|
||||||
|
tabPrint(tabWriter, row(
|
||||||
|
plState.pl.Name, // 1
|
||||||
|
"<none>", // 2
|
||||||
|
"<none>", // 3
|
||||||
|
"<none>", // 4
|
||||||
|
"<none>", // 5
|
||||||
|
"<none>", // 6
|
||||||
|
))
|
||||||
|
if includeRequestDetails {
|
||||||
|
tabPrint(tabWriter, row(
|
||||||
|
"<none>", // 7
|
||||||
|
"<none>", // 8
|
||||||
|
"<none>", // 9
|
||||||
|
"<none>", // 10
|
||||||
|
"<none>", // 11
|
||||||
|
"<none>", // 12
|
||||||
|
"<none>", // 13
|
||||||
|
"<none>", // 14
|
||||||
|
))
|
||||||
|
}
|
||||||
|
endline(tabWriter)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
queueSetDigest := plState.queues.Dump(includeRequestDetails)
|
||||||
|
for iq, q := range queueSetDigest.Queues {
|
||||||
|
for ir, r := range q.Requests {
|
||||||
|
tabPrint(tabWriter, rowForRequest(
|
||||||
|
plState.pl.Name, // 1
|
||||||
|
r.MatchedFlowSchema, // 2
|
||||||
|
iq, // 3
|
||||||
|
ir, // 4
|
||||||
|
r.FlowDistinguisher, // 5
|
||||||
|
r.ArriveTime, // 6
|
||||||
|
))
|
||||||
|
if includeRequestDetails {
|
||||||
|
tabPrint(tabWriter, rowForRequestDetails(
|
||||||
|
r.UserName, // 7
|
||||||
|
r.RequestInfo.Verb, // 8
|
||||||
|
r.RequestInfo.Path, // 9
|
||||||
|
r.RequestInfo.Namespace, // 10
|
||||||
|
r.RequestInfo.Name, // 11
|
||||||
|
schema.GroupVersion{
|
||||||
|
Group: r.RequestInfo.APIGroup,
|
||||||
|
Version: r.RequestInfo.APIVersion,
|
||||||
|
}.String(), // 12
|
||||||
|
r.RequestInfo.Resource, // 13
|
||||||
|
r.RequestInfo.Subresource, // 14
|
||||||
|
))
|
||||||
|
}
|
||||||
|
endline(tabWriter)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
runtime.HandleError(tabWriter.Flush())
|
||||||
|
}
|
||||||
|
|
||||||
|
func tabPrint(w io.Writer, row string) {
|
||||||
|
_, err := fmt.Fprint(w, row)
|
||||||
|
runtime.HandleError(err)
|
||||||
|
}
|
||||||
|
func endline(w io.Writer) {
|
||||||
|
_, err := fmt.Fprint(w, "\n")
|
||||||
|
runtime.HandleError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func rowForHeaders(headers []string) string {
|
||||||
|
return row(headers...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func rowForPriorityLevel(plName string, activeQueues int, isIdle, isQuiescing bool, waitingRequests, executingRequests int) string {
|
||||||
|
return row(
|
||||||
|
plName,
|
||||||
|
strconv.Itoa(activeQueues),
|
||||||
|
strconv.FormatBool(isIdle),
|
||||||
|
strconv.FormatBool(isQuiescing),
|
||||||
|
strconv.Itoa(waitingRequests),
|
||||||
|
strconv.Itoa(executingRequests),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func rowForQueue(plName string, index, waitingRequests, executingRequests int, virtualStart float64) string {
|
||||||
|
return row(
|
||||||
|
plName,
|
||||||
|
strconv.Itoa(index),
|
||||||
|
strconv.Itoa(waitingRequests),
|
||||||
|
strconv.Itoa(executingRequests),
|
||||||
|
fmt.Sprintf("%.4f", virtualStart),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func rowForRequest(plName, fsName string, queueIndex, requestIndex int, flowDistinguisher string, arriveTime time.Time) string {
|
||||||
|
return row(
|
||||||
|
plName,
|
||||||
|
fsName,
|
||||||
|
strconv.Itoa(queueIndex),
|
||||||
|
strconv.Itoa(requestIndex),
|
||||||
|
flowDistinguisher,
|
||||||
|
arriveTime.UTC().Format(time.RFC3339Nano),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func rowForRequestDetails(username, verb, path, namespace, name, apiVersion, resource, subResource string) string {
|
||||||
|
return row(
|
||||||
|
username,
|
||||||
|
verb,
|
||||||
|
path,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func row(columns ...string) string {
|
||||||
|
return strings.Join(columns, ",\t") + ",\t"
|
||||||
|
}
|
@ -22,6 +22,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/util/clock"
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
|
"k8s.io/apiserver/pkg/server/mux"
|
||||||
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
|
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
|
||||||
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||||
fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
|
fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
|
||||||
@ -51,6 +52,9 @@ type Interface interface {
|
|||||||
// any needed changes to local behavior. This method ceases
|
// any needed changes to local behavior. This method ceases
|
||||||
// activity and returns after the given channel is closed.
|
// activity and returns after the given channel is closed.
|
||||||
Run(stopCh <-chan struct{}) error
|
Run(stopCh <-chan struct{}) error
|
||||||
|
|
||||||
|
// Install installs debugging endpoints to the web-server.
|
||||||
|
Install(c *mux.PathRecorderMux)
|
||||||
}
|
}
|
||||||
|
|
||||||
// This request filter implements https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md
|
// This request filter implements https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
fcv1a1 "k8s.io/api/flowcontrol/v1alpha1"
|
fcv1a1 "k8s.io/api/flowcontrol/v1alpha1"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
|
fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
|
||||||
|
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
||||||
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||||
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
|
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
@ -95,6 +96,10 @@ func (cqs *ctlTestQueueSet) BeginConfigChange(qc fq.QueuingConfig) (fq.QueueSetC
|
|||||||
return ctlTestQueueSetCompleter{cqs.cts, cqs, qc}, nil
|
return ctlTestQueueSetCompleter{cqs.cts, cqs, qc}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cqs *ctlTestQueueSet) Dump(bool) debug.QueueSetDump {
|
||||||
|
return debug.QueueSetDump{}
|
||||||
|
}
|
||||||
|
|
||||||
func (cqc ctlTestQueueSetCompleter) Complete(dc fq.DispatchingConfig) fq.QueueSet {
|
func (cqc ctlTestQueueSetCompleter) Complete(dc fq.DispatchingConfig) fq.QueueSet {
|
||||||
cqc.cts.lock.Lock()
|
cqc.cts.lock.Lock()
|
||||||
defer cqc.cts.lock.Unlock()
|
defer cqc.cts.lock.Unlock()
|
||||||
@ -115,7 +120,7 @@ func (cqs *ctlTestQueueSet) IsIdle() bool {
|
|||||||
return cqs.countActive == 0
|
return cqs.countActive == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cqs *ctlTestQueueSet) StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (req fq.Request, idle bool) {
|
func (cqs *ctlTestQueueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (req fq.Request, idle bool) {
|
||||||
cqs.cts.lock.Lock()
|
cqs.cts.lock.Lock()
|
||||||
defer cqs.cts.lock.Unlock()
|
defer cqs.cts.lock.Unlock()
|
||||||
cqs.countActive++
|
cqs.countActive++
|
||||||
|
@ -0,0 +1,24 @@
|
|||||||
|
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||||
|
|
||||||
|
go_library(
|
||||||
|
name = "go_default_library",
|
||||||
|
srcs = ["dump.go"],
|
||||||
|
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/debug",
|
||||||
|
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/debug",
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
deps = ["//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "package-srcs",
|
||||||
|
srcs = glob(["**"]),
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:private"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "all-srcs",
|
||||||
|
srcs = [":package-srcs"],
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
)
|
@ -0,0 +1,47 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2016 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package debug
|
||||||
|
|
||||||
|
import (
|
||||||
|
"k8s.io/apiserver/pkg/endpoints/request"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// QueueSetDump is an instant dump of queue-set.
|
||||||
|
type QueueSetDump struct {
|
||||||
|
Queues []QueueDump
|
||||||
|
Waiting int
|
||||||
|
Executing int
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueueDump is an instant dump of one queue in a queue-set.
|
||||||
|
type QueueDump struct {
|
||||||
|
Requests []RequestDump
|
||||||
|
VirtualStart float64
|
||||||
|
ExecutingRequests int
|
||||||
|
}
|
||||||
|
|
||||||
|
// RequestDump is an instant dump of one requests pending in the queue.
|
||||||
|
type RequestDump struct {
|
||||||
|
MatchedFlowSchema string
|
||||||
|
FlowDistinguisher string
|
||||||
|
ArriveTime time.Time
|
||||||
|
StartTime time.Time
|
||||||
|
// request details
|
||||||
|
UserName string
|
||||||
|
RequestInfo request.RequestInfo
|
||||||
|
}
|
@ -6,6 +6,7 @@ go_library(
|
|||||||
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing",
|
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing",
|
||||||
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing",
|
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing",
|
||||||
visibility = ["//visibility:public"],
|
visibility = ["//visibility:public"],
|
||||||
|
deps = ["//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug:go_default_library"],
|
||||||
)
|
)
|
||||||
|
|
||||||
filegroup(
|
filegroup(
|
||||||
|
@ -19,6 +19,8 @@ package fairqueuing
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
||||||
)
|
)
|
||||||
|
|
||||||
// QueueSetFactory is used to create QueueSet objects. Creation, like
|
// QueueSetFactory is used to create QueueSet objects. Creation, like
|
||||||
@ -77,7 +79,13 @@ type QueueSet interface {
|
|||||||
// was idle at the moment of the return. Otherwise idle==false
|
// was idle at the moment of the return. Otherwise idle==false
|
||||||
// and the client must call the Finish method of the Request
|
// and the client must call the Finish method of the Request
|
||||||
// exactly once.
|
// exactly once.
|
||||||
StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (req Request, idle bool)
|
StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (req Request, idle bool)
|
||||||
|
|
||||||
|
// Dump saves and returns the instant internal state of the queue-set.
|
||||||
|
// Note that dumping process will stop the queue-set from proceeding
|
||||||
|
// any requests.
|
||||||
|
// For debugging only.
|
||||||
|
Dump(includeRequestDetails bool) debug.QueueSetDump
|
||||||
}
|
}
|
||||||
|
|
||||||
// Request represents the remainder of the handling of one request
|
// Request represents the remainder of the handling of one request
|
||||||
|
@ -13,7 +13,9 @@ go_library(
|
|||||||
deps = [
|
deps = [
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise:go_default_library",
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/util/clock"
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
"k8s.io/apimachinery/pkg/util/runtime"
|
"k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
|
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
|
||||||
|
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
||||||
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||||
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise"
|
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise"
|
||||||
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||||
@ -221,7 +222,7 @@ const (
|
|||||||
// executing at each point where there is a change in that quantity,
|
// executing at each point where there is a change in that quantity,
|
||||||
// because the metrics --- and only the metrics --- track that
|
// because the metrics --- and only the metrics --- track that
|
||||||
// quantity per FlowSchema.
|
// quantity per FlowSchema.
|
||||||
func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (fq.Request, bool) {
|
func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (fq.Request, bool) {
|
||||||
qs.lockAndSyncTime()
|
qs.lockAndSyncTime()
|
||||||
defer qs.lock.Unlock()
|
defer qs.lock.Unlock()
|
||||||
var req *request
|
var req *request
|
||||||
@ -235,7 +236,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, fsName s
|
|||||||
metrics.AddReject(qs.qCfg.Name, fsName, "concurrency-limit")
|
metrics.AddReject(qs.qCfg.Name, fsName, "concurrency-limit")
|
||||||
return nil, qs.isIdleLocked()
|
return nil, qs.isIdleLocked()
|
||||||
}
|
}
|
||||||
req = qs.dispatchSansQueueLocked(ctx, fsName, descr1, descr2)
|
req = qs.dispatchSansQueueLocked(ctx, flowDistinguisher, fsName, descr1, descr2)
|
||||||
return req, false
|
return req, false
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -246,7 +247,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, fsName s
|
|||||||
// 3) Reject current request if there is not enough concurrency shares and
|
// 3) Reject current request if there is not enough concurrency shares and
|
||||||
// we are at max queue length
|
// we are at max queue length
|
||||||
// 4) If not rejected, create a request and enqueue
|
// 4) If not rejected, create a request and enqueue
|
||||||
req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, fsName, descr1, descr2)
|
req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, flowDistinguisher, fsName, descr1, descr2)
|
||||||
// req == nil means that the request was rejected - no remaining
|
// req == nil means that the request was rejected - no remaining
|
||||||
// concurrency shares and at max queue length already
|
// concurrency shares and at max queue length already
|
||||||
if req == nil {
|
if req == nil {
|
||||||
@ -398,7 +399,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
|
|||||||
// returns the enqueud request on a successful enqueue
|
// returns the enqueud request on a successful enqueue
|
||||||
// returns nil in the case that there is no available concurrency or
|
// returns nil in the case that there is no available concurrency or
|
||||||
// the queuelengthlimit has been reached
|
// the queuelengthlimit has been reached
|
||||||
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) *request {
|
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request {
|
||||||
// Start with the shuffle sharding, to pick a queue.
|
// Start with the shuffle sharding, to pick a queue.
|
||||||
queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2)
|
queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2)
|
||||||
queue := qs.queues[queueIdx]
|
queue := qs.queues[queueIdx]
|
||||||
@ -412,6 +413,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
|
|||||||
req := &request{
|
req := &request{
|
||||||
qs: qs,
|
qs: qs,
|
||||||
fsName: fsName,
|
fsName: fsName,
|
||||||
|
flowDistinguisher: flowDistinguisher,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter),
|
decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter),
|
||||||
arrivalTime: qs.clock.Now(),
|
arrivalTime: qs.clock.Now(),
|
||||||
@ -523,11 +525,12 @@ func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, fsName string, descr1, descr2 interface{}) *request {
|
func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request {
|
||||||
now := qs.clock.Now()
|
now := qs.clock.Now()
|
||||||
req := &request{
|
req := &request{
|
||||||
qs: qs,
|
qs: qs,
|
||||||
fsName: fsName,
|
fsName: fsName,
|
||||||
|
flowDistinguisher: flowDistinguisher,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
startTime: now,
|
startTime: now,
|
||||||
decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter),
|
decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter),
|
||||||
@ -709,3 +712,17 @@ func (qs *queueSet) preCreateOrUnblockGoroutine() {
|
|||||||
func (qs *queueSet) goroutineDoneOrBlocked() {
|
func (qs *queueSet) goroutineDoneOrBlocked() {
|
||||||
qs.counter.Add(-1)
|
qs.counter.Add(-1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump {
|
||||||
|
qs.lock.Lock()
|
||||||
|
defer qs.lock.Unlock()
|
||||||
|
d := debug.QueueSetDump{
|
||||||
|
Queues: make([]debug.QueueDump, len(qs.queues)),
|
||||||
|
Waiting: qs.totRequestsWaiting,
|
||||||
|
Executing: qs.totRequestsExecuting,
|
||||||
|
}
|
||||||
|
for i, q := range qs.queues {
|
||||||
|
d.Queues[i] = q.dump(includeRequestDetails)
|
||||||
|
}
|
||||||
|
return d
|
||||||
|
}
|
||||||
|
@ -79,7 +79,7 @@ func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet,
|
|||||||
go func(i, j int, uc uniformClient, igr test.Integrator) {
|
go func(i, j int, uc uniformClient, igr test.Integrator) {
|
||||||
for k := 0; k < uc.nCalls; k++ {
|
for k := 0; k < uc.nCalls; k++ {
|
||||||
ClockWait(clk, counter, uc.thinkDuration)
|
ClockWait(clk, counter, uc.thinkDuration)
|
||||||
req, idle := qs.StartRequest(context.Background(), uc.hash, fsName, name, []int{i, j, k})
|
req, idle := qs.StartRequest(context.Background(), uc.hash, "", fsName, name, []int{i, j, k})
|
||||||
t.Logf("%s: %d, %d, %d got req=%p, idle=%v", clk.Now().Format(nsTimeFmt), i, j, k, req, idle)
|
t.Logf("%s: %d, %d, %d got req=%p, idle=%v", clk.Now().Format(nsTimeFmt), i, j, k, req, idle)
|
||||||
if req == nil {
|
if req == nil {
|
||||||
atomic.AddUint64(&failedCount, 1)
|
atomic.AddUint64(&failedCount, 1)
|
||||||
@ -346,7 +346,7 @@ func TestContextCancel(t *testing.T) {
|
|||||||
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
|
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
|
||||||
counter.Add(1) // account for the goroutine running this test
|
counter.Add(1) // account for the goroutine running this test
|
||||||
ctx1 := context.Background()
|
ctx1 := context.Background()
|
||||||
req1, _ := qs.StartRequest(ctx1, 1, "fs1", "test", "one")
|
req1, _ := qs.StartRequest(ctx1, 1, "", "fs1", "test", "one")
|
||||||
if req1 == nil {
|
if req1 == nil {
|
||||||
t.Error("Request rejected")
|
t.Error("Request rejected")
|
||||||
return
|
return
|
||||||
@ -362,7 +362,7 @@ func TestContextCancel(t *testing.T) {
|
|||||||
counter.Add(1)
|
counter.Add(1)
|
||||||
cancel2()
|
cancel2()
|
||||||
}()
|
}()
|
||||||
req2, idle2a := qs.StartRequest(ctx2, 2, "fs2", "test", "two")
|
req2, idle2a := qs.StartRequest(ctx2, 2, "", "fs2", "test", "two")
|
||||||
if idle2a {
|
if idle2a {
|
||||||
t.Error("2nd StartRequest returned idle")
|
t.Error("2nd StartRequest returned idle")
|
||||||
}
|
}
|
||||||
|
@ -20,16 +20,21 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
genericrequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
|
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
||||||
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
|
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
|
||||||
)
|
)
|
||||||
|
|
||||||
// request is a temporary container for "requests" with additional
|
// request is a temporary container for "requests" with additional
|
||||||
// tracking fields required for the functionality FQScheduler
|
// tracking fields required for the functionality FQScheduler
|
||||||
type request struct {
|
type request struct {
|
||||||
qs *queueSet
|
|
||||||
fsName string
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
|
||||||
|
qs *queueSet
|
||||||
|
|
||||||
|
flowDistinguisher string
|
||||||
|
fsName string
|
||||||
|
|
||||||
// The relevant queue. Is nil if this request did not go through
|
// The relevant queue. Is nil if this request did not go through
|
||||||
// a queue.
|
// a queue.
|
||||||
queue *queue
|
queue *queue
|
||||||
@ -94,3 +99,27 @@ func (q *queue) GetVirtualFinish(J int, G float64) float64 {
|
|||||||
jg := float64(J+1) * float64(G)
|
jg := float64(J+1) * float64(G)
|
||||||
return jg + q.virtualStart
|
return jg + q.virtualStart
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (q *queue) dump(includeDetails bool) debug.QueueDump {
|
||||||
|
digest := make([]debug.RequestDump, len(q.requests))
|
||||||
|
for i, r := range q.requests {
|
||||||
|
// dump requests.
|
||||||
|
digest[i].MatchedFlowSchema = r.fsName
|
||||||
|
digest[i].FlowDistinguisher = r.flowDistinguisher
|
||||||
|
digest[i].ArriveTime = r.arrivalTime
|
||||||
|
digest[i].StartTime = r.startTime
|
||||||
|
if includeDetails {
|
||||||
|
userInfo, _ := genericrequest.UserFrom(r.ctx)
|
||||||
|
digest[i].UserName = userInfo.GetName()
|
||||||
|
requestInfo, ok := genericrequest.RequestInfoFrom(r.ctx)
|
||||||
|
if ok {
|
||||||
|
digest[i].RequestInfo = *requestInfo
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return debug.QueueDump{
|
||||||
|
VirtualStart: q.virtualStart,
|
||||||
|
Requests: digest,
|
||||||
|
ExecutingRequests: q.requestsExecuting,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -11,6 +11,7 @@ go_library(
|
|||||||
visibility = ["//visibility:public"],
|
visibility = ["//visibility:public"],
|
||||||
deps = [
|
deps = [
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -19,6 +19,7 @@ package testing
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
||||||
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -53,10 +54,14 @@ func (noRestraint) IsIdle() bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (fq.Request, bool) {
|
func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (fq.Request, bool) {
|
||||||
return noRestraintRequest{}, false
|
return noRestraintRequest{}, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (noRestraint) Dump(bool) debug.QueueSetDump {
|
||||||
|
return debug.QueueSetDump{}
|
||||||
|
}
|
||||||
|
|
||||||
func (noRestraintRequest) Finish(execute func()) (idle bool) {
|
func (noRestraintRequest) Finish(execute func()) (idle bool) {
|
||||||
execute()
|
execute()
|
||||||
return false
|
return false
|
||||||
|
1
vendor/modules.txt
vendored
1
vendor/modules.txt
vendored
@ -1364,6 +1364,7 @@ k8s.io/apiserver/pkg/util/dryrun
|
|||||||
k8s.io/apiserver/pkg/util/feature
|
k8s.io/apiserver/pkg/util/feature
|
||||||
k8s.io/apiserver/pkg/util/flowcontrol
|
k8s.io/apiserver/pkg/util/flowcontrol
|
||||||
k8s.io/apiserver/pkg/util/flowcontrol/counter
|
k8s.io/apiserver/pkg/util/flowcontrol/counter
|
||||||
|
k8s.io/apiserver/pkg/util/flowcontrol/debug
|
||||||
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing
|
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing
|
||||||
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise
|
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise
|
||||||
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise
|
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise
|
||||||
|
Loading…
Reference in New Issue
Block a user