Introduce more metrics on concurrency

Introduce min, average, and standard deviation for the number of
executing mutating and readOnly requests.

Introduce min, max, average, and standard deviation for the number
waiting and number waiting per priority level.

Later:

Revised to use a series of windows

Use three individuals instead of array of powers

Later:

Add coarse queue count metrics, removed windowed avg and stddev

Add metrics for number of queued mutating and readOnly requests,
to complement metrics for number executing.

Later:

Removed windowed average and standard deviation because consumers can
derive such from integrals of consumer's chosen window.

Also replaced "requestKind" Prometheus label with "request_kind".

Later:

Revised to focus on sampling

Make the clock intrinsic to a TimedObserver

... so that the clock can be read while holding the observer's lock;
otherwise, forward progress is not guaranteed (and violations were
observed in testing).

Bug fixes and histogram buckets revision

SetX1 to 1 when queue length limit is zero, beause dividing by zero is nasty.

Remove obsolete argument in gen_test.go.

Add a bucket boundary at 0 for sample-and-water-mark histograms, to
distinguish zeroes from non-zeros.

This includes adding Integrator test.

Simplified test code.

More pervasively used "ctlr" instead of "ctl" as abbreviation for
"controller".
This commit is contained in:
Mike Spreitzer 2020-05-17 01:02:25 -04:00
parent 30b0ebd6d4
commit 57ecea2229
25 changed files with 909 additions and 336 deletions

View File

@ -122,7 +122,7 @@ var (
Help: "Number of requests dropped with 'Try again later' response", Help: "Number of requests dropped with 'Try again later' response",
StabilityLevel: compbasemetrics.ALPHA, StabilityLevel: compbasemetrics.ALPHA,
}, },
[]string{"requestKind"}, []string{"request_kind"},
) )
// TLSHandshakeErrors is a number of requests dropped with 'TLS handshake error from' error // TLSHandshakeErrors is a number of requests dropped with 'TLS handshake error from' error
TLSHandshakeErrors = compbasemetrics.NewCounter( TLSHandshakeErrors = compbasemetrics.NewCounter(
@ -166,7 +166,15 @@ var (
Help: "Maximal number of currently used inflight request limit of this apiserver per request kind in last second.", Help: "Maximal number of currently used inflight request limit of this apiserver per request kind in last second.",
StabilityLevel: compbasemetrics.ALPHA, StabilityLevel: compbasemetrics.ALPHA,
}, },
[]string{"requestKind"}, []string{"request_kind"},
)
currentInqueueRequests = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Name: "apiserver_current_inqueue_requests",
Help: "Maximal number of queued requests in this apiserver per request kind in last second.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"request_kind"},
) )
requestTerminationsTotal = compbasemetrics.NewCounterVec( requestTerminationsTotal = compbasemetrics.NewCounterVec(
@ -191,6 +199,7 @@ var (
WatchEvents, WatchEvents,
WatchEventsSizes, WatchEventsSizes,
currentInflightRequests, currentInflightRequests,
currentInqueueRequests,
requestTerminationsTotal, requestTerminationsTotal,
} }
@ -231,6 +240,11 @@ const (
ReadOnlyKind = "readOnly" ReadOnlyKind = "readOnly"
// MutatingKind is a string identifying mutating request kind // MutatingKind is a string identifying mutating request kind
MutatingKind = "mutating" MutatingKind = "mutating"
// WaitingPhase is the phase value for a request waiting in a queue
WaitingPhase = "waiting"
// ExecutingPhase is the phase value for an executing request
ExecutingPhase = "executing"
) )
const ( const (
@ -261,9 +275,19 @@ func Reset() {
} }
} }
func UpdateInflightRequestMetrics(nonmutating, mutating int) { // UpdateInflightRequestMetrics reports concurrency metrics classified by
currentInflightRequests.WithLabelValues(ReadOnlyKind).Set(float64(nonmutating)) // mutating vs Readonly.
currentInflightRequests.WithLabelValues(MutatingKind).Set(float64(mutating)) func UpdateInflightRequestMetrics(phase string, nonmutating, mutating int) {
for _, kc := range []struct {
kind string
count int
}{{ReadOnlyKind, nonmutating}, {MutatingKind, mutating}} {
if phase == ExecutingPhase {
currentInflightRequests.WithLabelValues(kc.kind).Set(float64(kc.count))
} else {
currentInqueueRequests.WithLabelValues(kc.kind).Set(float64(kc.count))
}
}
} }
// RecordRequestTermination records that the request was terminated early as part of a resource // RecordRequestTermination records that the request was terminated early as part of a resource

View File

@ -62,6 +62,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/httplog:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/httplog:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library",
], ],

View File

@ -27,6 +27,7 @@ import (
"k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/metrics" "k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request" apirequest "k8s.io/apiserver/pkg/endpoints/request"
fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
@ -50,13 +51,17 @@ func handleError(w http.ResponseWriter, r *http.Request, err error) {
klog.Errorf(err.Error()) klog.Errorf(err.Error())
} }
// requestWatermark is used to trak maximal usage of inflight requests. // requestWatermark is used to track maximal numbers of requests in a particular phase of handling
type requestWatermark struct { type requestWatermark struct {
phase string
readOnlyObserver, mutatingObserver fcmetrics.TimedObserver
lock sync.Mutex lock sync.Mutex
readOnlyWatermark, mutatingWatermark int readOnlyWatermark, mutatingWatermark int
} }
func (w *requestWatermark) recordMutating(mutatingVal int) { func (w *requestWatermark) recordMutating(mutatingVal int) {
w.mutatingObserver.Set(float64(mutatingVal))
w.lock.Lock() w.lock.Lock()
defer w.lock.Unlock() defer w.lock.Unlock()
@ -66,6 +71,8 @@ func (w *requestWatermark) recordMutating(mutatingVal int) {
} }
func (w *requestWatermark) recordReadOnly(readOnlyVal int) { func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
w.readOnlyObserver.Set(float64(readOnlyVal))
w.lock.Lock() w.lock.Lock()
defer w.lock.Unlock() defer w.lock.Unlock()
@ -74,9 +81,14 @@ func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
} }
} }
var watermark = &requestWatermark{} // watermark tracks requests being executed (not waiting in a queue)
var watermark = &requestWatermark{
phase: metrics.ExecutingPhase,
readOnlyObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{metrics.ReadOnlyKind}).RequestsExecuting,
mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{metrics.MutatingKind}).RequestsExecuting,
}
func startRecordingUsage() { func startRecordingUsage(watermark *requestWatermark) {
go func() { go func() {
wait.Forever(func() { wait.Forever(func() {
watermark.lock.Lock() watermark.lock.Lock()
@ -86,7 +98,7 @@ func startRecordingUsage() {
watermark.mutatingWatermark = 0 watermark.mutatingWatermark = 0
watermark.lock.Unlock() watermark.lock.Unlock()
metrics.UpdateInflightRequestMetrics(readOnlyWatermark, mutatingWatermark) metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark)
}, inflightUsageMetricUpdatePeriod) }, inflightUsageMetricUpdatePeriod)
}() }()
} }
@ -100,7 +112,7 @@ func WithMaxInFlightLimit(
mutatingLimit int, mutatingLimit int,
longRunningRequestCheck apirequest.LongRunningRequestCheck, longRunningRequestCheck apirequest.LongRunningRequestCheck,
) http.Handler { ) http.Handler {
startOnce.Do(startRecordingUsage) startOnce.Do(func() { startRecordingUsage(watermark) })
if nonMutatingLimit == 0 && mutatingLimit == 0 { if nonMutatingLimit == 0 && mutatingLimit == 0 {
return handler return handler
} }
@ -108,9 +120,11 @@ func WithMaxInFlightLimit(
var mutatingChan chan bool var mutatingChan chan bool
if nonMutatingLimit != 0 { if nonMutatingLimit != 0 {
nonMutatingChan = make(chan bool, nonMutatingLimit) nonMutatingChan = make(chan bool, nonMutatingLimit)
watermark.readOnlyObserver.SetX1(float64(nonMutatingLimit))
} }
if mutatingLimit != 0 { if mutatingLimit != 0 {
mutatingChan = make(chan bool, mutatingLimit) mutatingChan = make(chan bool, mutatingLimit)
watermark.mutatingObserver.SetX1(float64(mutatingLimit))
} }
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@ -141,21 +155,22 @@ func WithMaxInFlightLimit(
select { select {
case c <- true: case c <- true:
var mutatingLen, readOnlyLen int // We note the concurrency level both while the
// request is being served and after it is done being
// served, because both states contribute to the
// sampled stats on concurrency.
if isMutatingRequest { if isMutatingRequest {
mutatingLen = len(mutatingChan) watermark.recordMutating(len(c))
} else { } else {
readOnlyLen = len(nonMutatingChan) watermark.recordReadOnly(len(c))
} }
defer func() { defer func() {
<-c <-c
if isMutatingRequest { if isMutatingRequest {
watermark.recordMutating(mutatingLen) watermark.recordMutating(len(c))
} else { } else {
watermark.recordReadOnly(readOnlyLen) watermark.recordReadOnly(len(c))
} }
}() }()
handler.ServeHTTP(w, r) handler.ServeHTTP(w, r)

View File

@ -24,8 +24,10 @@ import (
fcv1a1 "k8s.io/api/flowcontrol/v1alpha1" fcv1a1 "k8s.io/api/flowcontrol/v1alpha1"
apitypes "k8s.io/apimachinery/pkg/types" apitypes "k8s.io/apimachinery/pkg/types"
epmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request" apirequest "k8s.io/apiserver/pkg/endpoints/request"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
@ -53,7 +55,15 @@ func GetClassification(ctx context.Context) *PriorityAndFairnessClassification {
return ctx.Value(priorityAndFairnessKey).(*PriorityAndFairnessClassification) return ctx.Value(priorityAndFairnessKey).(*PriorityAndFairnessClassification)
} }
var atomicMutatingLen, atomicNonMutatingLen int32 // waitingMark tracks requests waiting rather than being executed
var waitingMark = &requestWatermark{
phase: epmetrics.WaitingPhase,
readOnlyObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.ReadOnlyKind}).RequestsWaiting,
mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.MutatingKind}).RequestsWaiting,
}
var atomicMutatingExecuting, atomicReadOnlyExecuting int32
var atomicMutatingWaiting, atomicReadOnlyWaiting int32
// WithPriorityAndFairness limits the number of in-flight // WithPriorityAndFairness limits the number of in-flight
// requests in a fine-grained way. // requests in a fine-grained way.
@ -66,7 +76,10 @@ func WithPriorityAndFairness(
klog.Warningf("priority and fairness support not found, skipping") klog.Warningf("priority and fairness support not found, skipping")
return handler return handler
} }
startOnce.Do(startRecordingUsage) startOnce.Do(func() {
startRecordingUsage(watermark)
startRecordingUsage(waitingMark)
})
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context() ctx := r.Context()
requestInfo, ok := apirequest.RequestInfoFrom(ctx) requestInfo, ok := apirequest.RequestInfoFrom(ctx)
@ -98,22 +111,23 @@ func WithPriorityAndFairness(
var served bool var served bool
isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb) isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb)
execute := func() { noteExecutingDelta := func(delta int32) {
var mutatingLen, readOnlyLen int
if isMutatingRequest { if isMutatingRequest {
mutatingLen = int(atomic.AddInt32(&atomicMutatingLen, 1)) watermark.recordMutating(int(atomic.AddInt32(&atomicMutatingExecuting, delta)))
} else { } else {
readOnlyLen = int(atomic.AddInt32(&atomicNonMutatingLen, 1)) watermark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyExecuting, delta)))
} }
defer func() { }
if isMutatingRequest { noteWaitingDelta := func(delta int32) {
atomic.AddInt32(&atomicMutatingLen, -11) if isMutatingRequest {
watermark.recordMutating(mutatingLen) waitingMark.recordMutating(int(atomic.AddInt32(&atomicMutatingWaiting, delta)))
} else { } else {
atomic.AddInt32(&atomicNonMutatingLen, -1) waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta)))
watermark.recordReadOnly(readOnlyLen) }
} }
}() execute := func() {
noteExecutingDelta(1)
defer noteExecutingDelta(-1)
served = true served = true
innerCtx := context.WithValue(ctx, priorityAndFairnessKey, classification) innerCtx := context.WithValue(ctx, priorityAndFairnessKey, classification)
innerReq := r.Clone(innerCtx) innerReq := r.Clone(innerCtx)
@ -122,10 +136,15 @@ func WithPriorityAndFairness(
handler.ServeHTTP(w, innerReq) handler.ServeHTTP(w, innerReq)
} }
digest := utilflowcontrol.RequestDigest{requestInfo, user} digest := utilflowcontrol.RequestDigest{requestInfo, user}
fcIfc.Handle(ctx, digest, note, execute) fcIfc.Handle(ctx, digest, note, func(inQueue bool) {
if inQueue {
noteWaitingDelta(1)
} else {
noteWaitingDelta(-1)
}
}, execute)
if !served { if !served {
tooManyRequests(r, w) tooManyRequests(r, w)
return
} }
}) })

View File

@ -85,6 +85,7 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/format:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/format:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/flowcontrol/v1alpha1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/flowcontrol/v1alpha1:go_default_library",

View File

@ -84,7 +84,8 @@ type RequestDigest struct {
// this type and cfgMeal follow the convention that the suffix // this type and cfgMeal follow the convention that the suffix
// "Locked" means that the caller must hold the configController lock. // "Locked" means that the caller must hold the configController lock.
type configController struct { type configController struct {
queueSetFactory fq.QueueSetFactory queueSetFactory fq.QueueSetFactory
obsPairGenerator metrics.TimedObserverPairGenerator
// configQueue holds `(interface{})(0)` when the configuration // configQueue holds `(interface{})(0)` when the configuration
// objects need to be reprocessed. // objects need to be reprocessed.
@ -144,6 +145,9 @@ type priorityLevelState struct {
// number of goroutines between Controller::Match and calling the // number of goroutines between Controller::Match and calling the
// returned StartFunction // returned StartFunction
numPending int numPending int
// Observers tracking number waiting, executing
obsPair metrics.TimedObserverPair
} }
// NewTestableController is extra flexible to facilitate testing // NewTestableController is extra flexible to facilitate testing
@ -152,104 +156,106 @@ func newTestableController(
flowcontrolClient fcclientv1a1.FlowcontrolV1alpha1Interface, flowcontrolClient fcclientv1a1.FlowcontrolV1alpha1Interface,
serverConcurrencyLimit int, serverConcurrencyLimit int,
requestWaitLimit time.Duration, requestWaitLimit time.Duration,
obsPairGenerator metrics.TimedObserverPairGenerator,
queueSetFactory fq.QueueSetFactory, queueSetFactory fq.QueueSetFactory,
) *configController { ) *configController {
cfgCtl := &configController{ cfgCtlr := &configController{
queueSetFactory: queueSetFactory, queueSetFactory: queueSetFactory,
obsPairGenerator: obsPairGenerator,
serverConcurrencyLimit: serverConcurrencyLimit, serverConcurrencyLimit: serverConcurrencyLimit,
requestWaitLimit: requestWaitLimit, requestWaitLimit: requestWaitLimit,
flowcontrolClient: flowcontrolClient, flowcontrolClient: flowcontrolClient,
priorityLevelStates: make(map[string]*priorityLevelState), priorityLevelStates: make(map[string]*priorityLevelState),
} }
klog.V(2).Infof("NewTestableController with serverConcurrencyLimit=%d, requestWaitLimit=%s", serverConcurrencyLimit, requestWaitLimit) klog.V(2).Infof("NewTestableController with serverConcurrencyLimit=%d, requestWaitLimit=%s", serverConcurrencyLimit, requestWaitLimit)
cfgCtl.initializeConfigController(informerFactory) cfgCtlr.initializeConfigController(informerFactory)
// ensure the data structure reflects the mandatory config // ensure the data structure reflects the mandatory config
cfgCtl.lockAndDigestConfigObjects(nil, nil) cfgCtlr.lockAndDigestConfigObjects(nil, nil)
return cfgCtl return cfgCtlr
} }
// initializeConfigController sets up the controller that processes // initializeConfigController sets up the controller that processes
// config API objects. // config API objects.
func (cfgCtl *configController) initializeConfigController(informerFactory kubeinformers.SharedInformerFactory) { func (cfgCtlr *configController) initializeConfigController(informerFactory kubeinformers.SharedInformerFactory) {
cfgCtl.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue") cfgCtlr.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue")
fci := informerFactory.Flowcontrol().V1alpha1() fci := informerFactory.Flowcontrol().V1alpha1()
pli := fci.PriorityLevelConfigurations() pli := fci.PriorityLevelConfigurations()
fsi := fci.FlowSchemas() fsi := fci.FlowSchemas()
cfgCtl.plLister = pli.Lister() cfgCtlr.plLister = pli.Lister()
cfgCtl.plInformerSynced = pli.Informer().HasSynced cfgCtlr.plInformerSynced = pli.Informer().HasSynced
cfgCtl.fsLister = fsi.Lister() cfgCtlr.fsLister = fsi.Lister()
cfgCtl.fsInformerSynced = fsi.Informer().HasSynced cfgCtlr.fsInformerSynced = fsi.Informer().HasSynced
pli.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ pli.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { AddFunc: func(obj interface{}) {
pl := obj.(*fctypesv1a1.PriorityLevelConfiguration) pl := obj.(*fctypesv1a1.PriorityLevelConfiguration)
klog.V(7).Infof("Triggered API priority and fairness config reloading due to creation of PLC %s", pl.Name) klog.V(7).Infof("Triggered API priority and fairness config reloading due to creation of PLC %s", pl.Name)
cfgCtl.configQueue.Add(0) cfgCtlr.configQueue.Add(0)
}, },
UpdateFunc: func(oldObj, newObj interface{}) { UpdateFunc: func(oldObj, newObj interface{}) {
newPL := newObj.(*fctypesv1a1.PriorityLevelConfiguration) newPL := newObj.(*fctypesv1a1.PriorityLevelConfiguration)
oldPL := oldObj.(*fctypesv1a1.PriorityLevelConfiguration) oldPL := oldObj.(*fctypesv1a1.PriorityLevelConfiguration)
if !apiequality.Semantic.DeepEqual(oldPL.Spec, newPL.Spec) { if !apiequality.Semantic.DeepEqual(oldPL.Spec, newPL.Spec) {
klog.V(7).Infof("Triggered API priority and fairness config reloading due to spec update of PLC %s", newPL.Name) klog.V(7).Infof("Triggered API priority and fairness config reloading due to spec update of PLC %s", newPL.Name)
cfgCtl.configQueue.Add(0) cfgCtlr.configQueue.Add(0)
} }
}, },
DeleteFunc: func(obj interface{}) { DeleteFunc: func(obj interface{}) {
name, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) name, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
klog.V(7).Infof("Triggered API priority and fairness config reloading due to deletion of PLC %s", name) klog.V(7).Infof("Triggered API priority and fairness config reloading due to deletion of PLC %s", name)
cfgCtl.configQueue.Add(0) cfgCtlr.configQueue.Add(0)
}}) }})
fsi.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ fsi.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { AddFunc: func(obj interface{}) {
fs := obj.(*fctypesv1a1.FlowSchema) fs := obj.(*fctypesv1a1.FlowSchema)
klog.V(7).Infof("Triggered API priority and fairness config reloading due to creation of FS %s", fs.Name) klog.V(7).Infof("Triggered API priority and fairness config reloading due to creation of FS %s", fs.Name)
cfgCtl.configQueue.Add(0) cfgCtlr.configQueue.Add(0)
}, },
UpdateFunc: func(oldObj, newObj interface{}) { UpdateFunc: func(oldObj, newObj interface{}) {
newFS := newObj.(*fctypesv1a1.FlowSchema) newFS := newObj.(*fctypesv1a1.FlowSchema)
oldFS := oldObj.(*fctypesv1a1.FlowSchema) oldFS := oldObj.(*fctypesv1a1.FlowSchema)
if !apiequality.Semantic.DeepEqual(oldFS.Spec, newFS.Spec) { if !apiequality.Semantic.DeepEqual(oldFS.Spec, newFS.Spec) {
klog.V(7).Infof("Triggered API priority and fairness config reloading due to spec update of FS %s", newFS.Name) klog.V(7).Infof("Triggered API priority and fairness config reloading due to spec update of FS %s", newFS.Name)
cfgCtl.configQueue.Add(0) cfgCtlr.configQueue.Add(0)
} }
}, },
DeleteFunc: func(obj interface{}) { DeleteFunc: func(obj interface{}) {
name, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) name, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
klog.V(7).Infof("Triggered API priority and fairness config reloading due to deletion of FS %s", name) klog.V(7).Infof("Triggered API priority and fairness config reloading due to deletion of FS %s", name)
cfgCtl.configQueue.Add(0) cfgCtlr.configQueue.Add(0)
}}) }})
} }
func (cfgCtl *configController) Run(stopCh <-chan struct{}) error { func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error {
defer cfgCtl.configQueue.ShutDown() defer cfgCtlr.configQueue.ShutDown()
klog.Info("Starting API Priority and Fairness config controller") klog.Info("Starting API Priority and Fairness config controller")
if ok := cache.WaitForCacheSync(stopCh, cfgCtl.plInformerSynced, cfgCtl.fsInformerSynced); !ok { if ok := cache.WaitForCacheSync(stopCh, cfgCtlr.plInformerSynced, cfgCtlr.fsInformerSynced); !ok {
return fmt.Errorf("Never achieved initial sync") return fmt.Errorf("Never achieved initial sync")
} }
klog.Info("Running API Priority and Fairness config worker") klog.Info("Running API Priority and Fairness config worker")
wait.Until(cfgCtl.runWorker, time.Second, stopCh) wait.Until(cfgCtlr.runWorker, time.Second, stopCh)
klog.Info("Shutting down API Priority and Fairness config worker") klog.Info("Shutting down API Priority and Fairness config worker")
return nil return nil
} }
func (cfgCtl *configController) runWorker() { func (cfgCtlr *configController) runWorker() {
for cfgCtl.processNextWorkItem() { for cfgCtlr.processNextWorkItem() {
} }
} }
func (cfgCtl *configController) processNextWorkItem() bool { func (cfgCtlr *configController) processNextWorkItem() bool {
obj, shutdown := cfgCtl.configQueue.Get() obj, shutdown := cfgCtlr.configQueue.Get()
if shutdown { if shutdown {
return false return false
} }
func(obj interface{}) { func(obj interface{}) {
defer cfgCtl.configQueue.Done(obj) defer cfgCtlr.configQueue.Done(obj)
if !cfgCtl.syncOne() { if !cfgCtlr.syncOne() {
cfgCtl.configQueue.AddRateLimited(obj) cfgCtlr.configQueue.AddRateLimited(obj)
} else { } else {
cfgCtl.configQueue.Forget(obj) cfgCtlr.configQueue.Forget(obj)
} }
}(obj) }(obj)
@ -259,19 +265,19 @@ func (cfgCtl *configController) processNextWorkItem() bool {
// syncOne attempts to sync all the API Priority and Fairness config // syncOne attempts to sync all the API Priority and Fairness config
// objects. It either succeeds and returns `true` or logs an error // objects. It either succeeds and returns `true` or logs an error
// and returns `false`. // and returns `false`.
func (cfgCtl *configController) syncOne() bool { func (cfgCtlr *configController) syncOne() bool {
all := labels.Everything() all := labels.Everything()
newPLs, err := cfgCtl.plLister.List(all) newPLs, err := cfgCtlr.plLister.List(all)
if err != nil { if err != nil {
klog.Errorf("Unable to list PriorityLevelConfiguration objects: %s", err.Error()) klog.Errorf("Unable to list PriorityLevelConfiguration objects: %s", err.Error())
return false return false
} }
newFSs, err := cfgCtl.fsLister.List(all) newFSs, err := cfgCtlr.fsLister.List(all)
if err != nil { if err != nil {
klog.Errorf("Unable to list FlowSchema objects: %s", err.Error()) klog.Errorf("Unable to list FlowSchema objects: %s", err.Error())
return false return false
} }
err = cfgCtl.digestConfigObjects(newPLs, newFSs) err = cfgCtlr.digestConfigObjects(newPLs, newFSs)
if err == nil { if err == nil {
return true return true
} }
@ -288,7 +294,7 @@ func (cfgCtl *configController) syncOne() bool {
// FlowSchemas --- with the work dvided among the passes according to // FlowSchemas --- with the work dvided among the passes according to
// those dependencies. // those dependencies.
type cfgMeal struct { type cfgMeal struct {
cfgCtl *configController cfgCtlr *configController
newPLStates map[string]*priorityLevelState newPLStates map[string]*priorityLevelState
@ -315,9 +321,9 @@ type fsStatusUpdate struct {
} }
// digestConfigObjects is given all the API objects that configure // digestConfigObjects is given all the API objects that configure
// cfgCtl and writes its consequent new configState. // cfgCtlr and writes its consequent new configState.
func (cfgCtl *configController) digestConfigObjects(newPLs []*fctypesv1a1.PriorityLevelConfiguration, newFSs []*fctypesv1a1.FlowSchema) error { func (cfgCtlr *configController) digestConfigObjects(newPLs []*fctypesv1a1.PriorityLevelConfiguration, newFSs []*fctypesv1a1.FlowSchema) error {
fsStatusUpdates := cfgCtl.lockAndDigestConfigObjects(newPLs, newFSs) fsStatusUpdates := cfgCtlr.lockAndDigestConfigObjects(newPLs, newFSs)
var errs []error var errs []error
for _, fsu := range fsStatusUpdates { for _, fsu := range fsStatusUpdates {
enc, err := json.Marshal(fsu.condition) enc, err := json.Marshal(fsu.condition)
@ -326,7 +332,7 @@ func (cfgCtl *configController) digestConfigObjects(newPLs []*fctypesv1a1.Priori
panic(fmt.Sprintf("Failed to json.Marshall(%#+v): %s", fsu.condition, err.Error())) panic(fmt.Sprintf("Failed to json.Marshall(%#+v): %s", fsu.condition, err.Error()))
} }
klog.V(4).Infof("Writing Condition %s to FlowSchema %s because its previous value was %s", string(enc), fsu.flowSchema.Name, fcfmt.Fmt(fsu.oldValue)) klog.V(4).Infof("Writing Condition %s to FlowSchema %s because its previous value was %s", string(enc), fsu.flowSchema.Name, fcfmt.Fmt(fsu.oldValue))
_, err = cfgCtl.flowcontrolClient.FlowSchemas().Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc))), metav1.PatchOptions{FieldManager: "api-priority-and-fairness-config-consumer-v1"}, "status") _, err = cfgCtlr.flowcontrolClient.FlowSchemas().Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc))), metav1.PatchOptions{FieldManager: "api-priority-and-fairness-config-consumer-v1"}, "status")
if err != nil { if err != nil {
errs = append(errs, errors.Wrap(err, fmt.Sprintf("failed to set a status.condition for FlowSchema %s", fsu.flowSchema.Name))) errs = append(errs, errors.Wrap(err, fmt.Sprintf("failed to set a status.condition for FlowSchema %s", fsu.flowSchema.Name)))
} }
@ -337,11 +343,11 @@ func (cfgCtl *configController) digestConfigObjects(newPLs []*fctypesv1a1.Priori
return apierrors.NewAggregate(errs) return apierrors.NewAggregate(errs)
} }
func (cfgCtl *configController) lockAndDigestConfigObjects(newPLs []*fctypesv1a1.PriorityLevelConfiguration, newFSs []*fctypesv1a1.FlowSchema) []fsStatusUpdate { func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*fctypesv1a1.PriorityLevelConfiguration, newFSs []*fctypesv1a1.FlowSchema) []fsStatusUpdate {
cfgCtl.lock.Lock() cfgCtlr.lock.Lock()
defer cfgCtl.lock.Unlock() defer cfgCtlr.lock.Unlock()
meal := cfgMeal{ meal := cfgMeal{
cfgCtl: cfgCtl, cfgCtlr: cfgCtlr,
newPLStates: make(map[string]*priorityLevelState), newPLStates: make(map[string]*priorityLevelState),
} }
@ -351,16 +357,16 @@ func (cfgCtl *configController) lockAndDigestConfigObjects(newPLs []*fctypesv1a1
// Supply missing mandatory PriorityLevelConfiguration objects // Supply missing mandatory PriorityLevelConfiguration objects
if !meal.haveExemptPL { if !meal.haveExemptPL {
meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationExempt, cfgCtl.requestWaitLimit) meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationExempt, cfgCtlr.requestWaitLimit)
} }
if !meal.haveCatchAllPL { if !meal.haveCatchAllPL {
meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationCatchAll, cfgCtl.requestWaitLimit) meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationCatchAll, cfgCtlr.requestWaitLimit)
} }
meal.finishQueueSetReconfigsLocked() meal.finishQueueSetReconfigsLocked()
// The new config has been constructed // The new config has been constructed
cfgCtl.priorityLevelStates = meal.newPLStates cfgCtlr.priorityLevelStates = meal.newPLStates
klog.V(5).Infof("Switched to new API Priority and Fairness configuration") klog.V(5).Infof("Switched to new API Priority and Fairness configuration")
return meal.fsStatusUpdates return meal.fsStatusUpdates
} }
@ -369,11 +375,11 @@ func (cfgCtl *configController) lockAndDigestConfigObjects(newPLs []*fctypesv1a1
// Pretend broken ones do not exist. // Pretend broken ones do not exist.
func (meal *cfgMeal) digestNewPLsLocked(newPLs []*fctypesv1a1.PriorityLevelConfiguration) { func (meal *cfgMeal) digestNewPLsLocked(newPLs []*fctypesv1a1.PriorityLevelConfiguration) {
for _, pl := range newPLs { for _, pl := range newPLs {
state := meal.cfgCtl.priorityLevelStates[pl.Name] state := meal.cfgCtlr.priorityLevelStates[pl.Name]
if state == nil { if state == nil {
state = &priorityLevelState{} state = &priorityLevelState{obsPair: meal.cfgCtlr.obsPairGenerator.Generate(1, 1, []string{pl.Name})}
} }
qsCompleter, err := qscOfPL(meal.cfgCtl.queueSetFactory, state.queues, pl, meal.cfgCtl.requestWaitLimit) qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.obsPair)
if err != nil { if err != nil {
klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err) klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err)
continue continue
@ -439,7 +445,7 @@ func (meal *cfgMeal) digestFlowSchemasLocked(newFSs []*fctypesv1a1.FlowSchema) {
fsSeq = append(fsSeq, fcboot.MandatoryFlowSchemaCatchAll) fsSeq = append(fsSeq, fcboot.MandatoryFlowSchemaCatchAll)
} }
meal.cfgCtl.flowSchemas = fsSeq meal.cfgCtlr.flowSchemas = fsSeq
if klog.V(5).Enabled() { if klog.V(5).Enabled() {
for _, fs := range fsSeq { for _, fs := range fsSeq {
klog.Infof("Using FlowSchema %s", fcfmt.Fmt(fs)) klog.Infof("Using FlowSchema %s", fcfmt.Fmt(fs))
@ -453,7 +459,7 @@ func (meal *cfgMeal) digestFlowSchemasLocked(newFSs []*fctypesv1a1.FlowSchema) {
// queues, otherwise start the quiescing process if that has not // queues, otherwise start the quiescing process if that has not
// already been started. // already been started.
func (meal *cfgMeal) processOldPLsLocked() { func (meal *cfgMeal) processOldPLsLocked() {
for plName, plState := range meal.cfgCtl.priorityLevelStates { for plName, plState := range meal.cfgCtlr.priorityLevelStates {
if meal.newPLStates[plName] != nil { if meal.newPLStates[plName] != nil {
// Still desired and already updated // Still desired and already updated
continue continue
@ -476,9 +482,9 @@ func (meal *cfgMeal) processOldPLsLocked() {
} }
} }
var err error var err error
plState.qsCompleter, err = qscOfPL(meal.cfgCtl.queueSetFactory, plState.queues, plState.pl, meal.cfgCtl.requestWaitLimit) plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.obsPair)
if err != nil { if err != nil {
// This can not happen because qscOfPL already approved this config // This can not happen because queueSetCompleterForPL already approved this config
panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec))) panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec)))
} }
if plState.pl.Spec.Limited != nil { if plState.pl.Spec.Limited != nil {
@ -509,7 +515,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
// The use of math.Ceil here means that the results might sum // The use of math.Ceil here means that the results might sum
// to a little more than serverConcurrencyLimit but the // to a little more than serverConcurrencyLimit but the
// difference will be negligible. // difference will be negligible.
concurrencyLimit := int(math.Ceil(float64(meal.cfgCtl.serverConcurrencyLimit) * float64(plState.pl.Spec.Limited.AssuredConcurrencyShares) / meal.shareSum)) concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(plState.pl.Spec.Limited.AssuredConcurrencyShares) / meal.shareSum))
metrics.UpdateSharedConcurrencyLimit(plName, concurrencyLimit) metrics.UpdateSharedConcurrencyLimit(plName, concurrencyLimit)
if plState.queues == nil { if plState.queues == nil {
@ -521,10 +527,11 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
} }
} }
// qscOfPL returns a pointer to an appropriate QueuingConfig or nil // queueSetCompleterForPL returns an appropriate QueueSetCompleter for the
// if no limiting is called for. Returns nil and an error if the given // given priority level configuration. Returns nil if that config
// does not call for limiting. Returns nil and an error if the given
// object is malformed in a way that is a problem for this package. // object is malformed in a way that is a problem for this package.
func qscOfPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *fctypesv1a1.PriorityLevelConfiguration, requestWaitLimit time.Duration) (fq.QueueSetCompleter, error) { func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *fctypesv1a1.PriorityLevelConfiguration, requestWaitLimit time.Duration, intPair metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
if (pl.Spec.Type == fctypesv1a1.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) { if (pl.Spec.Type == fctypesv1a1.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
return nil, errors.New("broken union structure at the top") return nil, errors.New("broken union structure at the top")
} }
@ -553,7 +560,7 @@ func qscOfPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *fctypesv1a1.Priorit
if queues != nil { if queues != nil {
qsc, err = queues.BeginConfigChange(qcQS) qsc, err = queues.BeginConfigChange(qcQS)
} else { } else {
qsc, err = qsf.BeginConstruction(qcQS) qsc, err = qsf.BeginConstruction(qcQS, intPair)
} }
if err != nil { if err != nil {
err = errors.Wrap(err, fmt.Sprintf("priority level %q has QueuingConfiguration %#+v, which is invalid", pl.Name, qcAPI)) err = errors.Wrap(err, fmt.Sprintf("priority level %q has QueuingConfiguration %#+v, which is invalid", pl.Name, qcAPI))
@ -594,9 +601,11 @@ func (meal *cfgMeal) presyncFlowSchemaStatus(fs *fctypesv1a1.FlowSchema, isDangl
} }
// imaginePL adds a priority level based on one of the mandatory ones // imaginePL adds a priority level based on one of the mandatory ones
// that does not actually exist (right now) as a real API object.
func (meal *cfgMeal) imaginePL(proto *fctypesv1a1.PriorityLevelConfiguration, requestWaitLimit time.Duration) { func (meal *cfgMeal) imaginePL(proto *fctypesv1a1.PriorityLevelConfiguration, requestWaitLimit time.Duration) {
klog.V(3).Infof("No %s PriorityLevelConfiguration found, imagining one", proto.Name) klog.V(3).Infof("No %s PriorityLevelConfiguration found, imagining one", proto.Name)
qsCompleter, err := qscOfPL(meal.cfgCtl.queueSetFactory, nil, proto, requestWaitLimit) obsPair := meal.cfgCtlr.obsPairGenerator.Generate(1, 1, []string{proto.Name})
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, obsPair)
if err != nil { if err != nil {
// This can not happen because proto is one of the mandatory // This can not happen because proto is one of the mandatory
// objects and these are not erroneous // objects and these are not erroneous
@ -605,6 +614,7 @@ func (meal *cfgMeal) imaginePL(proto *fctypesv1a1.PriorityLevelConfiguration, re
meal.newPLStates[proto.Name] = &priorityLevelState{ meal.newPLStates[proto.Name] = &priorityLevelState{
pl: proto, pl: proto,
qsCompleter: qsCompleter, qsCompleter: qsCompleter,
obsPair: obsPair,
} }
if proto.Spec.Limited != nil { if proto.Spec.Limited != nil {
meal.shareSum += float64(proto.Spec.Limited.AssuredConcurrencyShares) meal.shareSum += float64(proto.Spec.Limited.AssuredConcurrencyShares)
@ -624,14 +634,14 @@ func (immediateRequest) Finish(execute func()) bool {
// The returned bool indicates whether the request is exempt from // The returned bool indicates whether the request is exempt from
// limitation. The startWaitingTime is when the request started // limitation. The startWaitingTime is when the request started
// waiting in its queue, or `Time{}` if this did not happen. // waiting in its queue, or `Time{}` if this did not happen.
func (cfgCtl *configController) startRequest(ctx context.Context, rd RequestDigest) (fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) { func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, queueNoteFn fq.QueueNoteFn) (fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) {
klog.V(7).Infof("startRequest(%#+v)", rd) klog.V(7).Infof("startRequest(%#+v)", rd)
cfgCtl.lock.Lock() cfgCtlr.lock.Lock()
defer cfgCtl.lock.Unlock() defer cfgCtlr.lock.Unlock()
for _, fs := range cfgCtl.flowSchemas { for _, fs := range cfgCtlr.flowSchemas {
if matchesFlowSchema(rd, fs) { if matchesFlowSchema(rd, fs) {
plName := fs.Spec.PriorityLevelConfiguration.Name plName := fs.Spec.PriorityLevelConfiguration.Name
plState := cfgCtl.priorityLevelStates[plName] plState := cfgCtlr.priorityLevelStates[plName]
if plState.pl.Spec.Type == fctypesv1a1.PriorityLevelEnablementExempt { if plState.pl.Spec.Type == fctypesv1a1.PriorityLevelEnablementExempt {
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, fs.Name, fs.Spec.DistinguisherMethod, plName) klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, fs.Name, fs.Spec.DistinguisherMethod, plName)
return fs, plState.pl, true, immediateRequest{}, time.Time{} return fs, plState.pl, true, immediateRequest{}, time.Time{}
@ -649,9 +659,9 @@ func (cfgCtl *configController) startRequest(ctx context.Context, rd RequestDige
} }
startWaitingTime = time.Now() startWaitingTime = time.Now()
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, fs.Name, fs.Spec.DistinguisherMethod, plName, numQueues) klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, fs.Name, fs.Spec.DistinguisherMethod, plName, numQueues)
req, idle := plState.queues.StartRequest(ctx, hashValue, flowDistinguisher, fs.Name, rd.RequestInfo, rd.User) req, idle := plState.queues.StartRequest(ctx, hashValue, flowDistinguisher, fs.Name, rd.RequestInfo, rd.User, queueNoteFn)
if idle { if idle {
cfgCtl.maybeReapLocked(plName, plState) cfgCtlr.maybeReapLocked(plName, plState)
} }
return fs, plState.pl, false, req, startWaitingTime return fs, plState.pl, false, req, startWaitingTime
} }
@ -660,7 +670,7 @@ func (cfgCtl *configController) startRequest(ctx context.Context, rd RequestDige
// FlowSchema that matches everything. If somehow control reaches // FlowSchema that matches everything. If somehow control reaches
// here, panic with some relevant information. // here, panic with some relevant information.
var catchAll *fctypesv1a1.FlowSchema var catchAll *fctypesv1a1.FlowSchema
for _, fs := range cfgCtl.flowSchemas { for _, fs := range cfgCtlr.flowSchemas {
if fs.Name == fctypesv1a1.FlowSchemaNameCatchAll { if fs.Name == fctypesv1a1.FlowSchemaNameCatchAll {
catchAll = fs catchAll = fs
} }
@ -669,10 +679,10 @@ func (cfgCtl *configController) startRequest(ctx context.Context, rd RequestDige
} }
// Call this after getting a clue that the given priority level is undesired and idle // Call this after getting a clue that the given priority level is undesired and idle
func (cfgCtl *configController) maybeReap(plName string) { func (cfgCtlr *configController) maybeReap(plName string) {
cfgCtl.lock.Lock() cfgCtlr.lock.Lock()
defer cfgCtl.lock.Unlock() defer cfgCtlr.lock.Unlock()
plState := cfgCtl.priorityLevelStates[plName] plState := cfgCtlr.priorityLevelStates[plName]
if plState == nil { if plState == nil {
klog.V(7).Infof("plName=%s, plState==nil", plName) klog.V(7).Infof("plName=%s, plState==nil", plName)
return return
@ -685,17 +695,17 @@ func (cfgCtl *configController) maybeReap(plName string) {
} }
} }
klog.V(3).Infof("Triggered API priority and fairness config reloading because priority level %s is undesired and idle", plName) klog.V(3).Infof("Triggered API priority and fairness config reloading because priority level %s is undesired and idle", plName)
cfgCtl.configQueue.Add(0) cfgCtlr.configQueue.Add(0)
} }
// Call this if both (1) plState.queues is non-nil and reported being // Call this if both (1) plState.queues is non-nil and reported being
// idle, and (2) cfgCtl's lock has not been released since then. // idle, and (2) cfgCtlr's lock has not been released since then.
func (cfgCtl *configController) maybeReapLocked(plName string, plState *priorityLevelState) { func (cfgCtlr *configController) maybeReapLocked(plName string, plState *priorityLevelState) {
if !(plState.quiescing && plState.numPending == 0) { if !(plState.quiescing && plState.numPending == 0) {
return return
} }
klog.V(3).Infof("Triggered API priority and fairness config reloading because priority level %s is undesired and idle", plName) klog.V(3).Infof("Triggered API priority and fairness config reloading because priority level %s is undesired and idle", plName)
cfgCtl.configQueue.Add(0) cfgCtlr.configQueue.Add(0)
} }
// computeFlowDistinguisher extracts the flow distinguisher according to the given method // computeFlowDistinguisher extracts the flow distinguisher according to the given method

View File

@ -34,20 +34,20 @@ const (
queryIncludeRequestDetails = "includeRequestDetails" queryIncludeRequestDetails = "includeRequestDetails"
) )
func (cfgCtl *configController) Install(c *mux.PathRecorderMux) { func (cfgCtlr *configController) Install(c *mux.PathRecorderMux) {
// TODO(yue9944882): handle "Accept" header properly // TODO(yue9944882): handle "Accept" header properly
// debugging dumps a CSV content for three levels of granularity // debugging dumps a CSV content for three levels of granularity
// 1. row per priority-level // 1. row per priority-level
c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_priority_levels", cfgCtl.dumpPriorityLevels) c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_priority_levels", cfgCtlr.dumpPriorityLevels)
// 2. row per queue // 2. row per queue
c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_queues", cfgCtl.dumpQueues) c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_queues", cfgCtlr.dumpQueues)
// 3. row per request // 3. row per request
c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_requests", cfgCtl.dumpRequests) c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_requests", cfgCtlr.dumpRequests)
} }
func (cfgCtl *configController) dumpPriorityLevels(w http.ResponseWriter, r *http.Request) { func (cfgCtlr *configController) dumpPriorityLevels(w http.ResponseWriter, r *http.Request) {
cfgCtl.lock.Lock() cfgCtlr.lock.Lock()
defer cfgCtl.lock.Unlock() defer cfgCtlr.lock.Unlock()
tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0) tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0)
columnHeaders := []string{ columnHeaders := []string{
"PriorityLevelName", // 1 "PriorityLevelName", // 1
@ -59,7 +59,7 @@ func (cfgCtl *configController) dumpPriorityLevels(w http.ResponseWriter, r *htt
} }
tabPrint(tabWriter, rowForHeaders(columnHeaders)) tabPrint(tabWriter, rowForHeaders(columnHeaders))
endline(tabWriter) endline(tabWriter)
for _, plState := range cfgCtl.priorityLevelStates { for _, plState := range cfgCtlr.priorityLevelStates {
if plState.queues == nil { if plState.queues == nil {
tabPrint(tabWriter, row( tabPrint(tabWriter, row(
plState.pl.Name, // 1 plState.pl.Name, // 1
@ -93,9 +93,9 @@ func (cfgCtl *configController) dumpPriorityLevels(w http.ResponseWriter, r *htt
runtime.HandleError(tabWriter.Flush()) runtime.HandleError(tabWriter.Flush())
} }
func (cfgCtl *configController) dumpQueues(w http.ResponseWriter, r *http.Request) { func (cfgCtlr *configController) dumpQueues(w http.ResponseWriter, r *http.Request) {
cfgCtl.lock.Lock() cfgCtlr.lock.Lock()
defer cfgCtl.lock.Unlock() defer cfgCtlr.lock.Unlock()
tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0) tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0)
columnHeaders := []string{ columnHeaders := []string{
"PriorityLevelName", // 1 "PriorityLevelName", // 1
@ -106,7 +106,7 @@ func (cfgCtl *configController) dumpQueues(w http.ResponseWriter, r *http.Reques
} }
tabPrint(tabWriter, rowForHeaders(columnHeaders)) tabPrint(tabWriter, rowForHeaders(columnHeaders))
endline(tabWriter) endline(tabWriter)
for _, plState := range cfgCtl.priorityLevelStates { for _, plState := range cfgCtlr.priorityLevelStates {
if plState.queues == nil { if plState.queues == nil {
tabPrint(tabWriter, row( tabPrint(tabWriter, row(
plState.pl.Name, // 1 plState.pl.Name, // 1
@ -133,9 +133,9 @@ func (cfgCtl *configController) dumpQueues(w http.ResponseWriter, r *http.Reques
runtime.HandleError(tabWriter.Flush()) runtime.HandleError(tabWriter.Flush())
} }
func (cfgCtl *configController) dumpRequests(w http.ResponseWriter, r *http.Request) { func (cfgCtlr *configController) dumpRequests(w http.ResponseWriter, r *http.Request) {
cfgCtl.lock.Lock() cfgCtlr.lock.Lock()
defer cfgCtl.lock.Unlock() defer cfgCtlr.lock.Unlock()
includeRequestDetails := len(r.URL.Query().Get(queryIncludeRequestDetails)) > 0 includeRequestDetails := len(r.URL.Query().Get(queryIncludeRequestDetails)) > 0
@ -161,7 +161,7 @@ func (cfgCtl *configController) dumpRequests(w http.ResponseWriter, r *http.Requ
})) }))
} }
endline(tabWriter) endline(tabWriter)
for _, plState := range cfgCtl.priorityLevelStates { for _, plState := range cfgCtlr.priorityLevelStates {
if plState.queues == nil { if plState.queues == nil {
tabPrint(tabWriter, row( tabPrint(tabWriter, row(
plState.pl.Name, // 1 plState.pl.Name, // 1

View File

@ -45,6 +45,7 @@ type Interface interface {
Handle(ctx context.Context, Handle(ctx context.Context,
requestDigest RequestDigest, requestDigest RequestDigest,
noteFn func(fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration), noteFn func(fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration),
queueNoteFn fq.QueueNoteFn,
execFn func(), execFn func(),
) )
@ -72,6 +73,7 @@ func New(
flowcontrolClient, flowcontrolClient,
serverConcurrencyLimit, serverConcurrencyLimit,
requestWaitLimit, requestWaitLimit,
metrics.PriorityLevelConcurrencyObserverPairGenerator,
fqs.NewQueueSetFactory(&clock.RealClock{}, grc), fqs.NewQueueSetFactory(&clock.RealClock{}, grc),
) )
} }
@ -82,15 +84,17 @@ func NewTestable(
flowcontrolClient fcclientv1a1.FlowcontrolV1alpha1Interface, flowcontrolClient fcclientv1a1.FlowcontrolV1alpha1Interface,
serverConcurrencyLimit int, serverConcurrencyLimit int,
requestWaitLimit time.Duration, requestWaitLimit time.Duration,
obsPairGenerator metrics.TimedObserverPairGenerator,
queueSetFactory fq.QueueSetFactory, queueSetFactory fq.QueueSetFactory,
) Interface { ) Interface {
return newTestableController(informerFactory, flowcontrolClient, serverConcurrencyLimit, requestWaitLimit, queueSetFactory) return newTestableController(informerFactory, flowcontrolClient, serverConcurrencyLimit, requestWaitLimit, obsPairGenerator, queueSetFactory)
} }
func (cfgCtl *configController) Handle(ctx context.Context, requestDigest RequestDigest, func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest RequestDigest,
noteFn func(fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration), noteFn func(fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration),
queueNoteFn fq.QueueNoteFn,
execFn func()) { execFn func()) {
fs, pl, isExempt, req, startWaitingTime := cfgCtl.startRequest(ctx, requestDigest) fs, pl, isExempt, req, startWaitingTime := cfgCtlr.startRequest(ctx, requestDigest, queueNoteFn)
queued := startWaitingTime != time.Time{} queued := startWaitingTime != time.Time{}
noteFn(fs, pl) noteFn(fs, pl)
if req == nil { if req == nil {
@ -117,6 +121,6 @@ func (cfgCtl *configController) Handle(ctx context.Context, requestDigest Reques
} }
klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, queued=%v, Finish() => idle=%v", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt, queued, idle) klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, queued=%v, Finish() => idle=%v", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt, queued, idle)
if idle { if idle {
cfgCtl.maybeReap(pl.Name) cfgCtlr.maybeReap(pl.Name)
} }
} }

View File

@ -31,6 +31,7 @@ import (
"k8s.io/apiserver/pkg/util/flowcontrol/debug" "k8s.io/apiserver/pkg/util/flowcontrol/debug"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format" fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake" clientsetfake "k8s.io/client-go/kubernetes/fake"
fcclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1alpha1" fcclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1alpha1"
@ -50,16 +51,16 @@ var mandPLs = func() map[string]*fcv1a1.PriorityLevelConfiguration {
return ans return ans
}() }()
type ctlTestState struct { type ctlrTestState struct {
t *testing.T t *testing.T
cfgCtl *configController cfgCtlr *configController
fcIfc fcclient.FlowcontrolV1alpha1Interface fcIfc fcclient.FlowcontrolV1alpha1Interface
existingPLs map[string]*fcv1a1.PriorityLevelConfiguration existingPLs map[string]*fcv1a1.PriorityLevelConfiguration
existingFSs map[string]*fcv1a1.FlowSchema existingFSs map[string]*fcv1a1.FlowSchema
heldRequestsMap map[string][]heldRequest heldRequestsMap map[string][]heldRequest
requestWG sync.WaitGroup requestWG sync.WaitGroup
lock sync.Mutex lock sync.Mutex
queues map[string]*ctlTestQueueSet queues map[string]*ctlrTestQueueSet
} }
type heldRequest struct { type heldRequest struct {
@ -67,45 +68,45 @@ type heldRequest struct {
finishCh chan struct{} finishCh chan struct{}
} }
var _ fq.QueueSetFactory = (*ctlTestState)(nil) var _ fq.QueueSetFactory = (*ctlrTestState)(nil)
type ctlTestQueueSetCompleter struct { type ctlrTestQueueSetCompleter struct {
cts *ctlTestState cts *ctlrTestState
cqs *ctlTestQueueSet cqs *ctlrTestQueueSet
qc fq.QueuingConfig qc fq.QueuingConfig
} }
type ctlTestQueueSet struct { type ctlrTestQueueSet struct {
cts *ctlTestState cts *ctlrTestState
qc fq.QueuingConfig qc fq.QueuingConfig
dc fq.DispatchingConfig dc fq.DispatchingConfig
countActive int countActive int
} }
type ctlTestRequest struct { type ctlrTestRequest struct {
cqs *ctlTestQueueSet cqs *ctlrTestQueueSet
qsName string qsName string
descr1, descr2 interface{} descr1, descr2 interface{}
} }
func (cts *ctlTestState) BeginConstruction(qc fq.QueuingConfig) (fq.QueueSetCompleter, error) { func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, ip metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
return ctlTestQueueSetCompleter{cts, nil, qc}, nil return ctlrTestQueueSetCompleter{cts, nil, qc}, nil
} }
func (cqs *ctlTestQueueSet) BeginConfigChange(qc fq.QueuingConfig) (fq.QueueSetCompleter, error) { func (cqs *ctlrTestQueueSet) BeginConfigChange(qc fq.QueuingConfig) (fq.QueueSetCompleter, error) {
return ctlTestQueueSetCompleter{cqs.cts, cqs, qc}, nil return ctlrTestQueueSetCompleter{cqs.cts, cqs, qc}, nil
} }
func (cqs *ctlTestQueueSet) Dump(bool) debug.QueueSetDump { func (cqs *ctlrTestQueueSet) Dump(bool) debug.QueueSetDump {
return debug.QueueSetDump{} return debug.QueueSetDump{}
} }
func (cqc ctlTestQueueSetCompleter) Complete(dc fq.DispatchingConfig) fq.QueueSet { func (cqc ctlrTestQueueSetCompleter) Complete(dc fq.DispatchingConfig) fq.QueueSet {
cqc.cts.lock.Lock() cqc.cts.lock.Lock()
defer cqc.cts.lock.Unlock() defer cqc.cts.lock.Unlock()
qs := cqc.cqs qs := cqc.cqs
if qs == nil { if qs == nil {
qs = &ctlTestQueueSet{cts: cqc.cts, qc: cqc.qc, dc: dc} qs = &ctlrTestQueueSet{cts: cqc.cts, qc: cqc.qc, dc: dc}
cqc.cts.queues[cqc.qc.Name] = qs cqc.cts.queues[cqc.qc.Name] = qs
} else { } else {
qs.qc, qs.dc = cqc.qc, dc qs.qc, qs.dc = cqc.qc, dc
@ -113,22 +114,22 @@ func (cqc ctlTestQueueSetCompleter) Complete(dc fq.DispatchingConfig) fq.QueueSe
return qs return qs
} }
func (cqs *ctlTestQueueSet) IsIdle() bool { func (cqs *ctlrTestQueueSet) IsIdle() bool {
cqs.cts.lock.Lock() cqs.cts.lock.Lock()
defer cqs.cts.lock.Unlock() defer cqs.cts.lock.Unlock()
klog.V(7).Infof("For %p QS %s, countActive==%d", cqs, cqs.qc.Name, cqs.countActive) klog.V(7).Infof("For %p QS %s, countActive==%d", cqs, cqs.qc.Name, cqs.countActive)
return cqs.countActive == 0 return cqs.countActive == 0
} }
func (cqs *ctlTestQueueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (req fq.Request, idle bool) { func (cqs *ctlrTestQueueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (req fq.Request, idle bool) {
cqs.cts.lock.Lock() cqs.cts.lock.Lock()
defer cqs.cts.lock.Unlock() defer cqs.cts.lock.Unlock()
cqs.countActive++ cqs.countActive++
cqs.cts.t.Logf("Queued %q %#+v %#+v for %p QS=%s, countActive:=%d", fsName, descr1, descr2, cqs, cqs.qc.Name, cqs.countActive) cqs.cts.t.Logf("Queued %q %#+v %#+v for %p QS=%s, countActive:=%d", fsName, descr1, descr2, cqs, cqs.qc.Name, cqs.countActive)
return &ctlTestRequest{cqs, cqs.qc.Name, descr1, descr2}, false return &ctlrTestRequest{cqs, cqs.qc.Name, descr1, descr2}, false
} }
func (ctr *ctlTestRequest) Finish(execute func()) bool { func (ctr *ctlrTestRequest) Finish(execute func()) bool {
execute() execute()
ctr.cqs.cts.lock.Lock() ctr.cqs.cts.lock.Lock()
defer ctr.cqs.cts.lock.Unlock() defer ctr.cqs.cts.lock.Unlock()
@ -137,13 +138,13 @@ func (ctr *ctlTestRequest) Finish(execute func()) bool {
return ctr.cqs.countActive == 0 return ctr.cqs.countActive == 0
} }
func (cts *ctlTestState) getQueueSetNames() sets.String { func (cts *ctlrTestState) getQueueSetNames() sets.String {
cts.lock.Lock() cts.lock.Lock()
defer cts.lock.Unlock() defer cts.lock.Unlock()
return sets.StringKeySet(cts.queues) return sets.StringKeySet(cts.queues)
} }
func (cts *ctlTestState) getNonIdleQueueSetNames() sets.String { func (cts *ctlrTestState) getNonIdleQueueSetNames() sets.String {
cts.lock.Lock() cts.lock.Lock()
defer cts.lock.Unlock() defer cts.lock.Unlock()
ans := sets.NewString() ans := sets.NewString()
@ -155,14 +156,14 @@ func (cts *ctlTestState) getNonIdleQueueSetNames() sets.String {
return ans return ans
} }
func (cts *ctlTestState) hasNonIdleQueueSet(name string) bool { func (cts *ctlrTestState) hasNonIdleQueueSet(name string) bool {
cts.lock.Lock() cts.lock.Lock()
defer cts.lock.Unlock() defer cts.lock.Unlock()
qs := cts.queues[name] qs := cts.queues[name]
return qs != nil && qs.countActive > 0 return qs != nil && qs.countActive > 0
} }
func (cts *ctlTestState) addHeldRequest(plName string, rd RequestDigest, finishCh chan struct{}) { func (cts *ctlrTestState) addHeldRequest(plName string, rd RequestDigest, finishCh chan struct{}) {
cts.lock.Lock() cts.lock.Lock()
defer cts.lock.Unlock() defer cts.lock.Unlock()
hrs := cts.heldRequestsMap[plName] hrs := cts.heldRequestsMap[plName]
@ -171,7 +172,7 @@ func (cts *ctlTestState) addHeldRequest(plName string, rd RequestDigest, finishC
cts.t.Logf("Holding %#+v for %s, count:=%d", rd, plName, len(hrs)) cts.t.Logf("Holding %#+v for %s, count:=%d", rd, plName, len(hrs))
} }
func (cts *ctlTestState) popHeldRequest() (plName string, hr *heldRequest, nCount int) { func (cts *ctlrTestState) popHeldRequest() (plName string, hr *heldRequest, nCount int) {
cts.lock.Lock() cts.lock.Lock()
defer cts.lock.Unlock() defer cts.lock.Unlock()
var hrs []heldRequest var hrs []heldRequest
@ -219,21 +220,22 @@ func TestConfigConsumer(t *testing.T) {
clientset := clientsetfake.NewSimpleClientset() clientset := clientsetfake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(clientset, 0) informerFactory := informers.NewSharedInformerFactory(clientset, 0)
flowcontrolClient := clientset.FlowcontrolV1alpha1() flowcontrolClient := clientset.FlowcontrolV1alpha1()
cts := &ctlTestState{t: t, cts := &ctlrTestState{t: t,
fcIfc: flowcontrolClient, fcIfc: flowcontrolClient,
existingFSs: map[string]*fcv1a1.FlowSchema{}, existingFSs: map[string]*fcv1a1.FlowSchema{},
existingPLs: map[string]*fcv1a1.PriorityLevelConfiguration{}, existingPLs: map[string]*fcv1a1.PriorityLevelConfiguration{},
heldRequestsMap: map[string][]heldRequest{}, heldRequestsMap: map[string][]heldRequest{},
queues: map[string]*ctlTestQueueSet{}, queues: map[string]*ctlrTestQueueSet{},
} }
ctl := newTestableController( ctlr := newTestableController(
informerFactory, informerFactory,
flowcontrolClient, flowcontrolClient,
100, // server concurrency limit 100, // server concurrency limit
time.Minute, // request wait limit time.Minute, // request wait limit
metrics.PriorityLevelConcurrencyObserverPairGenerator,
cts, cts,
) )
cts.cfgCtl = ctl cts.cfgCtlr = ctlr
persistingPLNames := sets.NewString() persistingPLNames := sets.NewString()
trialStep := fmt.Sprintf("trial%d-0", i) trialStep := fmt.Sprintf("trial%d-0", i)
_, _, desiredPLNames, newBadPLNames := genPLs(rng, trialStep, persistingPLNames, 0) _, _, desiredPLNames, newBadPLNames := genPLs(rng, trialStep, persistingPLNames, 0)
@ -290,7 +292,7 @@ func TestConfigConsumer(t *testing.T) {
for _, newFS := range newFSs { for _, newFS := range newFSs {
t.Logf("For %s, digesting newFS=%s", trialStep, fcfmt.Fmt(newFS)) t.Logf("For %s, digesting newFS=%s", trialStep, fcfmt.Fmt(newFS))
} }
_ = ctl.lockAndDigestConfigObjects(newPLs, newFSs) _ = ctlr.lockAndDigestConfigObjects(newPLs, newFSs)
} }
for plName, hr, nCount := cts.popHeldRequest(); hr != nil; plName, hr, nCount = cts.popHeldRequest() { for plName, hr, nCount := cts.popHeldRequest(); hr != nil; plName, hr, nCount = cts.popHeldRequest() {
desired := desiredPLNames.Has(plName) || mandPLs[plName] != nil desired := desiredPLNames.Has(plName) || mandPLs[plName] != nil
@ -302,9 +304,9 @@ func TestConfigConsumer(t *testing.T) {
} }
} }
func checkNewFS(cts *ctlTestState, rng *rand.Rand, trialName string, ftr *fsTestingRecord, catchAlls map[bool]*fcv1a1.FlowSchema) { func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTestingRecord, catchAlls map[bool]*fcv1a1.FlowSchema) {
t := cts.t t := cts.t
ctl := cts.cfgCtl ctlr := cts.cfgCtlr
fs := ftr.fs fs := ftr.fs
expectedPLName := fs.Spec.PriorityLevelConfiguration.Name expectedPLName := fs.Spec.PriorityLevelConfiguration.Name
ctx := context.Background() ctx := context.Background()
@ -320,17 +322,18 @@ func checkNewFS(cts *ctlTestState, rng *rand.Rand, trialName string, ftr *fsTest
startWG.Add(1) startWG.Add(1)
go func(matches, isResource bool, rdu RequestDigest) { go func(matches, isResource bool, rdu RequestDigest) {
expectedMatch := matches && ftr.wellFormed && (fsPrecedes(fs, catchAlls[isResource]) || fs.Name == catchAlls[isResource].Name) expectedMatch := matches && ftr.wellFormed && (fsPrecedes(fs, catchAlls[isResource]) || fs.Name == catchAlls[isResource].Name)
ctl.Handle(ctx, rdu, func(matchFS *fcv1a1.FlowSchema, matchPL *fcv1a1.PriorityLevelConfiguration) { ctlr.Handle(ctx, rdu, func(matchFS *fcv1a1.FlowSchema, matchPL *fcv1a1.PriorityLevelConfiguration) {
matchIsExempt := matchPL.Spec.Type == fcv1a1.PriorityLevelEnablementExempt matchIsExempt := matchPL.Spec.Type == fcv1a1.PriorityLevelEnablementExempt
t.Logf("Considering FlowSchema %s, expectedMatch=%v, isResource=%v: Handle(%#+v) => note(fs=%s, pl=%s, isExempt=%v)", fs.Name, expectedMatch, isResource, rdu, matchFS.Name, matchPL.Name, matchIsExempt) t.Logf("Considering FlowSchema %s, expectedMatch=%v, isResource=%v: Handle(%#+v) => note(fs=%s, pl=%s, isExempt=%v)", fs.Name, expectedMatch, isResource, rdu, matchFS.Name, matchPL.Name, matchIsExempt)
if e, a := expectedMatch, matchFS.Name == fs.Name; e != a { if a := matchFS.Name == fs.Name; expectedMatch != a {
t.Errorf("Fail at %s/%s: rd=%#+v, expectedMatch=%v, actualMatch=%v, matchFSName=%q, catchAlls=%#+v", trialName, fs.Name, rdu, e, a, matchFS.Name, catchAlls) t.Errorf("Fail at %s/%s: rd=%#+v, expectedMatch=%v, actualMatch=%v, matchFSName=%q, catchAlls=%#+v", trialName, fs.Name, rdu, expectedMatch, a, matchFS.Name, catchAlls)
} }
if matchFS.Name == fs.Name { if matchFS.Name == fs.Name {
if e, a := fs.Spec.PriorityLevelConfiguration.Name, matchPL.Name; e != a { if fs.Spec.PriorityLevelConfiguration.Name != matchPL.Name {
t.Errorf("Fail at %s/%s: e=%v, a=%v", trialName, fs.Name, e, a) t.Errorf("Fail at %s/%s: expected=%v, actual=%v", trialName, fs.Name, fs.Spec.PriorityLevelConfiguration.Name, matchPL.Name)
} }
} }
}, func(inQueue bool) {
}, func() { }, func() {
startWG.Done() startWG.Done()
_ = <-finishCh _ = <-finishCh

View File

@ -1,12 +1,19 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library") load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = ["interface.go"], srcs = [
"integrator.go",
"interface.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing", importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing",
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing", importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing",
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = ["//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug:go_default_library"], deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
],
) )
filegroup( filegroup(
@ -27,3 +34,10 @@ filegroup(
tags = ["automanaged"], tags = ["automanaged"],
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
) )
go_test(
name = "go_default_test",
srcs = ["integrator_test.go"],
embed = [":go_default_library"],
deps = ["//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library"],
)

View File

@ -0,0 +1,180 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fairqueuing
import (
"math"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
)
// Integrator computes the moments of some variable X over time as
// read from a particular clock. The integrals start when the
// Integrator is created, and ends at the latest operation on the
// Integrator. As a `metrics.TimedObserver` this fixes X1=1 and
// ignores attempts to change X1.
type Integrator interface {
metrics.TimedObserver
GetResults() IntegratorResults
// Return the results of integrating to now, and reset integration to start now
Reset() IntegratorResults
}
// IntegratorResults holds statistical abstracts of the integration
type IntegratorResults struct {
Duration float64 //seconds
Average float64 //time-weighted
Deviation float64 //standard deviation: sqrt(avg((value-avg)^2))
Min, Max float64
}
// Equal tests for semantic equality.
// This considers all NaN values to be equal to each other.
func (x *IntegratorResults) Equal(y *IntegratorResults) bool {
return x == y || x != nil && y != nil && x.Duration == y.Duration && x.Min == y.Min && x.Max == y.Max && (x.Average == y.Average || math.IsNaN(x.Average) && math.IsNaN(y.Average)) && (x.Deviation == y.Deviation || math.IsNaN(x.Deviation) && math.IsNaN(y.Deviation))
}
type integrator struct {
clock clock.PassiveClock
sync.Mutex
lastTime time.Time
x float64
moments Moments
min, max float64
}
// NewIntegrator makes one that uses the given clock
func NewIntegrator(clock clock.PassiveClock) Integrator {
return &integrator{
clock: clock,
lastTime: clock.Now(),
}
}
func (igr *integrator) SetX1(x1 float64) {
}
func (igr *integrator) Set(x float64) {
igr.Lock()
igr.setLocked(x)
igr.Unlock()
}
func (igr *integrator) setLocked(x float64) {
igr.updateLocked()
igr.x = x
if x < igr.min {
igr.min = x
}
if x > igr.max {
igr.max = x
}
}
func (igr *integrator) Add(deltaX float64) {
igr.Lock()
igr.setLocked(igr.x + deltaX)
igr.Unlock()
}
func (igr *integrator) updateLocked() {
now := igr.clock.Now()
dt := now.Sub(igr.lastTime).Seconds()
igr.lastTime = now
igr.moments = igr.moments.Add(ConstantMoments(dt, igr.x))
}
func (igr *integrator) GetResults() IntegratorResults {
igr.Lock()
defer igr.Unlock()
return igr.getResultsLocked()
}
func (igr *integrator) Reset() IntegratorResults {
igr.Lock()
defer igr.Unlock()
results := igr.getResultsLocked()
igr.moments = Moments{}
igr.min = igr.x
igr.max = igr.x
return results
}
func (igr *integrator) getResultsLocked() (results IntegratorResults) {
igr.updateLocked()
results.Min, results.Max = igr.min, igr.max
results.Duration = igr.moments.ElapsedSeconds
results.Average, results.Deviation = igr.moments.AvgAndStdDev()
return
}
// Moments are the integrals of the 0, 1, and 2 powers of some
// variable X over some range of time.
type Moments struct {
ElapsedSeconds float64 // integral of dt
IntegralX float64 // integral of x dt
IntegralXX float64 // integral of x*x dt
}
// ConstantMoments is for a constant X
func ConstantMoments(dt, x float64) Moments {
return Moments{
ElapsedSeconds: dt,
IntegralX: x * dt,
IntegralXX: x * x * dt,
}
}
// Add combines over two ranges of time
func (igr Moments) Add(ogr Moments) Moments {
return Moments{
ElapsedSeconds: igr.ElapsedSeconds + ogr.ElapsedSeconds,
IntegralX: igr.IntegralX + ogr.IntegralX,
IntegralXX: igr.IntegralXX + ogr.IntegralXX,
}
}
// Sub finds the difference between a range of time and a subrange
func (igr Moments) Sub(ogr Moments) Moments {
return Moments{
ElapsedSeconds: igr.ElapsedSeconds - ogr.ElapsedSeconds,
IntegralX: igr.IntegralX - ogr.IntegralX,
IntegralXX: igr.IntegralXX - ogr.IntegralXX,
}
}
// AvgAndStdDev returns the average and standard devation
func (igr Moments) AvgAndStdDev() (float64, float64) {
if igr.ElapsedSeconds <= 0 {
return math.NaN(), math.NaN()
}
avg := igr.IntegralX / igr.ElapsedSeconds
// standard deviation is sqrt( average( (x - xbar)^2 ) )
// = sqrt( Integral( x^2 + xbar^2 -2*x*xbar dt ) / Duration )
// = sqrt( ( Integral( x^2 dt ) + Duration * xbar^2 - 2*xbar*Integral(x dt) ) / Duration)
// = sqrt( Integral(x^2 dt)/Duration - xbar^2 )
variance := igr.IntegralXX/igr.ElapsedSeconds - avg*avg
if variance >= 0 {
return avg, math.Sqrt(variance)
}
return avg, math.NaN()
}

View File

@ -0,0 +1,53 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fairqueuing
import (
"math"
"testing"
"time"
"k8s.io/apimachinery/pkg/util/clock"
)
func TestIntegrator(t *testing.T) {
now := time.Now()
clk := clock.NewFakeClock(now)
igr := NewIntegrator(clk)
igr.Add(3)
clk.Step(time.Second)
results := igr.GetResults()
rToo := igr.Reset()
if e := (IntegratorResults{Duration: time.Second.Seconds(), Average: 3, Deviation: 0, Min: 0, Max: 3}); !e.Equal(&results) {
t.Errorf("expected %#+v, got %#+v", e, results)
}
if !results.Equal(&rToo) {
t.Errorf("expected %#+v, got %#+v", results, rToo)
}
igr.Set(2)
results = igr.GetResults()
if e := (IntegratorResults{Duration: 0, Average: math.NaN(), Deviation: math.NaN(), Min: 2, Max: 3}); !e.Equal(&results) {
t.Errorf("expected %#+v, got %#+v", e, results)
}
clk.Step(time.Millisecond)
igr.Add(-1)
clk.Step(time.Millisecond)
results = igr.GetResults()
if e := (IntegratorResults{Duration: 2 * time.Millisecond.Seconds(), Average: 1.5, Deviation: 0.5, Min: 1, Max: 3}); !e.Equal(&results) {
t.Errorf("expected %#+v, got %#+v", e, results)
}
}

View File

@ -21,6 +21,7 @@ import (
"time" "time"
"k8s.io/apiserver/pkg/util/flowcontrol/debug" "k8s.io/apiserver/pkg/util/flowcontrol/debug"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
) )
// QueueSetFactory is used to create QueueSet objects. Creation, like // QueueSetFactory is used to create QueueSet objects. Creation, like
@ -30,7 +31,7 @@ import (
// before committing to a concurrency allotment for the second. // before committing to a concurrency allotment for the second.
type QueueSetFactory interface { type QueueSetFactory interface {
// BeginConstruction does the first phase of creating a QueueSet // BeginConstruction does the first phase of creating a QueueSet
BeginConstruction(QueuingConfig) (QueueSetCompleter, error) BeginConstruction(QueuingConfig, metrics.TimedObserverPair) (QueueSetCompleter, error)
} }
// QueueSetCompleter finishes the two-step process of creating or // QueueSetCompleter finishes the two-step process of creating or
@ -79,7 +80,7 @@ type QueueSet interface {
// was idle at the moment of the return. Otherwise idle==false // was idle at the moment of the return. Otherwise idle==false
// and the client must call the Finish method of the Request // and the client must call the Finish method of the Request
// exactly once. // exactly once.
StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (req Request, idle bool) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool)
// Dump saves and returns the instant internal state of the queue-set. // Dump saves and returns the instant internal state of the queue-set.
// Note that dumping process will stop the queue-set from proceeding // Note that dumping process will stop the queue-set from proceeding
@ -88,6 +89,9 @@ type QueueSet interface {
Dump(includeRequestDetails bool) debug.QueueSetDump Dump(includeRequestDetails bool) debug.QueueSetDump
} }
// QueueNoteFn is called when a request enters and leaves a queue
type QueueNoteFn func(inQueue bool)
// Request represents the remainder of the handling of one request // Request represents the remainder of the handling of one request
type Request interface { type Request interface {
// Finish determines whether to execute or reject the request and // Finish determines whether to execute or reject the request and

View File

@ -31,6 +31,7 @@ go_test(
srcs = ["queueset_test.go"], srcs = ["queueset_test.go"],
embed = [":go_default_library"], embed = [":go_default_library"],
deps = [ deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library",

View File

@ -49,6 +49,7 @@ type queueSetFactory struct {
// the fields `factory` and `theSet` is non-nil. // the fields `factory` and `theSet` is non-nil.
type queueSetCompleter struct { type queueSetCompleter struct {
factory *queueSetFactory factory *queueSetFactory
obsPair metrics.TimedObserverPair
theSet *queueSet theSet *queueSet
qCfg fq.QueuingConfig qCfg fq.QueuingConfig
dealer *shufflesharding.Dealer dealer *shufflesharding.Dealer
@ -67,6 +68,7 @@ type queueSet struct {
clock clock.PassiveClock clock clock.PassiveClock
counter counter.GoRoutineCounter counter counter.GoRoutineCounter
estimatedServiceTime float64 estimatedServiceTime float64
obsPair metrics.TimedObserverPair
lock sync.Mutex lock sync.Mutex
@ -116,13 +118,14 @@ func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter)
} }
} }
func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) { func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, obsPair metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
dealer, err := checkConfig(qCfg) dealer, err := checkConfig(qCfg)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &queueSetCompleter{ return &queueSetCompleter{
factory: qsf, factory: qsf,
obsPair: obsPair,
qCfg: qCfg, qCfg: qCfg,
dealer: dealer}, nil dealer: dealer}, nil
} }
@ -148,6 +151,7 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
clock: qsc.factory.clock, clock: qsc.factory.clock,
counter: qsc.factory.counter, counter: qsc.factory.counter,
estimatedServiceTime: 60, estimatedServiceTime: 60,
obsPair: qsc.obsPair,
qCfg: qsc.qCfg, qCfg: qsc.qCfg,
virtualTime: 0, virtualTime: 0,
lastRealTime: qsc.factory.clock.Now(), lastRealTime: qsc.factory.clock.Now(),
@ -203,6 +207,12 @@ func (qs *queueSet) setConfiguration(qCfg fq.QueuingConfig, dealer *shuffleshard
qs.qCfg = qCfg qs.qCfg = qCfg
qs.dCfg = dCfg qs.dCfg = dCfg
qs.dealer = dealer qs.dealer = dealer
qll := qCfg.QueueLengthLimit
if qll < 1 {
qll = 1
}
qs.obsPair.RequestsWaiting.SetX1(float64(qll))
qs.obsPair.RequestsExecuting.SetX1(float64(dCfg.ConcurrencyLimit))
qs.dispatchAsMuchAsPossibleLocked() qs.dispatchAsMuchAsPossibleLocked()
} }
@ -222,7 +232,7 @@ const (
// executing at each point where there is a change in that quantity, // executing at each point where there is a change in that quantity,
// because the metrics --- and only the metrics --- track that // because the metrics --- and only the metrics --- track that
// quantity per FlowSchema. // quantity per FlowSchema.
func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (fq.Request, bool) { func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
qs.lockAndSyncTime() qs.lockAndSyncTime()
defer qs.lock.Unlock() defer qs.lock.Unlock()
var req *request var req *request
@ -247,7 +257,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist
// 3) Reject current request if there is not enough concurrency shares and // 3) Reject current request if there is not enough concurrency shares and
// we are at max queue length // we are at max queue length
// 4) If not rejected, create a request and enqueue // 4) If not rejected, create a request and enqueue
req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, flowDistinguisher, fsName, descr1, descr2) req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn)
// req == nil means that the request was rejected - no remaining // req == nil means that the request was rejected - no remaining
// concurrency shares and at max queue length already // concurrency shares and at max queue length already
if req == nil { if req == nil {
@ -295,6 +305,12 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist
return req, false return req, false
} }
func (req *request) NoteQueued(inQueue bool) {
if req.queueNoteFn != nil {
req.queueNoteFn(inQueue)
}
}
func (req *request) Finish(execFn func()) bool { func (req *request) Finish(execFn func()) bool {
exec, idle := req.wait() exec, idle := req.wait()
if !exec { if !exec {
@ -399,7 +415,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
// returns the enqueud request on a successful enqueue // returns the enqueud request on a successful enqueue
// returns nil in the case that there is no available concurrency or // returns nil in the case that there is no available concurrency or
// the queuelengthlimit has been reached // the queuelengthlimit has been reached
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request { func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
// Start with the shuffle sharding, to pick a queue. // Start with the shuffle sharding, to pick a queue.
queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2) queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2)
queue := qs.queues[queueIdx] queue := qs.queues[queueIdx]
@ -420,6 +436,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
queue: queue, queue: queue,
descr1: descr1, descr1: descr1,
descr2: descr2, descr2: descr2,
queueNoteFn: queueNoteFn,
} }
if ok := qs.rejectOrEnqueueLocked(req); !ok { if ok := qs.rejectOrEnqueueLocked(req); !ok {
return nil return nil
@ -463,6 +480,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
// get index for timed out requests // get index for timed out requests
timeoutIdx = i timeoutIdx = i
metrics.AddRequestsInQueues(qs.qCfg.Name, req.fsName, -1) metrics.AddRequestsInQueues(qs.qCfg.Name, req.fsName, -1)
req.NoteQueued(false)
} else { } else {
break break
} }
@ -475,6 +493,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
queue.requests = reqs[removeIdx:] queue.requests = reqs[removeIdx:]
// decrement the # of requestsEnqueued // decrement the # of requestsEnqueued
qs.totRequestsWaiting -= removeIdx qs.totRequestsWaiting -= removeIdx
qs.obsPair.RequestsWaiting.Add(float64(-removeIdx))
} }
} }
@ -498,16 +517,19 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
// enqueues a request into its queue. // enqueues a request into its queue.
func (qs *queueSet) enqueueLocked(request *request) { func (qs *queueSet) enqueueLocked(request *request) {
queue := request.queue queue := request.queue
now := qs.clock.Now()
if len(queue.requests) == 0 && queue.requestsExecuting == 0 { if len(queue.requests) == 0 && queue.requestsExecuting == 0 {
// the queues virtual start time is set to the virtual time. // the queues virtual start time is set to the virtual time.
queue.virtualStart = qs.virtualTime queue.virtualStart = qs.virtualTime
if klog.V(6).Enabled() { if klog.V(6).Enabled() {
klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2) klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2)
} }
} }
queue.Enqueue(request) queue.Enqueue(request)
qs.totRequestsWaiting++ qs.totRequestsWaiting++
metrics.AddRequestsInQueues(qs.qCfg.Name, request.fsName, 1) metrics.AddRequestsInQueues(qs.qCfg.Name, request.fsName, 1)
request.NoteQueued(true)
qs.obsPair.RequestsWaiting.Add(1)
} }
// dispatchAsMuchAsPossibleLocked runs a loop, as long as there // dispatchAsMuchAsPossibleLocked runs a loop, as long as there
@ -541,6 +563,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, flowDistinguish
req.decision.SetLocked(decisionExecute) req.decision.SetLocked(decisionExecute)
qs.totRequestsExecuting++ qs.totRequestsExecuting++
metrics.AddRequestsExecuting(qs.qCfg.Name, fsName, 1) metrics.AddRequestsExecuting(qs.qCfg.Name, fsName, 1)
qs.obsPair.RequestsExecuting.Add(1)
if klog.V(5).Enabled() { if klog.V(5).Enabled() {
klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting) klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting)
} }
@ -570,7 +593,10 @@ func (qs *queueSet) dispatchLocked() bool {
qs.totRequestsExecuting++ qs.totRequestsExecuting++
queue.requestsExecuting++ queue.requestsExecuting++
metrics.AddRequestsInQueues(qs.qCfg.Name, request.fsName, -1) metrics.AddRequestsInQueues(qs.qCfg.Name, request.fsName, -1)
request.NoteQueued(false)
metrics.AddRequestsExecuting(qs.qCfg.Name, request.fsName, 1) metrics.AddRequestsExecuting(qs.qCfg.Name, request.fsName, 1)
qs.obsPair.RequestsWaiting.Add(-1)
qs.obsPair.RequestsExecuting.Add(1)
if klog.V(6).Enabled() { if klog.V(6).Enabled() {
klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting) klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting)
} }
@ -599,6 +625,8 @@ func (qs *queueSet) cancelWait(req *request) {
queue.requests = append(queue.requests[:i], queue.requests[i+1:]...) queue.requests = append(queue.requests[:i], queue.requests[i+1:]...)
qs.totRequestsWaiting-- qs.totRequestsWaiting--
metrics.AddRequestsInQueues(qs.qCfg.Name, req.fsName, -1) metrics.AddRequestsInQueues(qs.qCfg.Name, req.fsName, -1)
req.NoteQueued(false)
qs.obsPair.RequestsWaiting.Add(-1)
break break
} }
} }
@ -650,17 +678,19 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool
// previously dispatched request has completed it's service. This // previously dispatched request has completed it's service. This
// callback updates important state in the queueSet // callback updates important state in the queueSet
func (qs *queueSet) finishRequestLocked(r *request) { func (qs *queueSet) finishRequestLocked(r *request) {
now := qs.clock.Now()
qs.totRequestsExecuting-- qs.totRequestsExecuting--
metrics.AddRequestsExecuting(qs.qCfg.Name, r.fsName, -1) metrics.AddRequestsExecuting(qs.qCfg.Name, r.fsName, -1)
qs.obsPair.RequestsExecuting.Add(-1)
if r.queue == nil { if r.queue == nil {
if klog.V(6).Enabled() { if klog.V(6).Enabled() {
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting) klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting)
} }
return return
} }
S := qs.clock.Since(r.startTime).Seconds() S := now.Sub(r.startTime).Seconds()
// When a request finishes being served, and the actual service time was S, // When a request finishes being served, and the actual service time was S,
// the queues virtual start time is decremented by G - S. // the queues virtual start time is decremented by G - S.
@ -670,7 +700,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
r.queue.requestsExecuting-- r.queue.requestsExecuting--
if klog.V(6).Enabled() { if klog.V(6).Enabled() {
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, r.queue.virtualStart, S, len(r.queue.requests), r.queue.requestsExecuting) klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, r.queue.virtualStart, S, len(r.queue.requests), r.queue.requestsExecuting)
} }
// If there are more queues than desired and this one has no // If there are more queues than desired and this one has no

View File

@ -26,10 +26,11 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apiserver/pkg/util/flowcontrol/counter" "k8s.io/apiserver/pkg/util/flowcontrol/counter"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing" test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock" testclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics" "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
@ -128,7 +129,7 @@ type uniformScenario struct {
expectAllRequests bool expectAllRequests bool
evalInqueueMetrics, evalExecutingMetrics bool evalInqueueMetrics, evalExecutingMetrics bool
rejectReason string rejectReason string
clk *clock.FakeEventClock clk *testclock.FakeEventClock
counter counter.GoRoutineCounter counter counter.GoRoutineCounter
} }
@ -137,7 +138,7 @@ func (us uniformScenario) exercise(t *testing.T) {
t: t, t: t,
uniformScenario: us, uniformScenario: us,
startTime: time.Now(), startTime: time.Now(),
integrators: make([]test.Integrator, len(us.clients)), integrators: make([]fq.Integrator, len(us.clients)),
executions: make([]int32, len(us.clients)), executions: make([]int32, len(us.clients)),
rejects: make([]int32, len(us.clients)), rejects: make([]int32, len(us.clients)),
} }
@ -152,7 +153,7 @@ type uniformScenarioState struct {
uniformScenario uniformScenario
startTime time.Time startTime time.Time
doSplit bool doSplit bool
integrators []test.Integrator integrators []fq.Integrator
failedCount uint64 failedCount uint64
expectedInqueue, expectedExecuting string expectedInqueue, expectedExecuting string
executions, rejects []int32 executions, rejects []int32
@ -164,7 +165,7 @@ func (uss *uniformScenarioState) exercise() {
metrics.Reset() metrics.Reset()
} }
for i, uc := range uss.clients { for i, uc := range uss.clients {
uss.integrators[i] = test.NewIntegrator(uss.clk) uss.integrators[i] = fq.NewIntegrator(uss.clk)
fsName := fmt.Sprintf("client%d", i) fsName := fmt.Sprintf("client%d", i)
uss.expectedInqueue = uss.expectedInqueue + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, uss.name, "\n") uss.expectedInqueue = uss.expectedInqueue + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, uss.name, "\n")
for j := 0; j < uc.nThreads; j++ { for j := 0; j < uc.nThreads; j++ {
@ -193,7 +194,7 @@ type uniformScenarioThread struct {
i, j int i, j int
nCalls int nCalls int
uc uniformClient uc uniformClient
igr test.Integrator igr fq.Integrator
fsName string fsName string
} }
@ -223,7 +224,7 @@ func (ust *uniformScenarioThread) callK(k int) {
if k >= ust.nCalls { if k >= ust.nCalls {
return return
} }
req, idle := ust.uss.qs.StartRequest(context.Background(), ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}) req, idle := ust.uss.qs.StartRequest(context.Background(), ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil)
ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle) ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle)
if req == nil { if req == nil {
atomic.AddUint64(&ust.uss.failedCount, 1) atomic.AddUint64(&ust.uss.failedCount, 1)
@ -236,7 +237,8 @@ func (ust *uniformScenarioThread) callK(k int) {
var executed bool var executed bool
idle2 := req.Finish(func() { idle2 := req.Finish(func() {
executed = true executed = true
ust.uss.t.Logf("%s: %d, %d, %d executing", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k) execStart := ust.uss.clk.Now()
ust.uss.t.Logf("%s: %d, %d, %d executing", execStart.Format(nsTimeFmt), ust.i, ust.j, k)
atomic.AddInt32(&ust.uss.executions[ust.i], 1) atomic.AddInt32(&ust.uss.executions[ust.i], 1)
ust.igr.Add(1) ust.igr.Add(1)
ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration) ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration)
@ -339,7 +341,7 @@ func (uss *uniformScenarioState) finalReview() {
} }
} }
func ClockWait(clk *clock.FakeEventClock, counter counter.GoRoutineCounter, duration time.Duration) { func ClockWait(clk *testclock.FakeEventClock, counter counter.GoRoutineCounter, duration time.Duration) {
dunch := make(chan struct{}) dunch := make(chan struct{})
clk.EventAfterDuration(func(time.Time) { clk.EventAfterDuration(func(time.Time) {
counter.Add(1) counter.Add(1)
@ -359,8 +361,8 @@ func init() {
func TestNoRestraint(t *testing.T) { func TestNoRestraint(t *testing.T) {
metrics.Register() metrics.Register()
now := time.Now() now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil) clk, counter := testclock.NewFakeEventClock(now, 0, nil)
nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}) nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -384,7 +386,7 @@ func TestUniformFlowsHandSize1(t *testing.T) {
metrics.Register() metrics.Register()
now := time.Now() now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil) clk, counter := testclock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter) qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{ qCfg := fq.QueuingConfig{
Name: "TestUniformFlowsHandSize1", Name: "TestUniformFlowsHandSize1",
@ -393,7 +395,7 @@ func TestUniformFlowsHandSize1(t *testing.T) {
HandSize: 1, HandSize: 1,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
qsc, err := qsf.BeginConstruction(qCfg) qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -420,7 +422,7 @@ func TestUniformFlowsHandSize3(t *testing.T) {
metrics.Register() metrics.Register()
now := time.Now() now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil) clk, counter := testclock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter) qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{ qCfg := fq.QueuingConfig{
Name: "TestUniformFlowsHandSize3", Name: "TestUniformFlowsHandSize3",
@ -429,7 +431,7 @@ func TestUniformFlowsHandSize3(t *testing.T) {
HandSize: 3, HandSize: 3,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
qsc, err := qsf.BeginConstruction(qCfg) qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -455,7 +457,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
metrics.Register() metrics.Register()
now := time.Now() now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil) clk, counter := testclock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter) qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{ qCfg := fq.QueuingConfig{
Name: "DiffFlowsExpectEqual", Name: "DiffFlowsExpectEqual",
@ -464,7 +466,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
HandSize: 1, HandSize: 1,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
qsc, err := qsf.BeginConstruction(qCfg) qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -491,7 +493,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
metrics.Register() metrics.Register()
now := time.Now() now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil) clk, counter := testclock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter) qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{ qCfg := fq.QueuingConfig{
Name: "DiffFlowsExpectUnequal", Name: "DiffFlowsExpectUnequal",
@ -500,7 +502,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
HandSize: 1, HandSize: 1,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
qsc, err := qsf.BeginConstruction(qCfg) qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -527,7 +529,7 @@ func TestWindup(t *testing.T) {
metrics.Register() metrics.Register()
now := time.Now() now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil) clk, counter := testclock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter) qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{ qCfg := fq.QueuingConfig{
Name: "TestWindup", Name: "TestWindup",
@ -536,7 +538,7 @@ func TestWindup(t *testing.T) {
HandSize: 1, HandSize: 1,
RequestWaitLimit: 10 * time.Minute, RequestWaitLimit: 10 * time.Minute,
} }
qsc, err := qsf.BeginConstruction(qCfg) qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -562,13 +564,13 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
metrics.Register() metrics.Register()
now := time.Now() now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil) clk, counter := testclock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter) qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{ qCfg := fq.QueuingConfig{
Name: "TestDifferentFlowsWithoutQueuing", Name: "TestDifferentFlowsWithoutQueuing",
DesiredNumQueues: 0, DesiredNumQueues: 0,
} }
qsc, err := qsf.BeginConstruction(qCfg) qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -594,7 +596,7 @@ func TestTimeout(t *testing.T) {
metrics.Register() metrics.Register()
now := time.Now() now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil) clk, counter := testclock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter) qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{ qCfg := fq.QueuingConfig{
Name: "TestTimeout", Name: "TestTimeout",
@ -603,7 +605,7 @@ func TestTimeout(t *testing.T) {
HandSize: 1, HandSize: 1,
RequestWaitLimit: 0, RequestWaitLimit: 0,
} }
qsc, err := qsf.BeginConstruction(qCfg) qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -629,7 +631,7 @@ func TestContextCancel(t *testing.T) {
metrics.Register() metrics.Register()
metrics.Reset() metrics.Reset()
now := time.Now() now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil) clk, counter := testclock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter) qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{ qCfg := fq.QueuingConfig{
Name: "TestContextCancel", Name: "TestContextCancel",
@ -638,18 +640,26 @@ func TestContextCancel(t *testing.T) {
HandSize: 1, HandSize: 1,
RequestWaitLimit: 15 * time.Second, RequestWaitLimit: 15 * time.Second,
} }
qsc, err := qsf.BeginConstruction(qCfg) qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1}) qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
counter.Add(1) // account for the goroutine running this test counter.Add(1) // account for the goroutine running this test
ctx1 := context.Background() ctx1 := context.Background()
req1, _ := qs.StartRequest(ctx1, 1, "", "fs1", "test", "one") b2i := map[bool]int{false: 0, true: 1}
var qnc [2][2]int32
req1, _ := qs.StartRequest(ctx1, 1, "", "fs1", "test", "one", func(inQueue bool) { atomic.AddInt32(&qnc[0][b2i[inQueue]], 1) })
if req1 == nil { if req1 == nil {
t.Error("Request rejected") t.Error("Request rejected")
return return
} }
if a := atomic.AddInt32(&qnc[0][0], 0); a != 1 {
t.Errorf("Got %d calls to queueNoteFn1(false), expected 1", a)
}
if a := atomic.AddInt32(&qnc[0][1], 0); a != 1 {
t.Errorf("Got %d calls to queueNoteFn1(true), expected 1", a)
}
var executed1 bool var executed1 bool
idle1 := req1.Finish(func() { idle1 := req1.Finish(func() {
executed1 = true executed1 = true
@ -657,11 +667,17 @@ func TestContextCancel(t *testing.T) {
tBefore := time.Now() tBefore := time.Now()
go func() { go func() {
time.Sleep(time.Second) time.Sleep(time.Second)
if a := atomic.AddInt32(&qnc[1][0], 0); a != 0 {
t.Errorf("Got %d calls to queueNoteFn2(false), expected 0", a)
}
if a := atomic.AddInt32(&qnc[1][1], 0); a != 1 {
t.Errorf("Got %d calls to queueNoteFn2(true), expected 1", a)
}
// account for unblocking the goroutine that waits on cancelation // account for unblocking the goroutine that waits on cancelation
counter.Add(1) counter.Add(1)
cancel2() cancel2()
}() }()
req2, idle2a := qs.StartRequest(ctx2, 2, "", "fs2", "test", "two") req2, idle2a := qs.StartRequest(ctx2, 2, "", "fs2", "test", "two", func(inQueue bool) { atomic.AddInt32(&qnc[1][b2i[inQueue]], 1) })
if idle2a { if idle2a {
t.Error("2nd StartRequest returned idle") t.Error("2nd StartRequest returned idle")
} }
@ -672,6 +688,9 @@ func TestContextCancel(t *testing.T) {
if idle2b { if idle2b {
t.Error("2nd Finish returned idle") t.Error("2nd Finish returned idle")
} }
if a := atomic.AddInt32(&qnc[1][0], 0); a != 1 {
t.Errorf("Got %d calls to queueNoteFn2(false), expected 1", a)
}
} }
tAfter := time.Now() tAfter := time.Now()
dt := tAfter.Sub(tBefore) dt := tAfter.Sub(tBefore)
@ -686,3 +705,7 @@ func TestContextCancel(t *testing.T) {
t.Error("Not idle at the end") t.Error("Not idle at the end")
} }
} }
func newObserverPair(clk clock.PassiveClock) metrics.TimedObserverPair {
return metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"})
}

View File

@ -22,6 +22,7 @@ import (
genericrequest "k8s.io/apiserver/pkg/endpoints/request" genericrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/util/flowcontrol/debug" "k8s.io/apiserver/pkg/util/flowcontrol/debug"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise" "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
) )
@ -58,6 +59,8 @@ type request struct {
// Indicates whether client has called Request::Wait() // Indicates whether client has called Request::Wait()
waitStarted bool waitStarted bool
queueNoteFn fq.QueueNoteFn
} }
// queue is an array of requests with additional metadata required for // queue is an array of requests with additional metadata required for

View File

@ -2,17 +2,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = ["no-restraint.go"],
"integrator.go",
"no-restraint.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing", importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing",
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing", importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing",
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
], ],
) )

View File

@ -1,116 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"math"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/clock"
)
// Integrator computes the integral of some variable X over time as
// read from a particular clock. The integral starts when the
// Integrator is created, and ends at the latest operation on the
// Integrator.
type Integrator interface {
Set(float64) // set the value of X
Add(float64) // add the given quantity to X
GetResults() IntegratorResults
Reset() IntegratorResults // restart the integration from now
}
// IntegratorResults holds statistical abstracts of the integration
type IntegratorResults struct {
Duration float64 //seconds
Average float64
Deviation float64 //sqrt(avg((value-avg)^2))
}
type integrator struct {
sync.Mutex
clk clock.PassiveClock
lastTime time.Time
x float64
integrals [3]float64 // integral of x^0, x^1, and x^2
}
// NewIntegrator makes one that uses the given clock
func NewIntegrator(clk clock.PassiveClock) Integrator {
return &integrator{
clk: clk,
lastTime: clk.Now(),
}
}
func (igr *integrator) Set(x float64) {
igr.Lock()
igr.updateLocked()
igr.x = x
igr.Unlock()
}
func (igr *integrator) Add(deltaX float64) {
igr.Lock()
igr.updateLocked()
igr.x += deltaX
igr.Unlock()
}
func (igr *integrator) updateLocked() {
now := igr.clk.Now()
dt := now.Sub(igr.lastTime).Seconds()
igr.lastTime = now
igr.integrals[0] += dt
igr.integrals[1] += dt * igr.x
igr.integrals[2] += dt * igr.x * igr.x
}
func (igr *integrator) GetResults() (results IntegratorResults) {
igr.Lock()
defer func() { igr.Unlock() }()
return igr.getResultsLocked()
}
func (igr *integrator) getResultsLocked() (results IntegratorResults) {
igr.updateLocked()
results.Duration = igr.integrals[0]
if results.Duration <= 0 {
results.Average = math.NaN()
results.Deviation = math.NaN()
return
}
results.Average = igr.integrals[1] / igr.integrals[0]
// Deviation is sqrt( Integral( (x - xbar)^2 dt) / Duration )
// = sqrt( Integral( x^2 + xbar^2 -2*x*xbar dt ) / Duration )
// = sqrt( ( Integral( x^2 dt ) + Duration * xbar^2 - 2*xbar*Integral(x dt) ) / Duration)
// = sqrt( Integral(x^2 dt)/Duration - xbar^2 )
variance := igr.integrals[2]/igr.integrals[0] - results.Average*results.Average
if variance > 0 {
results.Deviation = math.Sqrt(variance)
}
return
}
func (igr *integrator) Reset() (results IntegratorResults) {
igr.Lock()
defer func() { igr.Unlock() }()
results = igr.getResultsLocked()
igr.integrals = [3]float64{0, 0, 0}
return
}

View File

@ -21,6 +21,7 @@ import (
"k8s.io/apiserver/pkg/util/flowcontrol/debug" "k8s.io/apiserver/pkg/util/flowcontrol/debug"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
) )
// NewNoRestraintFactory makes a QueueSetFactory that produces // NewNoRestraintFactory makes a QueueSetFactory that produces
@ -38,7 +39,7 @@ type noRestraint struct{}
type noRestraintRequest struct{} type noRestraintRequest struct{}
func (noRestraintFactory) BeginConstruction(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) { func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
return noRestraintCompleter{}, nil return noRestraintCompleter{}, nil
} }
@ -54,7 +55,7 @@ func (noRestraint) IsIdle() bool {
return false return false
} }
func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (fq.Request, bool) { func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
return noRestraintRequest{}, false return noRestraintRequest{}, false
} }

View File

@ -31,6 +31,7 @@ import (
"k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/endpoints/request"
fqtesting "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing" fqtesting "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format" fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
) )
var noRestraintQSF = fqtesting.NewNoRestraintFactory() var noRestraintQSF = fqtesting.NewNoRestraintFactory()
@ -55,7 +56,7 @@ func genPL(rng *rand.Rand, name string) *fcv1a1.PriorityLevelConfiguration {
HandSize: hs, HandSize: hs,
QueueLengthLimit: 5} QueueLengthLimit: 5}
} }
_, err := qscOfPL(noRestraintQSF, nil, plc, time.Minute) _, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"}))
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@ -2,14 +2,20 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = ["metrics.go"], srcs = [
"metrics.go",
"sample_and_watermark.go",
"timed_observer.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/metrics", importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/metrics",
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/metrics", importpath = "k8s.io/apiserver/pkg/util/flowcontrol/metrics",
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/component-base/metrics:go_default_library", "//staging/src/k8s.io/component-base/metrics:go_default_library",
"//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library", "//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library", "//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
], ],
) )

View File

@ -21,6 +21,7 @@ import (
"sync" "sync"
"time" "time"
"k8s.io/apimachinery/pkg/util/clock"
compbasemetrics "k8s.io/component-base/metrics" compbasemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/legacyregistry"
basemetricstestutil "k8s.io/component-base/metrics/testutil" basemetricstestutil "k8s.io/component-base/metrics/testutil"
@ -32,8 +33,11 @@ const (
) )
const ( const (
requestKind = "request_kind"
priorityLevel = "priorityLevel" priorityLevel = "priorityLevel"
flowSchema = "flowSchema" flowSchema = "flowSchema"
phase = "phase"
mark = "mark"
) )
var ( var (
@ -69,6 +73,14 @@ func GatherAndCompare(expected string, metricNames ...string) error {
return basemetricstestutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...) return basemetricstestutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...)
} }
// Registerables is a slice of Registerable
type Registerables []compbasemetrics.Registerable
// Append adds more
func (rs Registerables) Append(more ...compbasemetrics.Registerable) Registerables {
return append(rs, more...)
}
var ( var (
apiserverRejectedRequestsTotal = compbasemetrics.NewCounterVec( apiserverRejectedRequestsTotal = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{ &compbasemetrics.CounterOpts{
@ -88,6 +100,47 @@ var (
}, },
[]string{priorityLevel, flowSchema}, []string{priorityLevel, flowSchema},
) )
// PriorityLevelConcurrencyObserverPairGenerator creates pairs that observe concurrency for priority levels
PriorityLevelConcurrencyObserverPairGenerator = NewSampleAndWaterMarkHistogramsPairGenerator(clock.RealClock{}, time.Millisecond,
&compbasemetrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "priority_level_request_count_samples",
Help: "Periodic observations of the number of requests",
Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1},
StabilityLevel: compbasemetrics.ALPHA,
},
&compbasemetrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "priority_level_request_count_watermarks",
Help: "Watermarks of the number of requests",
Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1},
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{priorityLevel})
// ReadWriteConcurrencyObserverPairGenerator creates pairs that observe concurrency broken down by mutating vs readonly
ReadWriteConcurrencyObserverPairGenerator = NewSampleAndWaterMarkHistogramsPairGenerator(clock.RealClock{}, time.Millisecond,
&compbasemetrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "read_vs_write_request_count_samples",
Help: "Periodic observations of the number of requests",
Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1},
StabilityLevel: compbasemetrics.ALPHA,
},
&compbasemetrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "read_vs_write_request_count_watermarks",
Help: "Watermarks of the number of requests",
Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1},
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{requestKind})
apiserverCurrentInqueueRequests = compbasemetrics.NewGaugeVec( apiserverCurrentInqueueRequests = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{ &compbasemetrics.GaugeOpts{
Namespace: namespace, Namespace: namespace,
@ -145,7 +198,7 @@ var (
}, },
[]string{priorityLevel, flowSchema}, []string{priorityLevel, flowSchema},
) )
metrics = []compbasemetrics.Registerable{ metrics = Registerables{
apiserverRejectedRequestsTotal, apiserverRejectedRequestsTotal,
apiserverDispatchedRequestsTotal, apiserverDispatchedRequestsTotal,
apiserverCurrentInqueueRequests, apiserverCurrentInqueueRequests,
@ -154,7 +207,9 @@ var (
apiserverCurrentExecutingRequests, apiserverCurrentExecutingRequests,
apiserverRequestWaitingSeconds, apiserverRequestWaitingSeconds,
apiserverRequestExecutionSeconds, apiserverRequestExecutionSeconds,
} }.
Append(PriorityLevelConcurrencyObserverPairGenerator.metrics()...).
Append(ReadWriteConcurrencyObserverPairGenerator.metrics()...)
) )
// AddRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel // AddRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel

View File

@ -0,0 +1,192 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"sync"
"time"
"k8s.io/apimachinery/pkg/util/clock"
compbasemetrics "k8s.io/component-base/metrics"
"k8s.io/klog/v2"
)
const (
labelNameMark = "mark"
labelValueLo = "low"
labelValueHi = "high"
labelNamePhase = "phase"
labelValueWaiting = "waiting"
labelValueExecuting = "executing"
)
// SampleAndWaterMarkPairGenerator makes pairs of TimedObservers that
// track samples and watermarks.
type SampleAndWaterMarkPairGenerator struct {
urGenerator SampleAndWaterMarkObserverGenerator
}
var _ TimedObserverPairGenerator = SampleAndWaterMarkPairGenerator{}
// NewSampleAndWaterMarkHistogramsPairGenerator makes a new pair generator
func NewSampleAndWaterMarkHistogramsPairGenerator(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkPairGenerator {
return SampleAndWaterMarkPairGenerator{
urGenerator: NewSampleAndWaterMarkHistogramsGenerator(clock, samplePeriod, sampleOpts, waterMarkOpts, append([]string{labelNamePhase}, labelNames...)),
}
}
// Generate makes a new pair
func (spg SampleAndWaterMarkPairGenerator) Generate(waiting1, executing1 float64, labelValues []string) TimedObserverPair {
return TimedObserverPair{
RequestsWaiting: spg.urGenerator.Generate(0, waiting1, append([]string{labelValueWaiting}, labelValues...)),
RequestsExecuting: spg.urGenerator.Generate(0, executing1, append([]string{labelValueExecuting}, labelValues...)),
}
}
func (spg SampleAndWaterMarkPairGenerator) metrics() Registerables {
return spg.urGenerator.metrics()
}
// SampleAndWaterMarkObserverGenerator creates TimedObservers that
// populate histograms of samples and low- and high-water-marks. The
// generator has a samplePeriod, and the histograms get an observation
// every samplePeriod.
type SampleAndWaterMarkObserverGenerator struct {
*sampleAndWaterMarkObserverGenerator
}
type sampleAndWaterMarkObserverGenerator struct {
clock clock.PassiveClock
samplePeriod time.Duration
samples *compbasemetrics.HistogramVec
waterMarks *compbasemetrics.HistogramVec
}
var _ TimedObserverGenerator = (*sampleAndWaterMarkObserverGenerator)(nil)
// NewSampleAndWaterMarkHistogramsGenerator makes a new one
func NewSampleAndWaterMarkHistogramsGenerator(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkObserverGenerator {
return SampleAndWaterMarkObserverGenerator{
&sampleAndWaterMarkObserverGenerator{
clock: clock,
samplePeriod: samplePeriod,
samples: compbasemetrics.NewHistogramVec(sampleOpts, labelNames),
waterMarks: compbasemetrics.NewHistogramVec(waterMarkOpts, append([]string{labelNameMark}, labelNames...)),
}}
}
func (swg *sampleAndWaterMarkObserverGenerator) quantize(when time.Time) int64 {
return when.UnixNano() / int64(swg.samplePeriod)
}
// Generate makes a new TimedObserver
func (swg *sampleAndWaterMarkObserverGenerator) Generate(x, x1 float64, labelValues []string) TimedObserver {
relX := x / x1
when := swg.clock.Now()
return &sampleAndWaterMarkHistograms{
sampleAndWaterMarkObserverGenerator: swg,
labelValues: labelValues,
loLabelValues: append([]string{labelValueLo}, labelValues...),
hiLabelValues: append([]string{labelValueHi}, labelValues...),
x1: x1,
sampleAndWaterMarkAccumulator: sampleAndWaterMarkAccumulator{
lastSet: when,
lastSetInt: swg.quantize(when),
x: x,
relX: relX,
loRelX: relX,
hiRelX: relX,
}}
}
func (swg *sampleAndWaterMarkObserverGenerator) metrics() Registerables {
return Registerables{swg.samples, swg.waterMarks}
}
type sampleAndWaterMarkHistograms struct {
*sampleAndWaterMarkObserverGenerator
labelValues []string
loLabelValues, hiLabelValues []string
sync.Mutex
x1 float64
sampleAndWaterMarkAccumulator
}
type sampleAndWaterMarkAccumulator struct {
lastSet time.Time
lastSetInt int64 // lastSet / samplePeriod
x float64
relX float64 // x / x1
loRelX, hiRelX float64
}
var _ TimedObserver = (*sampleAndWaterMarkHistograms)(nil)
func (saw *sampleAndWaterMarkHistograms) Add(deltaX float64) {
saw.innerSet(func() {
saw.x += deltaX
})
}
func (saw *sampleAndWaterMarkHistograms) Set(x float64) {
saw.innerSet(func() {
saw.x = x
})
}
func (saw *sampleAndWaterMarkHistograms) SetX1(x1 float64) {
saw.innerSet(func() {
saw.x1 = x1
})
}
func (saw *sampleAndWaterMarkHistograms) innerSet(updateXOrX1 func()) {
saw.Lock()
when := saw.clock.Now()
whenInt := saw.quantize(when)
acc := saw.sampleAndWaterMarkAccumulator
wellOrdered := !when.Before(acc.lastSet)
if wellOrdered {
updateXOrX1()
saw.relX = saw.x / saw.x1
if acc.lastSetInt < whenInt {
saw.loRelX, saw.hiRelX = acc.relX, acc.relX
saw.lastSetInt = whenInt
}
if saw.relX < saw.loRelX {
saw.loRelX = saw.relX
} else if saw.relX > saw.hiRelX {
saw.hiRelX = saw.relX
}
saw.lastSet = when
}
saw.Unlock()
if !wellOrdered {
lastSetS := acc.lastSet.Format(time.RFC3339Nano)
whenS := when.Format(time.RFC3339Nano)
klog.Fatalf("Time went backwards from %s to %s for labelValues=%#+v", lastSetS, whenS, saw.labelValues)
panic(append([]string{lastSetS, whenS}, saw.labelValues...))
}
for acc.lastSetInt < whenInt {
saw.samples.WithLabelValues(saw.labelValues...).Observe(acc.relX)
saw.waterMarks.WithLabelValues(saw.loLabelValues...).Observe(acc.loRelX)
saw.waterMarks.WithLabelValues(saw.hiLabelValues...).Observe(acc.hiRelX)
acc.lastSetInt++
acc.loRelX, acc.hiRelX = acc.relX, acc.relX
}
}

View File

@ -0,0 +1,52 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
// TimedObserver gets informed about the values assigned to a variable
// `X float64` over time, and reports on the ratio `X/X1`.
type TimedObserver interface {
// Add notes a change to the variable
Add(deltaX float64)
// Set notes a setting of the variable
Set(x float64)
// SetX1 changes the value to use for X1
SetX1(x1 float64)
}
// TimedObserverGenerator creates related observers that are
// differentiated by a series of label values
type TimedObserverGenerator interface {
Generate(x, x1 float64, labelValues []string) TimedObserver
}
// TimedObserverPair is a corresponding pair of observers, one for the
// number of requests waiting in queue(s) and one for the number of
// requests being executed
type TimedObserverPair struct {
// RequestsWaiting is given observations of the number of currently queued requests
RequestsWaiting TimedObserver
// RequestsExecuting is given observations of the number of requests currently executing
RequestsExecuting TimedObserver
}
// TimedObserverPairGenerator generates pairs
type TimedObserverPairGenerator interface {
Generate(waiting1, executing1 float64, labelValues []string) TimedObserverPair
}