diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go
index fc9f515f5fd..8d042dc51cc 100644
--- a/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go
+++ b/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go
@@ -122,7 +122,7 @@ var (
 			Help:           "Number of requests dropped with 'Try again later' response",
 			StabilityLevel: compbasemetrics.ALPHA,
 		},
-		[]string{"requestKind"},
+		[]string{"request_kind"},
 	)
 	// TLSHandshakeErrors is a number of requests dropped with 'TLS handshake error from' error
 	TLSHandshakeErrors = compbasemetrics.NewCounter(
@@ -166,7 +166,15 @@ var (
 			Help:           "Maximal number of currently used inflight request limit of this apiserver per request kind in last second.",
 			StabilityLevel: compbasemetrics.ALPHA,
 		},
-		[]string{"requestKind"},
+		[]string{"request_kind"},
+	)
+	currentInqueueRequests = compbasemetrics.NewGaugeVec(
+		&compbasemetrics.GaugeOpts{
+			Name:           "apiserver_current_inqueue_requests",
+			Help:           "Maximal number of queued requests in this apiserver per request kind in last second.",
+			StabilityLevel: compbasemetrics.ALPHA,
+		},
+		[]string{"request_kind"},
 	)

 	requestTerminationsTotal = compbasemetrics.NewCounterVec(
@@ -191,6 +199,7 @@ var (
 		WatchEvents,
 		WatchEventsSizes,
 		currentInflightRequests,
+		currentInqueueRequests,
 		requestTerminationsTotal,
 	}

@@ -231,6 +240,11 @@ const (
 	ReadOnlyKind = "readOnly"
 	// MutatingKind is a string identifying mutating request kind
 	MutatingKind = "mutating"
+
+	// WaitingPhase is the phase value for a request waiting in a queue
+	WaitingPhase = "waiting"
+	// ExecutingPhase is the phase value for an executing request
+	ExecutingPhase = "executing"
 )

 const (
@@ -261,9 +275,19 @@ func Reset() {
 	}
 }

-func UpdateInflightRequestMetrics(nonmutating, mutating int) {
-	currentInflightRequests.WithLabelValues(ReadOnlyKind).Set(float64(nonmutating))
-	currentInflightRequests.WithLabelValues(MutatingKind).Set(float64(mutating))
+// UpdateInflightRequestMetrics reports concurrency metrics classified by
+// mutating vs Readonly.
+func UpdateInflightRequestMetrics(phase string, nonmutating, mutating int) { + for _, kc := range []struct { + kind string + count int + }{{ReadOnlyKind, nonmutating}, {MutatingKind, mutating}} { + if phase == ExecutingPhase { + currentInflightRequests.WithLabelValues(kc.kind).Set(float64(kc.count)) + } else { + currentInqueueRequests.WithLabelValues(kc.kind).Set(float64(kc.count)) + } + } } // RecordRequestTermination records that the request was terminated early as part of a resource diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD b/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD index dc317e91e02..a13e51bf319 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD @@ -62,6 +62,7 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/httplog:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library", ], diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go index e06622002f0..946ab4e605d 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go @@ -27,6 +27,7 @@ import ( "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints/metrics" apirequest "k8s.io/apiserver/pkg/endpoints/request" + fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics" "k8s.io/klog/v2" ) @@ -50,13 +51,17 @@ func handleError(w http.ResponseWriter, r *http.Request, err error) { klog.Errorf(err.Error()) } -// requestWatermark is used to trak maximal usage of inflight requests. 
+// requestWatermark is used to track maximal numbers of requests in a particular phase of handling type requestWatermark struct { + phase string + readOnlyObserver, mutatingObserver fcmetrics.TimedObserver lock sync.Mutex readOnlyWatermark, mutatingWatermark int } func (w *requestWatermark) recordMutating(mutatingVal int) { + w.mutatingObserver.Set(float64(mutatingVal)) + w.lock.Lock() defer w.lock.Unlock() @@ -66,6 +71,8 @@ func (w *requestWatermark) recordMutating(mutatingVal int) { } func (w *requestWatermark) recordReadOnly(readOnlyVal int) { + w.readOnlyObserver.Set(float64(readOnlyVal)) + w.lock.Lock() defer w.lock.Unlock() @@ -74,9 +81,14 @@ func (w *requestWatermark) recordReadOnly(readOnlyVal int) { } } -var watermark = &requestWatermark{} +// watermark tracks requests being executed (not waiting in a queue) +var watermark = &requestWatermark{ + phase: metrics.ExecutingPhase, + readOnlyObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{metrics.ReadOnlyKind}).RequestsExecuting, + mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{metrics.MutatingKind}).RequestsExecuting, +} -func startRecordingUsage() { +func startRecordingUsage(watermark *requestWatermark) { go func() { wait.Forever(func() { watermark.lock.Lock() @@ -86,7 +98,7 @@ func startRecordingUsage() { watermark.mutatingWatermark = 0 watermark.lock.Unlock() - metrics.UpdateInflightRequestMetrics(readOnlyWatermark, mutatingWatermark) + metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark) }, inflightUsageMetricUpdatePeriod) }() } @@ -100,7 +112,7 @@ func WithMaxInFlightLimit( mutatingLimit int, longRunningRequestCheck apirequest.LongRunningRequestCheck, ) http.Handler { - startOnce.Do(startRecordingUsage) + startOnce.Do(func() { startRecordingUsage(watermark) }) if nonMutatingLimit == 0 && mutatingLimit == 0 { return handler } @@ -108,9 +120,11 @@ func WithMaxInFlightLimit( var mutatingChan chan bool if nonMutatingLimit != 0 { nonMutatingChan = make(chan bool, nonMutatingLimit) + watermark.readOnlyObserver.SetX1(float64(nonMutatingLimit)) } if mutatingLimit != 0 { mutatingChan = make(chan bool, mutatingLimit) + watermark.mutatingObserver.SetX1(float64(mutatingLimit)) } return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -141,21 +155,22 @@ func WithMaxInFlightLimit( select { case c <- true: - var mutatingLen, readOnlyLen int + // We note the concurrency level both while the + // request is being served and after it is done being + // served, because both states contribute to the + // sampled stats on concurrency. 
if isMutatingRequest { - mutatingLen = len(mutatingChan) + watermark.recordMutating(len(c)) } else { - readOnlyLen = len(nonMutatingChan) + watermark.recordReadOnly(len(c)) } - defer func() { <-c if isMutatingRequest { - watermark.recordMutating(mutatingLen) + watermark.recordMutating(len(c)) } else { - watermark.recordReadOnly(readOnlyLen) + watermark.recordReadOnly(len(c)) } - }() handler.ServeHTTP(w, r) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go index e3459161e64..224f371374a 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go @@ -24,8 +24,10 @@ import ( fcv1a1 "k8s.io/api/flowcontrol/v1alpha1" apitypes "k8s.io/apimachinery/pkg/types" + epmetrics "k8s.io/apiserver/pkg/endpoints/metrics" apirequest "k8s.io/apiserver/pkg/endpoints/request" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" + fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics" "k8s.io/klog/v2" ) @@ -53,7 +55,15 @@ func GetClassification(ctx context.Context) *PriorityAndFairnessClassification { return ctx.Value(priorityAndFairnessKey).(*PriorityAndFairnessClassification) } -var atomicMutatingLen, atomicNonMutatingLen int32 +// waitingMark tracks requests waiting rather than being executed +var waitingMark = &requestWatermark{ + phase: epmetrics.WaitingPhase, + readOnlyObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.ReadOnlyKind}).RequestsWaiting, + mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.MutatingKind}).RequestsWaiting, +} + +var atomicMutatingExecuting, atomicReadOnlyExecuting int32 +var atomicMutatingWaiting, atomicReadOnlyWaiting int32 // WithPriorityAndFairness limits the number of in-flight // requests in a fine-grained way. 
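Illustrative aside, not part of the patch: the two filters above (maxinflight.go and priority-and-fairness.go) share the requestWatermark pattern. Each request records the instantaneous concurrency level, a background goroutine publishes the per-period peak via UpdateInflightRequestMetrics(phase, ...), and the phase decides whether the sample lands in apiserver_current_inflight_requests or apiserver_current_inqueue_requests. A minimal, self-contained sketch of that sample-peak-and-reset pattern follows; peakTracker, record, and drain are hypothetical names and only the standard library is used.

package main

import (
	"fmt"
	"sync"
	"time"
)

// peakTracker keeps the highest level seen since the last drain, mirroring
// requestWatermark's readOnlyWatermark/mutatingWatermark fields.
type peakTracker struct {
	mu   sync.Mutex
	peak int
}

// record notes the current concurrency level, keeping the period's maximum.
func (p *peakTracker) record(level int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if level > p.peak {
		p.peak = level
	}
}

// drain returns the peak for the period that just ended and resets it,
// much like startRecordingUsage does before reporting the metric.
func (p *peakTracker) drain() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	peak := p.peak
	p.peak = 0
	return peak
}

func main() {
	executing := &peakTracker{}
	for level := 1; level <= 5; level++ { // simulated requests entering service
		executing.record(level)
	}
	ticker := time.NewTicker(10 * time.Millisecond) // stands in for the update period
	defer ticker.Stop()
	<-ticker.C
	// The patch labels this sample with phase ("executing" or "waiting")
	// and request_kind ("readOnly" or "mutating").
	fmt.Printf("phase=executing peak=%d\n", executing.drain())
}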
@@ -66,7 +76,10 @@ func WithPriorityAndFairness( klog.Warningf("priority and fairness support not found, skipping") return handler } - startOnce.Do(startRecordingUsage) + startOnce.Do(func() { + startRecordingUsage(watermark) + startRecordingUsage(waitingMark) + }) return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() requestInfo, ok := apirequest.RequestInfoFrom(ctx) @@ -98,22 +111,23 @@ func WithPriorityAndFairness( var served bool isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb) - execute := func() { - var mutatingLen, readOnlyLen int + noteExecutingDelta := func(delta int32) { if isMutatingRequest { - mutatingLen = int(atomic.AddInt32(&atomicMutatingLen, 1)) + watermark.recordMutating(int(atomic.AddInt32(&atomicMutatingExecuting, delta))) } else { - readOnlyLen = int(atomic.AddInt32(&atomicNonMutatingLen, 1)) + watermark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyExecuting, delta))) } - defer func() { - if isMutatingRequest { - atomic.AddInt32(&atomicMutatingLen, -11) - watermark.recordMutating(mutatingLen) - } else { - atomic.AddInt32(&atomicNonMutatingLen, -1) - watermark.recordReadOnly(readOnlyLen) - } - }() + } + noteWaitingDelta := func(delta int32) { + if isMutatingRequest { + waitingMark.recordMutating(int(atomic.AddInt32(&atomicMutatingWaiting, delta))) + } else { + waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta))) + } + } + execute := func() { + noteExecutingDelta(1) + defer noteExecutingDelta(-1) served = true innerCtx := context.WithValue(ctx, priorityAndFairnessKey, classification) innerReq := r.Clone(innerCtx) @@ -122,10 +136,15 @@ func WithPriorityAndFairness( handler.ServeHTTP(w, innerReq) } digest := utilflowcontrol.RequestDigest{requestInfo, user} - fcIfc.Handle(ctx, digest, note, execute) + fcIfc.Handle(ctx, digest, note, func(inQueue bool) { + if inQueue { + noteWaitingDelta(1) + } else { + noteWaitingDelta(-1) + } + }, execute) if !served { tooManyRequests(r, w) - return } }) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/BUILD index 32a9d47dbbe..12a9c16e91c 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/BUILD @@ -85,6 +85,7 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/format:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/flowcontrol/v1alpha1:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index 577ae2eb1f6..91f35702343 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -84,7 +84,8 @@ type RequestDigest struct { // this type and cfgMeal follow the convention that the suffix // "Locked" means that the caller must hold the configController lock. 
type configController struct { - queueSetFactory fq.QueueSetFactory + queueSetFactory fq.QueueSetFactory + obsPairGenerator metrics.TimedObserverPairGenerator // configQueue holds `(interface{})(0)` when the configuration // objects need to be reprocessed. @@ -144,6 +145,9 @@ type priorityLevelState struct { // number of goroutines between Controller::Match and calling the // returned StartFunction numPending int + + // Observers tracking number waiting, executing + obsPair metrics.TimedObserverPair } // NewTestableController is extra flexible to facilitate testing @@ -152,104 +156,106 @@ func newTestableController( flowcontrolClient fcclientv1a1.FlowcontrolV1alpha1Interface, serverConcurrencyLimit int, requestWaitLimit time.Duration, + obsPairGenerator metrics.TimedObserverPairGenerator, queueSetFactory fq.QueueSetFactory, ) *configController { - cfgCtl := &configController{ + cfgCtlr := &configController{ queueSetFactory: queueSetFactory, + obsPairGenerator: obsPairGenerator, serverConcurrencyLimit: serverConcurrencyLimit, requestWaitLimit: requestWaitLimit, flowcontrolClient: flowcontrolClient, priorityLevelStates: make(map[string]*priorityLevelState), } klog.V(2).Infof("NewTestableController with serverConcurrencyLimit=%d, requestWaitLimit=%s", serverConcurrencyLimit, requestWaitLimit) - cfgCtl.initializeConfigController(informerFactory) + cfgCtlr.initializeConfigController(informerFactory) // ensure the data structure reflects the mandatory config - cfgCtl.lockAndDigestConfigObjects(nil, nil) - return cfgCtl + cfgCtlr.lockAndDigestConfigObjects(nil, nil) + return cfgCtlr } // initializeConfigController sets up the controller that processes // config API objects. -func (cfgCtl *configController) initializeConfigController(informerFactory kubeinformers.SharedInformerFactory) { - cfgCtl.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue") +func (cfgCtlr *configController) initializeConfigController(informerFactory kubeinformers.SharedInformerFactory) { + cfgCtlr.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue") fci := informerFactory.Flowcontrol().V1alpha1() pli := fci.PriorityLevelConfigurations() fsi := fci.FlowSchemas() - cfgCtl.plLister = pli.Lister() - cfgCtl.plInformerSynced = pli.Informer().HasSynced - cfgCtl.fsLister = fsi.Lister() - cfgCtl.fsInformerSynced = fsi.Informer().HasSynced + cfgCtlr.plLister = pli.Lister() + cfgCtlr.plInformerSynced = pli.Informer().HasSynced + cfgCtlr.fsLister = fsi.Lister() + cfgCtlr.fsInformerSynced = fsi.Informer().HasSynced pli.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { pl := obj.(*fctypesv1a1.PriorityLevelConfiguration) klog.V(7).Infof("Triggered API priority and fairness config reloading due to creation of PLC %s", pl.Name) - cfgCtl.configQueue.Add(0) + cfgCtlr.configQueue.Add(0) }, UpdateFunc: func(oldObj, newObj interface{}) { newPL := newObj.(*fctypesv1a1.PriorityLevelConfiguration) oldPL := oldObj.(*fctypesv1a1.PriorityLevelConfiguration) if !apiequality.Semantic.DeepEqual(oldPL.Spec, newPL.Spec) { klog.V(7).Infof("Triggered API priority and fairness config reloading due to spec update of PLC %s", newPL.Name) - cfgCtl.configQueue.Add(0) + cfgCtlr.configQueue.Add(0) } }, DeleteFunc: func(obj interface{}) { name, _ := 
cache.DeletionHandlingMetaNamespaceKeyFunc(obj) klog.V(7).Infof("Triggered API priority and fairness config reloading due to deletion of PLC %s", name) - cfgCtl.configQueue.Add(0) + cfgCtlr.configQueue.Add(0) }}) fsi.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { fs := obj.(*fctypesv1a1.FlowSchema) klog.V(7).Infof("Triggered API priority and fairness config reloading due to creation of FS %s", fs.Name) - cfgCtl.configQueue.Add(0) + cfgCtlr.configQueue.Add(0) }, UpdateFunc: func(oldObj, newObj interface{}) { newFS := newObj.(*fctypesv1a1.FlowSchema) oldFS := oldObj.(*fctypesv1a1.FlowSchema) if !apiequality.Semantic.DeepEqual(oldFS.Spec, newFS.Spec) { klog.V(7).Infof("Triggered API priority and fairness config reloading due to spec update of FS %s", newFS.Name) - cfgCtl.configQueue.Add(0) + cfgCtlr.configQueue.Add(0) } }, DeleteFunc: func(obj interface{}) { name, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) klog.V(7).Infof("Triggered API priority and fairness config reloading due to deletion of FS %s", name) - cfgCtl.configQueue.Add(0) + cfgCtlr.configQueue.Add(0) }}) } -func (cfgCtl *configController) Run(stopCh <-chan struct{}) error { - defer cfgCtl.configQueue.ShutDown() +func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error { + defer cfgCtlr.configQueue.ShutDown() klog.Info("Starting API Priority and Fairness config controller") - if ok := cache.WaitForCacheSync(stopCh, cfgCtl.plInformerSynced, cfgCtl.fsInformerSynced); !ok { + if ok := cache.WaitForCacheSync(stopCh, cfgCtlr.plInformerSynced, cfgCtlr.fsInformerSynced); !ok { return fmt.Errorf("Never achieved initial sync") } klog.Info("Running API Priority and Fairness config worker") - wait.Until(cfgCtl.runWorker, time.Second, stopCh) + wait.Until(cfgCtlr.runWorker, time.Second, stopCh) klog.Info("Shutting down API Priority and Fairness config worker") return nil } -func (cfgCtl *configController) runWorker() { - for cfgCtl.processNextWorkItem() { +func (cfgCtlr *configController) runWorker() { + for cfgCtlr.processNextWorkItem() { } } -func (cfgCtl *configController) processNextWorkItem() bool { - obj, shutdown := cfgCtl.configQueue.Get() +func (cfgCtlr *configController) processNextWorkItem() bool { + obj, shutdown := cfgCtlr.configQueue.Get() if shutdown { return false } func(obj interface{}) { - defer cfgCtl.configQueue.Done(obj) - if !cfgCtl.syncOne() { - cfgCtl.configQueue.AddRateLimited(obj) + defer cfgCtlr.configQueue.Done(obj) + if !cfgCtlr.syncOne() { + cfgCtlr.configQueue.AddRateLimited(obj) } else { - cfgCtl.configQueue.Forget(obj) + cfgCtlr.configQueue.Forget(obj) } }(obj) @@ -259,19 +265,19 @@ func (cfgCtl *configController) processNextWorkItem() bool { // syncOne attempts to sync all the API Priority and Fairness config // objects. It either succeeds and returns `true` or logs an error // and returns `false`. 
-func (cfgCtl *configController) syncOne() bool { +func (cfgCtlr *configController) syncOne() bool { all := labels.Everything() - newPLs, err := cfgCtl.plLister.List(all) + newPLs, err := cfgCtlr.plLister.List(all) if err != nil { klog.Errorf("Unable to list PriorityLevelConfiguration objects: %s", err.Error()) return false } - newFSs, err := cfgCtl.fsLister.List(all) + newFSs, err := cfgCtlr.fsLister.List(all) if err != nil { klog.Errorf("Unable to list FlowSchema objects: %s", err.Error()) return false } - err = cfgCtl.digestConfigObjects(newPLs, newFSs) + err = cfgCtlr.digestConfigObjects(newPLs, newFSs) if err == nil { return true } @@ -288,7 +294,7 @@ func (cfgCtl *configController) syncOne() bool { // FlowSchemas --- with the work dvided among the passes according to // those dependencies. type cfgMeal struct { - cfgCtl *configController + cfgCtlr *configController newPLStates map[string]*priorityLevelState @@ -315,9 +321,9 @@ type fsStatusUpdate struct { } // digestConfigObjects is given all the API objects that configure -// cfgCtl and writes its consequent new configState. -func (cfgCtl *configController) digestConfigObjects(newPLs []*fctypesv1a1.PriorityLevelConfiguration, newFSs []*fctypesv1a1.FlowSchema) error { - fsStatusUpdates := cfgCtl.lockAndDigestConfigObjects(newPLs, newFSs) +// cfgCtlr and writes its consequent new configState. +func (cfgCtlr *configController) digestConfigObjects(newPLs []*fctypesv1a1.PriorityLevelConfiguration, newFSs []*fctypesv1a1.FlowSchema) error { + fsStatusUpdates := cfgCtlr.lockAndDigestConfigObjects(newPLs, newFSs) var errs []error for _, fsu := range fsStatusUpdates { enc, err := json.Marshal(fsu.condition) @@ -326,7 +332,7 @@ func (cfgCtl *configController) digestConfigObjects(newPLs []*fctypesv1a1.Priori panic(fmt.Sprintf("Failed to json.Marshall(%#+v): %s", fsu.condition, err.Error())) } klog.V(4).Infof("Writing Condition %s to FlowSchema %s because its previous value was %s", string(enc), fsu.flowSchema.Name, fcfmt.Fmt(fsu.oldValue)) - _, err = cfgCtl.flowcontrolClient.FlowSchemas().Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc))), metav1.PatchOptions{FieldManager: "api-priority-and-fairness-config-consumer-v1"}, "status") + _, err = cfgCtlr.flowcontrolClient.FlowSchemas().Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc))), metav1.PatchOptions{FieldManager: "api-priority-and-fairness-config-consumer-v1"}, "status") if err != nil { errs = append(errs, errors.Wrap(err, fmt.Sprintf("failed to set a status.condition for FlowSchema %s", fsu.flowSchema.Name))) } @@ -337,11 +343,11 @@ func (cfgCtl *configController) digestConfigObjects(newPLs []*fctypesv1a1.Priori return apierrors.NewAggregate(errs) } -func (cfgCtl *configController) lockAndDigestConfigObjects(newPLs []*fctypesv1a1.PriorityLevelConfiguration, newFSs []*fctypesv1a1.FlowSchema) []fsStatusUpdate { - cfgCtl.lock.Lock() - defer cfgCtl.lock.Unlock() +func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*fctypesv1a1.PriorityLevelConfiguration, newFSs []*fctypesv1a1.FlowSchema) []fsStatusUpdate { + cfgCtlr.lock.Lock() + defer cfgCtlr.lock.Unlock() meal := cfgMeal{ - cfgCtl: cfgCtl, + cfgCtlr: cfgCtlr, newPLStates: make(map[string]*priorityLevelState), } @@ -351,16 +357,16 @@ func (cfgCtl *configController) lockAndDigestConfigObjects(newPLs []*fctypesv1a1 // Supply missing 
mandatory PriorityLevelConfiguration objects if !meal.haveExemptPL { - meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationExempt, cfgCtl.requestWaitLimit) + meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationExempt, cfgCtlr.requestWaitLimit) } if !meal.haveCatchAllPL { - meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationCatchAll, cfgCtl.requestWaitLimit) + meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationCatchAll, cfgCtlr.requestWaitLimit) } meal.finishQueueSetReconfigsLocked() // The new config has been constructed - cfgCtl.priorityLevelStates = meal.newPLStates + cfgCtlr.priorityLevelStates = meal.newPLStates klog.V(5).Infof("Switched to new API Priority and Fairness configuration") return meal.fsStatusUpdates } @@ -369,11 +375,11 @@ func (cfgCtl *configController) lockAndDigestConfigObjects(newPLs []*fctypesv1a1 // Pretend broken ones do not exist. func (meal *cfgMeal) digestNewPLsLocked(newPLs []*fctypesv1a1.PriorityLevelConfiguration) { for _, pl := range newPLs { - state := meal.cfgCtl.priorityLevelStates[pl.Name] + state := meal.cfgCtlr.priorityLevelStates[pl.Name] if state == nil { - state = &priorityLevelState{} + state = &priorityLevelState{obsPair: meal.cfgCtlr.obsPairGenerator.Generate(1, 1, []string{pl.Name})} } - qsCompleter, err := qscOfPL(meal.cfgCtl.queueSetFactory, state.queues, pl, meal.cfgCtl.requestWaitLimit) + qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.obsPair) if err != nil { klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err) continue @@ -439,7 +445,7 @@ func (meal *cfgMeal) digestFlowSchemasLocked(newFSs []*fctypesv1a1.FlowSchema) { fsSeq = append(fsSeq, fcboot.MandatoryFlowSchemaCatchAll) } - meal.cfgCtl.flowSchemas = fsSeq + meal.cfgCtlr.flowSchemas = fsSeq if klog.V(5).Enabled() { for _, fs := range fsSeq { klog.Infof("Using FlowSchema %s", fcfmt.Fmt(fs)) @@ -453,7 +459,7 @@ func (meal *cfgMeal) digestFlowSchemasLocked(newFSs []*fctypesv1a1.FlowSchema) { // queues, otherwise start the quiescing process if that has not // already been started. func (meal *cfgMeal) processOldPLsLocked() { - for plName, plState := range meal.cfgCtl.priorityLevelStates { + for plName, plState := range meal.cfgCtlr.priorityLevelStates { if meal.newPLStates[plName] != nil { // Still desired and already updated continue @@ -476,9 +482,9 @@ func (meal *cfgMeal) processOldPLsLocked() { } } var err error - plState.qsCompleter, err = qscOfPL(meal.cfgCtl.queueSetFactory, plState.queues, plState.pl, meal.cfgCtl.requestWaitLimit) + plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.obsPair) if err != nil { - // This can not happen because qscOfPL already approved this config + // This can not happen because queueSetCompleterForPL already approved this config panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec))) } if plState.pl.Spec.Limited != nil { @@ -509,7 +515,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() { // The use of math.Ceil here means that the results might sum // to a little more than serverConcurrencyLimit but the // difference will be negligible. 
- concurrencyLimit := int(math.Ceil(float64(meal.cfgCtl.serverConcurrencyLimit) * float64(plState.pl.Spec.Limited.AssuredConcurrencyShares) / meal.shareSum)) + concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(plState.pl.Spec.Limited.AssuredConcurrencyShares) / meal.shareSum)) metrics.UpdateSharedConcurrencyLimit(plName, concurrencyLimit) if plState.queues == nil { @@ -521,10 +527,11 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() { } } -// qscOfPL returns a pointer to an appropriate QueuingConfig or nil -// if no limiting is called for. Returns nil and an error if the given +// queueSetCompleterForPL returns an appropriate QueueSetCompleter for the +// given priority level configuration. Returns nil if that config +// does not call for limiting. Returns nil and an error if the given // object is malformed in a way that is a problem for this package. -func qscOfPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *fctypesv1a1.PriorityLevelConfiguration, requestWaitLimit time.Duration) (fq.QueueSetCompleter, error) { +func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *fctypesv1a1.PriorityLevelConfiguration, requestWaitLimit time.Duration, intPair metrics.TimedObserverPair) (fq.QueueSetCompleter, error) { if (pl.Spec.Type == fctypesv1a1.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) { return nil, errors.New("broken union structure at the top") } @@ -553,7 +560,7 @@ func qscOfPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *fctypesv1a1.Priorit if queues != nil { qsc, err = queues.BeginConfigChange(qcQS) } else { - qsc, err = qsf.BeginConstruction(qcQS) + qsc, err = qsf.BeginConstruction(qcQS, intPair) } if err != nil { err = errors.Wrap(err, fmt.Sprintf("priority level %q has QueuingConfiguration %#+v, which is invalid", pl.Name, qcAPI)) @@ -594,9 +601,11 @@ func (meal *cfgMeal) presyncFlowSchemaStatus(fs *fctypesv1a1.FlowSchema, isDangl } // imaginePL adds a priority level based on one of the mandatory ones +// that does not actually exist (right now) as a real API object. func (meal *cfgMeal) imaginePL(proto *fctypesv1a1.PriorityLevelConfiguration, requestWaitLimit time.Duration) { klog.V(3).Infof("No %s PriorityLevelConfiguration found, imagining one", proto.Name) - qsCompleter, err := qscOfPL(meal.cfgCtl.queueSetFactory, nil, proto, requestWaitLimit) + obsPair := meal.cfgCtlr.obsPairGenerator.Generate(1, 1, []string{proto.Name}) + qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, obsPair) if err != nil { // This can not happen because proto is one of the mandatory // objects and these are not erroneous @@ -605,6 +614,7 @@ func (meal *cfgMeal) imaginePL(proto *fctypesv1a1.PriorityLevelConfiguration, re meal.newPLStates[proto.Name] = &priorityLevelState{ pl: proto, qsCompleter: qsCompleter, + obsPair: obsPair, } if proto.Spec.Limited != nil { meal.shareSum += float64(proto.Spec.Limited.AssuredConcurrencyShares) @@ -624,14 +634,14 @@ func (immediateRequest) Finish(execute func()) bool { // The returned bool indicates whether the request is exempt from // limitation. The startWaitingTime is when the request started // waiting in its queue, or `Time{}` if this did not happen. 
-func (cfgCtl *configController) startRequest(ctx context.Context, rd RequestDigest) (fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) { +func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, queueNoteFn fq.QueueNoteFn) (fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) { klog.V(7).Infof("startRequest(%#+v)", rd) - cfgCtl.lock.Lock() - defer cfgCtl.lock.Unlock() - for _, fs := range cfgCtl.flowSchemas { + cfgCtlr.lock.Lock() + defer cfgCtlr.lock.Unlock() + for _, fs := range cfgCtlr.flowSchemas { if matchesFlowSchema(rd, fs) { plName := fs.Spec.PriorityLevelConfiguration.Name - plState := cfgCtl.priorityLevelStates[plName] + plState := cfgCtlr.priorityLevelStates[plName] if plState.pl.Spec.Type == fctypesv1a1.PriorityLevelEnablementExempt { klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, fs.Name, fs.Spec.DistinguisherMethod, plName) return fs, plState.pl, true, immediateRequest{}, time.Time{} @@ -649,9 +659,9 @@ func (cfgCtl *configController) startRequest(ctx context.Context, rd RequestDige } startWaitingTime = time.Now() klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, fs.Name, fs.Spec.DistinguisherMethod, plName, numQueues) - req, idle := plState.queues.StartRequest(ctx, hashValue, flowDistinguisher, fs.Name, rd.RequestInfo, rd.User) + req, idle := plState.queues.StartRequest(ctx, hashValue, flowDistinguisher, fs.Name, rd.RequestInfo, rd.User, queueNoteFn) if idle { - cfgCtl.maybeReapLocked(plName, plState) + cfgCtlr.maybeReapLocked(plName, plState) } return fs, plState.pl, false, req, startWaitingTime } @@ -660,7 +670,7 @@ func (cfgCtl *configController) startRequest(ctx context.Context, rd RequestDige // FlowSchema that matches everything. If somehow control reaches // here, panic with some relevant information. var catchAll *fctypesv1a1.FlowSchema - for _, fs := range cfgCtl.flowSchemas { + for _, fs := range cfgCtlr.flowSchemas { if fs.Name == fctypesv1a1.FlowSchemaNameCatchAll { catchAll = fs } @@ -669,10 +679,10 @@ func (cfgCtl *configController) startRequest(ctx context.Context, rd RequestDige } // Call this after getting a clue that the given priority level is undesired and idle -func (cfgCtl *configController) maybeReap(plName string) { - cfgCtl.lock.Lock() - defer cfgCtl.lock.Unlock() - plState := cfgCtl.priorityLevelStates[plName] +func (cfgCtlr *configController) maybeReap(plName string) { + cfgCtlr.lock.Lock() + defer cfgCtlr.lock.Unlock() + plState := cfgCtlr.priorityLevelStates[plName] if plState == nil { klog.V(7).Infof("plName=%s, plState==nil", plName) return @@ -685,17 +695,17 @@ func (cfgCtl *configController) maybeReap(plName string) { } } klog.V(3).Infof("Triggered API priority and fairness config reloading because priority level %s is undesired and idle", plName) - cfgCtl.configQueue.Add(0) + cfgCtlr.configQueue.Add(0) } // Call this if both (1) plState.queues is non-nil and reported being -// idle, and (2) cfgCtl's lock has not been released since then. -func (cfgCtl *configController) maybeReapLocked(plName string, plState *priorityLevelState) { +// idle, and (2) cfgCtlr's lock has not been released since then. 
+func (cfgCtlr *configController) maybeReapLocked(plName string, plState *priorityLevelState) { if !(plState.quiescing && plState.numPending == 0) { return } klog.V(3).Infof("Triggered API priority and fairness config reloading because priority level %s is undesired and idle", plName) - cfgCtl.configQueue.Add(0) + cfgCtlr.configQueue.Add(0) } // computeFlowDistinguisher extracts the flow distinguisher according to the given method diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go index 4a467b6d9f2..04d4df7c73c 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller_debug.go @@ -34,20 +34,20 @@ const ( queryIncludeRequestDetails = "includeRequestDetails" ) -func (cfgCtl *configController) Install(c *mux.PathRecorderMux) { +func (cfgCtlr *configController) Install(c *mux.PathRecorderMux) { // TODO(yue9944882): handle "Accept" header properly // debugging dumps a CSV content for three levels of granularity // 1. row per priority-level - c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_priority_levels", cfgCtl.dumpPriorityLevels) + c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_priority_levels", cfgCtlr.dumpPriorityLevels) // 2. row per queue - c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_queues", cfgCtl.dumpQueues) + c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_queues", cfgCtlr.dumpQueues) // 3. row per request - c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_requests", cfgCtl.dumpRequests) + c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_requests", cfgCtlr.dumpRequests) } -func (cfgCtl *configController) dumpPriorityLevels(w http.ResponseWriter, r *http.Request) { - cfgCtl.lock.Lock() - defer cfgCtl.lock.Unlock() +func (cfgCtlr *configController) dumpPriorityLevels(w http.ResponseWriter, r *http.Request) { + cfgCtlr.lock.Lock() + defer cfgCtlr.lock.Unlock() tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0) columnHeaders := []string{ "PriorityLevelName", // 1 @@ -59,7 +59,7 @@ func (cfgCtl *configController) dumpPriorityLevels(w http.ResponseWriter, r *htt } tabPrint(tabWriter, rowForHeaders(columnHeaders)) endline(tabWriter) - for _, plState := range cfgCtl.priorityLevelStates { + for _, plState := range cfgCtlr.priorityLevelStates { if plState.queues == nil { tabPrint(tabWriter, row( plState.pl.Name, // 1 @@ -93,9 +93,9 @@ func (cfgCtl *configController) dumpPriorityLevels(w http.ResponseWriter, r *htt runtime.HandleError(tabWriter.Flush()) } -func (cfgCtl *configController) dumpQueues(w http.ResponseWriter, r *http.Request) { - cfgCtl.lock.Lock() - defer cfgCtl.lock.Unlock() +func (cfgCtlr *configController) dumpQueues(w http.ResponseWriter, r *http.Request) { + cfgCtlr.lock.Lock() + defer cfgCtlr.lock.Unlock() tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0) columnHeaders := []string{ "PriorityLevelName", // 1 @@ -106,7 +106,7 @@ func (cfgCtl *configController) dumpQueues(w http.ResponseWriter, r *http.Reques } tabPrint(tabWriter, rowForHeaders(columnHeaders)) endline(tabWriter) - for _, plState := range cfgCtl.priorityLevelStates { + for _, plState := range cfgCtlr.priorityLevelStates { if plState.queues == nil { tabPrint(tabWriter, row( plState.pl.Name, // 1 @@ -133,9 +133,9 @@ func (cfgCtl *configController) dumpQueues(w http.ResponseWriter, r *http.Reques 
runtime.HandleError(tabWriter.Flush()) } -func (cfgCtl *configController) dumpRequests(w http.ResponseWriter, r *http.Request) { - cfgCtl.lock.Lock() - defer cfgCtl.lock.Unlock() +func (cfgCtlr *configController) dumpRequests(w http.ResponseWriter, r *http.Request) { + cfgCtlr.lock.Lock() + defer cfgCtlr.lock.Unlock() includeRequestDetails := len(r.URL.Query().Get(queryIncludeRequestDetails)) > 0 @@ -161,7 +161,7 @@ func (cfgCtl *configController) dumpRequests(w http.ResponseWriter, r *http.Requ })) } endline(tabWriter) - for _, plState := range cfgCtl.priorityLevelStates { + for _, plState := range cfgCtlr.priorityLevelStates { if plState.queues == nil { tabPrint(tabWriter, row( plState.pl.Name, // 1 diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go index 0ee68f2340f..99c36005be2 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go @@ -45,6 +45,7 @@ type Interface interface { Handle(ctx context.Context, requestDigest RequestDigest, noteFn func(fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration), + queueNoteFn fq.QueueNoteFn, execFn func(), ) @@ -72,6 +73,7 @@ func New( flowcontrolClient, serverConcurrencyLimit, requestWaitLimit, + metrics.PriorityLevelConcurrencyObserverPairGenerator, fqs.NewQueueSetFactory(&clock.RealClock{}, grc), ) } @@ -82,15 +84,17 @@ func NewTestable( flowcontrolClient fcclientv1a1.FlowcontrolV1alpha1Interface, serverConcurrencyLimit int, requestWaitLimit time.Duration, + obsPairGenerator metrics.TimedObserverPairGenerator, queueSetFactory fq.QueueSetFactory, ) Interface { - return newTestableController(informerFactory, flowcontrolClient, serverConcurrencyLimit, requestWaitLimit, queueSetFactory) + return newTestableController(informerFactory, flowcontrolClient, serverConcurrencyLimit, requestWaitLimit, obsPairGenerator, queueSetFactory) } -func (cfgCtl *configController) Handle(ctx context.Context, requestDigest RequestDigest, +func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest RequestDigest, noteFn func(fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration), + queueNoteFn fq.QueueNoteFn, execFn func()) { - fs, pl, isExempt, req, startWaitingTime := cfgCtl.startRequest(ctx, requestDigest) + fs, pl, isExempt, req, startWaitingTime := cfgCtlr.startRequest(ctx, requestDigest, queueNoteFn) queued := startWaitingTime != time.Time{} noteFn(fs, pl) if req == nil { @@ -117,6 +121,6 @@ func (cfgCtl *configController) Handle(ctx context.Context, requestDigest Reques } klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, queued=%v, Finish() => idle=%v", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt, queued, idle) if idle { - cfgCtl.maybeReap(pl.Name) + cfgCtlr.maybeReap(pl.Name) } } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go index faa8c74080f..1d762d4ba75 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go @@ -31,6 +31,7 @@ import ( "k8s.io/apiserver/pkg/util/flowcontrol/debug" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format" + "k8s.io/apiserver/pkg/util/flowcontrol/metrics" "k8s.io/client-go/informers" 
clientsetfake "k8s.io/client-go/kubernetes/fake" fcclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1alpha1" @@ -50,16 +51,16 @@ var mandPLs = func() map[string]*fcv1a1.PriorityLevelConfiguration { return ans }() -type ctlTestState struct { +type ctlrTestState struct { t *testing.T - cfgCtl *configController + cfgCtlr *configController fcIfc fcclient.FlowcontrolV1alpha1Interface existingPLs map[string]*fcv1a1.PriorityLevelConfiguration existingFSs map[string]*fcv1a1.FlowSchema heldRequestsMap map[string][]heldRequest requestWG sync.WaitGroup lock sync.Mutex - queues map[string]*ctlTestQueueSet + queues map[string]*ctlrTestQueueSet } type heldRequest struct { @@ -67,45 +68,45 @@ type heldRequest struct { finishCh chan struct{} } -var _ fq.QueueSetFactory = (*ctlTestState)(nil) +var _ fq.QueueSetFactory = (*ctlrTestState)(nil) -type ctlTestQueueSetCompleter struct { - cts *ctlTestState - cqs *ctlTestQueueSet +type ctlrTestQueueSetCompleter struct { + cts *ctlrTestState + cqs *ctlrTestQueueSet qc fq.QueuingConfig } -type ctlTestQueueSet struct { - cts *ctlTestState +type ctlrTestQueueSet struct { + cts *ctlrTestState qc fq.QueuingConfig dc fq.DispatchingConfig countActive int } -type ctlTestRequest struct { - cqs *ctlTestQueueSet +type ctlrTestRequest struct { + cqs *ctlrTestQueueSet qsName string descr1, descr2 interface{} } -func (cts *ctlTestState) BeginConstruction(qc fq.QueuingConfig) (fq.QueueSetCompleter, error) { - return ctlTestQueueSetCompleter{cts, nil, qc}, nil +func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, ip metrics.TimedObserverPair) (fq.QueueSetCompleter, error) { + return ctlrTestQueueSetCompleter{cts, nil, qc}, nil } -func (cqs *ctlTestQueueSet) BeginConfigChange(qc fq.QueuingConfig) (fq.QueueSetCompleter, error) { - return ctlTestQueueSetCompleter{cqs.cts, cqs, qc}, nil +func (cqs *ctlrTestQueueSet) BeginConfigChange(qc fq.QueuingConfig) (fq.QueueSetCompleter, error) { + return ctlrTestQueueSetCompleter{cqs.cts, cqs, qc}, nil } -func (cqs *ctlTestQueueSet) Dump(bool) debug.QueueSetDump { +func (cqs *ctlrTestQueueSet) Dump(bool) debug.QueueSetDump { return debug.QueueSetDump{} } -func (cqc ctlTestQueueSetCompleter) Complete(dc fq.DispatchingConfig) fq.QueueSet { +func (cqc ctlrTestQueueSetCompleter) Complete(dc fq.DispatchingConfig) fq.QueueSet { cqc.cts.lock.Lock() defer cqc.cts.lock.Unlock() qs := cqc.cqs if qs == nil { - qs = &ctlTestQueueSet{cts: cqc.cts, qc: cqc.qc, dc: dc} + qs = &ctlrTestQueueSet{cts: cqc.cts, qc: cqc.qc, dc: dc} cqc.cts.queues[cqc.qc.Name] = qs } else { qs.qc, qs.dc = cqc.qc, dc @@ -113,22 +114,22 @@ func (cqc ctlTestQueueSetCompleter) Complete(dc fq.DispatchingConfig) fq.QueueSe return qs } -func (cqs *ctlTestQueueSet) IsIdle() bool { +func (cqs *ctlrTestQueueSet) IsIdle() bool { cqs.cts.lock.Lock() defer cqs.cts.lock.Unlock() klog.V(7).Infof("For %p QS %s, countActive==%d", cqs, cqs.qc.Name, cqs.countActive) return cqs.countActive == 0 } -func (cqs *ctlTestQueueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (req fq.Request, idle bool) { +func (cqs *ctlrTestQueueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (req fq.Request, idle bool) { cqs.cts.lock.Lock() defer cqs.cts.lock.Unlock() cqs.countActive++ cqs.cts.t.Logf("Queued %q %#+v %#+v for %p QS=%s, countActive:=%d", fsName, descr1, descr2, cqs, cqs.qc.Name, cqs.countActive) - return &ctlTestRequest{cqs, 
cqs.qc.Name, descr1, descr2}, false + return &ctlrTestRequest{cqs, cqs.qc.Name, descr1, descr2}, false } -func (ctr *ctlTestRequest) Finish(execute func()) bool { +func (ctr *ctlrTestRequest) Finish(execute func()) bool { execute() ctr.cqs.cts.lock.Lock() defer ctr.cqs.cts.lock.Unlock() @@ -137,13 +138,13 @@ func (ctr *ctlTestRequest) Finish(execute func()) bool { return ctr.cqs.countActive == 0 } -func (cts *ctlTestState) getQueueSetNames() sets.String { +func (cts *ctlrTestState) getQueueSetNames() sets.String { cts.lock.Lock() defer cts.lock.Unlock() return sets.StringKeySet(cts.queues) } -func (cts *ctlTestState) getNonIdleQueueSetNames() sets.String { +func (cts *ctlrTestState) getNonIdleQueueSetNames() sets.String { cts.lock.Lock() defer cts.lock.Unlock() ans := sets.NewString() @@ -155,14 +156,14 @@ func (cts *ctlTestState) getNonIdleQueueSetNames() sets.String { return ans } -func (cts *ctlTestState) hasNonIdleQueueSet(name string) bool { +func (cts *ctlrTestState) hasNonIdleQueueSet(name string) bool { cts.lock.Lock() defer cts.lock.Unlock() qs := cts.queues[name] return qs != nil && qs.countActive > 0 } -func (cts *ctlTestState) addHeldRequest(plName string, rd RequestDigest, finishCh chan struct{}) { +func (cts *ctlrTestState) addHeldRequest(plName string, rd RequestDigest, finishCh chan struct{}) { cts.lock.Lock() defer cts.lock.Unlock() hrs := cts.heldRequestsMap[plName] @@ -171,7 +172,7 @@ func (cts *ctlTestState) addHeldRequest(plName string, rd RequestDigest, finishC cts.t.Logf("Holding %#+v for %s, count:=%d", rd, plName, len(hrs)) } -func (cts *ctlTestState) popHeldRequest() (plName string, hr *heldRequest, nCount int) { +func (cts *ctlrTestState) popHeldRequest() (plName string, hr *heldRequest, nCount int) { cts.lock.Lock() defer cts.lock.Unlock() var hrs []heldRequest @@ -219,21 +220,22 @@ func TestConfigConsumer(t *testing.T) { clientset := clientsetfake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(clientset, 0) flowcontrolClient := clientset.FlowcontrolV1alpha1() - cts := &ctlTestState{t: t, + cts := &ctlrTestState{t: t, fcIfc: flowcontrolClient, existingFSs: map[string]*fcv1a1.FlowSchema{}, existingPLs: map[string]*fcv1a1.PriorityLevelConfiguration{}, heldRequestsMap: map[string][]heldRequest{}, - queues: map[string]*ctlTestQueueSet{}, + queues: map[string]*ctlrTestQueueSet{}, } - ctl := newTestableController( + ctlr := newTestableController( informerFactory, flowcontrolClient, 100, // server concurrency limit time.Minute, // request wait limit + metrics.PriorityLevelConcurrencyObserverPairGenerator, cts, ) - cts.cfgCtl = ctl + cts.cfgCtlr = ctlr persistingPLNames := sets.NewString() trialStep := fmt.Sprintf("trial%d-0", i) _, _, desiredPLNames, newBadPLNames := genPLs(rng, trialStep, persistingPLNames, 0) @@ -290,7 +292,7 @@ func TestConfigConsumer(t *testing.T) { for _, newFS := range newFSs { t.Logf("For %s, digesting newFS=%s", trialStep, fcfmt.Fmt(newFS)) } - _ = ctl.lockAndDigestConfigObjects(newPLs, newFSs) + _ = ctlr.lockAndDigestConfigObjects(newPLs, newFSs) } for plName, hr, nCount := cts.popHeldRequest(); hr != nil; plName, hr, nCount = cts.popHeldRequest() { desired := desiredPLNames.Has(plName) || mandPLs[plName] != nil @@ -302,9 +304,9 @@ func TestConfigConsumer(t *testing.T) { } } -func checkNewFS(cts *ctlTestState, rng *rand.Rand, trialName string, ftr *fsTestingRecord, catchAlls map[bool]*fcv1a1.FlowSchema) { +func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTestingRecord, catchAlls 
map[bool]*fcv1a1.FlowSchema) { t := cts.t - ctl := cts.cfgCtl + ctlr := cts.cfgCtlr fs := ftr.fs expectedPLName := fs.Spec.PriorityLevelConfiguration.Name ctx := context.Background() @@ -320,17 +322,18 @@ func checkNewFS(cts *ctlTestState, rng *rand.Rand, trialName string, ftr *fsTest startWG.Add(1) go func(matches, isResource bool, rdu RequestDigest) { expectedMatch := matches && ftr.wellFormed && (fsPrecedes(fs, catchAlls[isResource]) || fs.Name == catchAlls[isResource].Name) - ctl.Handle(ctx, rdu, func(matchFS *fcv1a1.FlowSchema, matchPL *fcv1a1.PriorityLevelConfiguration) { + ctlr.Handle(ctx, rdu, func(matchFS *fcv1a1.FlowSchema, matchPL *fcv1a1.PriorityLevelConfiguration) { matchIsExempt := matchPL.Spec.Type == fcv1a1.PriorityLevelEnablementExempt t.Logf("Considering FlowSchema %s, expectedMatch=%v, isResource=%v: Handle(%#+v) => note(fs=%s, pl=%s, isExempt=%v)", fs.Name, expectedMatch, isResource, rdu, matchFS.Name, matchPL.Name, matchIsExempt) - if e, a := expectedMatch, matchFS.Name == fs.Name; e != a { - t.Errorf("Fail at %s/%s: rd=%#+v, expectedMatch=%v, actualMatch=%v, matchFSName=%q, catchAlls=%#+v", trialName, fs.Name, rdu, e, a, matchFS.Name, catchAlls) + if a := matchFS.Name == fs.Name; expectedMatch != a { + t.Errorf("Fail at %s/%s: rd=%#+v, expectedMatch=%v, actualMatch=%v, matchFSName=%q, catchAlls=%#+v", trialName, fs.Name, rdu, expectedMatch, a, matchFS.Name, catchAlls) } if matchFS.Name == fs.Name { - if e, a := fs.Spec.PriorityLevelConfiguration.Name, matchPL.Name; e != a { - t.Errorf("Fail at %s/%s: e=%v, a=%v", trialName, fs.Name, e, a) + if fs.Spec.PriorityLevelConfiguration.Name != matchPL.Name { + t.Errorf("Fail at %s/%s: expected=%v, actual=%v", trialName, fs.Name, fs.Spec.PriorityLevelConfiguration.Name, matchPL.Name) } } + }, func(inQueue bool) { }, func() { startWG.Done() _ = <-finishCh diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/BUILD index ee1de9ce75f..c8403a7ade3 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/BUILD @@ -1,12 +1,19 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", - srcs = ["interface.go"], + srcs = [ + "integrator.go", + "interface.go", + ], importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing", importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing", visibility = ["//visibility:public"], - deps = ["//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug:go_default_library"], + deps = [ + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library", + ], ) filegroup( @@ -27,3 +34,10 @@ filegroup( tags = ["automanaged"], visibility = ["//visibility:public"], ) + +go_test( + name = "go_default_test", + srcs = ["integrator_test.go"], + embed = [":go_default_library"], + deps = ["//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library"], +) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator.go new file mode 100644 index 00000000000..dcba6f2c2e5 --- /dev/null +++ 
b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator.go @@ -0,0 +1,180 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fairqueuing + +import ( + "math" + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apiserver/pkg/util/flowcontrol/metrics" +) + +// Integrator computes the moments of some variable X over time as +// read from a particular clock. The integrals start when the +// Integrator is created, and ends at the latest operation on the +// Integrator. As a `metrics.TimedObserver` this fixes X1=1 and +// ignores attempts to change X1. +type Integrator interface { + metrics.TimedObserver + + GetResults() IntegratorResults + + // Return the results of integrating to now, and reset integration to start now + Reset() IntegratorResults +} + +// IntegratorResults holds statistical abstracts of the integration +type IntegratorResults struct { + Duration float64 //seconds + Average float64 //time-weighted + Deviation float64 //standard deviation: sqrt(avg((value-avg)^2)) + Min, Max float64 +} + +// Equal tests for semantic equality. +// This considers all NaN values to be equal to each other. +func (x *IntegratorResults) Equal(y *IntegratorResults) bool { + return x == y || x != nil && y != nil && x.Duration == y.Duration && x.Min == y.Min && x.Max == y.Max && (x.Average == y.Average || math.IsNaN(x.Average) && math.IsNaN(y.Average)) && (x.Deviation == y.Deviation || math.IsNaN(x.Deviation) && math.IsNaN(y.Deviation)) +} + +type integrator struct { + clock clock.PassiveClock + sync.Mutex + lastTime time.Time + x float64 + moments Moments + min, max float64 +} + +// NewIntegrator makes one that uses the given clock +func NewIntegrator(clock clock.PassiveClock) Integrator { + return &integrator{ + clock: clock, + lastTime: clock.Now(), + } +} + +func (igr *integrator) SetX1(x1 float64) { +} + +func (igr *integrator) Set(x float64) { + igr.Lock() + igr.setLocked(x) + igr.Unlock() +} + +func (igr *integrator) setLocked(x float64) { + igr.updateLocked() + igr.x = x + if x < igr.min { + igr.min = x + } + if x > igr.max { + igr.max = x + } +} + +func (igr *integrator) Add(deltaX float64) { + igr.Lock() + igr.setLocked(igr.x + deltaX) + igr.Unlock() +} + +func (igr *integrator) updateLocked() { + now := igr.clock.Now() + dt := now.Sub(igr.lastTime).Seconds() + igr.lastTime = now + igr.moments = igr.moments.Add(ConstantMoments(dt, igr.x)) +} + +func (igr *integrator) GetResults() IntegratorResults { + igr.Lock() + defer igr.Unlock() + return igr.getResultsLocked() +} + +func (igr *integrator) Reset() IntegratorResults { + igr.Lock() + defer igr.Unlock() + results := igr.getResultsLocked() + igr.moments = Moments{} + igr.min = igr.x + igr.max = igr.x + return results +} + +func (igr *integrator) getResultsLocked() (results IntegratorResults) { + igr.updateLocked() + results.Min, results.Max = igr.min, igr.max + results.Duration = igr.moments.ElapsedSeconds + results.Average, results.Deviation = igr.moments.AvgAndStdDev() + 
return +} + +// Moments are the integrals of the 0, 1, and 2 powers of some +// variable X over some range of time. +type Moments struct { + ElapsedSeconds float64 // integral of dt + IntegralX float64 // integral of x dt + IntegralXX float64 // integral of x*x dt +} + +// ConstantMoments is for a constant X +func ConstantMoments(dt, x float64) Moments { + return Moments{ + ElapsedSeconds: dt, + IntegralX: x * dt, + IntegralXX: x * x * dt, + } +} + +// Add combines over two ranges of time +func (igr Moments) Add(ogr Moments) Moments { + return Moments{ + ElapsedSeconds: igr.ElapsedSeconds + ogr.ElapsedSeconds, + IntegralX: igr.IntegralX + ogr.IntegralX, + IntegralXX: igr.IntegralXX + ogr.IntegralXX, + } +} + +// Sub finds the difference between a range of time and a subrange +func (igr Moments) Sub(ogr Moments) Moments { + return Moments{ + ElapsedSeconds: igr.ElapsedSeconds - ogr.ElapsedSeconds, + IntegralX: igr.IntegralX - ogr.IntegralX, + IntegralXX: igr.IntegralXX - ogr.IntegralXX, + } +} + +// AvgAndStdDev returns the average and standard devation +func (igr Moments) AvgAndStdDev() (float64, float64) { + if igr.ElapsedSeconds <= 0 { + return math.NaN(), math.NaN() + } + avg := igr.IntegralX / igr.ElapsedSeconds + // standard deviation is sqrt( average( (x - xbar)^2 ) ) + // = sqrt( Integral( x^2 + xbar^2 -2*x*xbar dt ) / Duration ) + // = sqrt( ( Integral( x^2 dt ) + Duration * xbar^2 - 2*xbar*Integral(x dt) ) / Duration) + // = sqrt( Integral(x^2 dt)/Duration - xbar^2 ) + variance := igr.IntegralXX/igr.ElapsedSeconds - avg*avg + if variance >= 0 { + return avg, math.Sqrt(variance) + } + return avg, math.NaN() +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator_test.go new file mode 100644 index 00000000000..3873ce574c1 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/integrator_test.go @@ -0,0 +1,53 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package fairqueuing + +import ( + "math" + "testing" + "time" + + "k8s.io/apimachinery/pkg/util/clock" +) + +func TestIntegrator(t *testing.T) { + now := time.Now() + clk := clock.NewFakeClock(now) + igr := NewIntegrator(clk) + igr.Add(3) + clk.Step(time.Second) + results := igr.GetResults() + rToo := igr.Reset() + if e := (IntegratorResults{Duration: time.Second.Seconds(), Average: 3, Deviation: 0, Min: 0, Max: 3}); !e.Equal(&results) { + t.Errorf("expected %#+v, got %#+v", e, results) + } + if !results.Equal(&rToo) { + t.Errorf("expected %#+v, got %#+v", results, rToo) + } + igr.Set(2) + results = igr.GetResults() + if e := (IntegratorResults{Duration: 0, Average: math.NaN(), Deviation: math.NaN(), Min: 2, Max: 3}); !e.Equal(&results) { + t.Errorf("expected %#+v, got %#+v", e, results) + } + clk.Step(time.Millisecond) + igr.Add(-1) + clk.Step(time.Millisecond) + results = igr.GetResults() + if e := (IntegratorResults{Duration: 2 * time.Millisecond.Seconds(), Average: 1.5, Deviation: 0.5, Min: 1, Max: 3}); !e.Equal(&results) { + t.Errorf("expected %#+v, got %#+v", e, results) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go index 3ac03f78758..e0b628ecd0f 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go @@ -21,6 +21,7 @@ import ( "time" "k8s.io/apiserver/pkg/util/flowcontrol/debug" + "k8s.io/apiserver/pkg/util/flowcontrol/metrics" ) // QueueSetFactory is used to create QueueSet objects. Creation, like @@ -30,7 +31,7 @@ import ( // before committing to a concurrency allotment for the second. type QueueSetFactory interface { // BeginConstruction does the first phase of creating a QueueSet - BeginConstruction(QueuingConfig) (QueueSetCompleter, error) + BeginConstruction(QueuingConfig, metrics.TimedObserverPair) (QueueSetCompleter, error) } // QueueSetCompleter finishes the two-step process of creating or @@ -79,7 +80,7 @@ type QueueSet interface { // was idle at the moment of the return. Otherwise idle==false // and the client must call the Finish method of the Request // exactly once. - StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (req Request, idle bool) + StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool) // Dump saves and returns the instant internal state of the queue-set. 
// Note that dumping process will stop the queue-set from proceeding @@ -88,6 +89,9 @@ type QueueSet interface { Dump(includeRequestDetails bool) debug.QueueSetDump } +// QueueNoteFn is called when a request enters and leaves a queue +type QueueNoteFn func(inQueue bool) + // Request represents the remainder of the handling of one request type Request interface { // Finish determines whether to execute or reject the request and diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD index 75a78ece462..fc4d0e44d39 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD @@ -31,6 +31,7 @@ go_test( srcs = ["queueset_test.go"], embed = [":go_default_library"], deps = [ + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 3deceb9127e..adcb56d856a 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -49,6 +49,7 @@ type queueSetFactory struct { // the fields `factory` and `theSet` is non-nil. type queueSetCompleter struct { factory *queueSetFactory + obsPair metrics.TimedObserverPair theSet *queueSet qCfg fq.QueuingConfig dealer *shufflesharding.Dealer @@ -67,6 +68,7 @@ type queueSet struct { clock clock.PassiveClock counter counter.GoRoutineCounter estimatedServiceTime float64 + obsPair metrics.TimedObserverPair lock sync.Mutex @@ -116,13 +118,14 @@ func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter) } } -func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) { +func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, obsPair metrics.TimedObserverPair) (fq.QueueSetCompleter, error) { dealer, err := checkConfig(qCfg) if err != nil { return nil, err } return &queueSetCompleter{ factory: qsf, + obsPair: obsPair, qCfg: qCfg, dealer: dealer}, nil } @@ -148,6 +151,7 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet { clock: qsc.factory.clock, counter: qsc.factory.counter, estimatedServiceTime: 60, + obsPair: qsc.obsPair, qCfg: qsc.qCfg, virtualTime: 0, lastRealTime: qsc.factory.clock.Now(), @@ -203,6 +207,12 @@ func (qs *queueSet) setConfiguration(qCfg fq.QueuingConfig, dealer *shuffleshard qs.qCfg = qCfg qs.dCfg = dCfg qs.dealer = dealer + qll := qCfg.QueueLengthLimit + if qll < 1 { + qll = 1 + } + qs.obsPair.RequestsWaiting.SetX1(float64(qll)) + qs.obsPair.RequestsExecuting.SetX1(float64(dCfg.ConcurrencyLimit)) qs.dispatchAsMuchAsPossibleLocked() } @@ -222,7 +232,7 @@ const ( // executing at each point where there is a change in that quantity, // because the metrics --- and only the metrics --- track that // quantity per FlowSchema. 
-func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (fq.Request, bool) { +func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) { qs.lockAndSyncTime() defer qs.lock.Unlock() var req *request @@ -247,7 +257,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist // 3) Reject current request if there is not enough concurrency shares and // we are at max queue length // 4) If not rejected, create a request and enqueue - req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, flowDistinguisher, fsName, descr1, descr2) + req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn) // req == nil means that the request was rejected - no remaining // concurrency shares and at max queue length already if req == nil { @@ -295,6 +305,12 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist return req, false } +func (req *request) NoteQueued(inQueue bool) { + if req.queueNoteFn != nil { + req.queueNoteFn(inQueue) + } +} + func (req *request) Finish(execFn func()) bool { exec, idle := req.wait() if !exec { @@ -399,7 +415,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 { // returns the enqueud request on a successful enqueue // returns nil in the case that there is no available concurrency or // the queuelengthlimit has been reached -func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request { +func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request { // Start with the shuffle sharding, to pick a queue. queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2) queue := qs.queues[queueIdx] @@ -420,6 +436,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte queue: queue, descr1: descr1, descr2: descr2, + queueNoteFn: queueNoteFn, } if ok := qs.rejectOrEnqueueLocked(req); !ok { return nil @@ -463,6 +480,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s // get index for timed out requests timeoutIdx = i metrics.AddRequestsInQueues(qs.qCfg.Name, req.fsName, -1) + req.NoteQueued(false) } else { break } @@ -475,6 +493,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s queue.requests = reqs[removeIdx:] // decrement the # of requestsEnqueued qs.totRequestsWaiting -= removeIdx + qs.obsPair.RequestsWaiting.Add(float64(-removeIdx)) } } @@ -498,16 +517,19 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool { // enqueues a request into its queue. func (qs *queueSet) enqueueLocked(request *request) { queue := request.queue + now := qs.clock.Now() if len(queue.requests) == 0 && queue.requestsExecuting == 0 { // the queue’s virtual start time is set to the virtual time. 
queue.virtualStart = qs.virtualTime if klog.V(6).Enabled() { - klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2) + klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2) } } queue.Enqueue(request) qs.totRequestsWaiting++ metrics.AddRequestsInQueues(qs.qCfg.Name, request.fsName, 1) + request.NoteQueued(true) + qs.obsPair.RequestsWaiting.Add(1) } // dispatchAsMuchAsPossibleLocked runs a loop, as long as there @@ -541,6 +563,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, flowDistinguish req.decision.SetLocked(decisionExecute) qs.totRequestsExecuting++ metrics.AddRequestsExecuting(qs.qCfg.Name, fsName, 1) + qs.obsPair.RequestsExecuting.Add(1) if klog.V(5).Enabled() { klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting) } @@ -570,7 +593,10 @@ func (qs *queueSet) dispatchLocked() bool { qs.totRequestsExecuting++ queue.requestsExecuting++ metrics.AddRequestsInQueues(qs.qCfg.Name, request.fsName, -1) + request.NoteQueued(false) metrics.AddRequestsExecuting(qs.qCfg.Name, request.fsName, 1) + qs.obsPair.RequestsWaiting.Add(-1) + qs.obsPair.RequestsExecuting.Add(1) if klog.V(6).Enabled() { klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting) } @@ -599,6 +625,8 @@ func (qs *queueSet) cancelWait(req *request) { queue.requests = append(queue.requests[:i], queue.requests[i+1:]...) qs.totRequestsWaiting-- metrics.AddRequestsInQueues(qs.qCfg.Name, req.fsName, -1) + req.NoteQueued(false) + qs.obsPair.RequestsWaiting.Add(-1) break } } @@ -650,17 +678,19 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool // previously dispatched request has completed it's service. This // callback updates important state in the queueSet func (qs *queueSet) finishRequestLocked(r *request) { + now := qs.clock.Now() qs.totRequestsExecuting-- metrics.AddRequestsExecuting(qs.qCfg.Name, r.fsName, -1) + qs.obsPair.RequestsExecuting.Add(-1) if r.queue == nil { if klog.V(6).Enabled() { - klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting) + klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting) } return } - S := qs.clock.Since(r.startTime).Seconds() + S := now.Sub(r.startTime).Seconds() // When a request finishes being served, and the actual service time was S, // the queue’s virtual start time is decremented by G - S. 
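The comment closing this hunk summarizes the fair-queuing bookkeeping that the surrounding changes preserve: dispatching a request charges its queue the estimated service time G (estimatedServiceTime, 60 in this patch), and finishRequestLocked corrects that charge once the measured service time S is known. A minimal standalone sketch of just that correction; adjustVirtualStart and its parameter names are hypothetical stand-ins for illustration, not fields or functions from the patch:

package main

import "fmt"

// adjustVirtualStart illustrates "the queue's virtual start time is
// decremented by G - S": the queue was charged G at dispatch, so after
// this refund its net charge for the request is exactly the measured S.
func adjustVirtualStart(virtualStart, estimatedG, actualS float64) float64 {
	return virtualStart - (estimatedG - actualS)
}

func main() {
	// The queue's virtual start was 40s before dispatch, 100s after the
	// +G=60s charge; the request actually took 2.5s of service time.
	fmt.Println(adjustVirtualStart(100.0, 60.0, 2.5)) // 42.5 = 40 + 2.5
}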
@@ -670,7 +700,7 @@ func (qs *queueSet) finishRequestLocked(r *request) { r.queue.requestsExecuting-- if klog.V(6).Enabled() { - klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, r.queue.virtualStart, S, len(r.queue.requests), r.queue.requestsExecuting) + klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, r.queue.virtualStart, S, len(r.queue.requests), r.queue.requestsExecuting) } // If there are more queues than desired and this one has no diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index c4157a696f1..86c086ad574 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -26,10 +26,11 @@ import ( "testing" "time" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apiserver/pkg/util/flowcontrol/counter" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing" - "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock" + testclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock" "k8s.io/apiserver/pkg/util/flowcontrol/metrics" "k8s.io/klog/v2" ) @@ -128,7 +129,7 @@ type uniformScenario struct { expectAllRequests bool evalInqueueMetrics, evalExecutingMetrics bool rejectReason string - clk *clock.FakeEventClock + clk *testclock.FakeEventClock counter counter.GoRoutineCounter } @@ -137,7 +138,7 @@ func (us uniformScenario) exercise(t *testing.T) { t: t, uniformScenario: us, startTime: time.Now(), - integrators: make([]test.Integrator, len(us.clients)), + integrators: make([]fq.Integrator, len(us.clients)), executions: make([]int32, len(us.clients)), rejects: make([]int32, len(us.clients)), } @@ -152,7 +153,7 @@ type uniformScenarioState struct { uniformScenario startTime time.Time doSplit bool - integrators []test.Integrator + integrators []fq.Integrator failedCount uint64 expectedInqueue, expectedExecuting string executions, rejects []int32 @@ -164,7 +165,7 @@ func (uss *uniformScenarioState) exercise() { metrics.Reset() } for i, uc := range uss.clients { - uss.integrators[i] = test.NewIntegrator(uss.clk) + uss.integrators[i] = fq.NewIntegrator(uss.clk) fsName := fmt.Sprintf("client%d", i) uss.expectedInqueue = uss.expectedInqueue + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, uss.name, "\n") for j := 0; j < uc.nThreads; j++ { @@ -193,7 +194,7 @@ type uniformScenarioThread struct { i, j int nCalls int uc uniformClient - igr test.Integrator + igr fq.Integrator fsName string } @@ -223,7 +224,7 @@ func (ust *uniformScenarioThread) callK(k int) { if k >= ust.nCalls { return } - req, idle := ust.uss.qs.StartRequest(context.Background(), ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}) + req, idle := ust.uss.qs.StartRequest(context.Background(), ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil) 
ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle) if req == nil { atomic.AddUint64(&ust.uss.failedCount, 1) @@ -236,7 +237,8 @@ func (ust *uniformScenarioThread) callK(k int) { var executed bool idle2 := req.Finish(func() { executed = true - ust.uss.t.Logf("%s: %d, %d, %d executing", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k) + execStart := ust.uss.clk.Now() + ust.uss.t.Logf("%s: %d, %d, %d executing", execStart.Format(nsTimeFmt), ust.i, ust.j, k) atomic.AddInt32(&ust.uss.executions[ust.i], 1) ust.igr.Add(1) ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration) @@ -339,7 +341,7 @@ func (uss *uniformScenarioState) finalReview() { } } -func ClockWait(clk *clock.FakeEventClock, counter counter.GoRoutineCounter, duration time.Duration) { +func ClockWait(clk *testclock.FakeEventClock, counter counter.GoRoutineCounter, duration time.Duration) { dunch := make(chan struct{}) clk.EventAfterDuration(func(time.Time) { counter.Add(1) @@ -359,8 +361,8 @@ func init() { func TestNoRestraint(t *testing.T) { metrics.Register() now := time.Now() - clk, counter := clock.NewFakeEventClock(now, 0, nil) - nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}) + clk, counter := testclock.NewFakeEventClock(now, 0, nil) + nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk)) if err != nil { t.Fatal(err) } @@ -384,7 +386,7 @@ func TestUniformFlowsHandSize1(t *testing.T) { metrics.Register() now := time.Now() - clk, counter := clock.NewFakeEventClock(now, 0, nil) + clk, counter := testclock.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) qCfg := fq.QueuingConfig{ Name: "TestUniformFlowsHandSize1", @@ -393,7 +395,7 @@ func TestUniformFlowsHandSize1(t *testing.T) { HandSize: 1, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg) + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) if err != nil { t.Fatal(err) } @@ -420,7 +422,7 @@ func TestUniformFlowsHandSize3(t *testing.T) { metrics.Register() now := time.Now() - clk, counter := clock.NewFakeEventClock(now, 0, nil) + clk, counter := testclock.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) qCfg := fq.QueuingConfig{ Name: "TestUniformFlowsHandSize3", @@ -429,7 +431,7 @@ func TestUniformFlowsHandSize3(t *testing.T) { HandSize: 3, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg) + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) if err != nil { t.Fatal(err) } @@ -455,7 +457,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) { metrics.Register() now := time.Now() - clk, counter := clock.NewFakeEventClock(now, 0, nil) + clk, counter := testclock.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) qCfg := fq.QueuingConfig{ Name: "DiffFlowsExpectEqual", @@ -464,7 +466,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) { HandSize: 1, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg) + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) if err != nil { t.Fatal(err) } @@ -491,7 +493,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) { metrics.Register() now := time.Now() - clk, counter := clock.NewFakeEventClock(now, 0, nil) + clk, counter := testclock.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) qCfg := fq.QueuingConfig{ Name: "DiffFlowsExpectUnequal", @@ -500,7 +502,7 
@@ func TestDifferentFlowsExpectUnequal(t *testing.T) { HandSize: 1, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg) + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) if err != nil { t.Fatal(err) } @@ -527,7 +529,7 @@ func TestWindup(t *testing.T) { metrics.Register() now := time.Now() - clk, counter := clock.NewFakeEventClock(now, 0, nil) + clk, counter := testclock.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) qCfg := fq.QueuingConfig{ Name: "TestWindup", @@ -536,7 +538,7 @@ func TestWindup(t *testing.T) { HandSize: 1, RequestWaitLimit: 10 * time.Minute, } - qsc, err := qsf.BeginConstruction(qCfg) + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) if err != nil { t.Fatal(err) } @@ -562,13 +564,13 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) { metrics.Register() now := time.Now() - clk, counter := clock.NewFakeEventClock(now, 0, nil) + clk, counter := testclock.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) qCfg := fq.QueuingConfig{ Name: "TestDifferentFlowsWithoutQueuing", DesiredNumQueues: 0, } - qsc, err := qsf.BeginConstruction(qCfg) + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) if err != nil { t.Fatal(err) } @@ -594,7 +596,7 @@ func TestTimeout(t *testing.T) { metrics.Register() now := time.Now() - clk, counter := clock.NewFakeEventClock(now, 0, nil) + clk, counter := testclock.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) qCfg := fq.QueuingConfig{ Name: "TestTimeout", @@ -603,7 +605,7 @@ func TestTimeout(t *testing.T) { HandSize: 1, RequestWaitLimit: 0, } - qsc, err := qsf.BeginConstruction(qCfg) + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) if err != nil { t.Fatal(err) } @@ -629,7 +631,7 @@ func TestContextCancel(t *testing.T) { metrics.Register() metrics.Reset() now := time.Now() - clk, counter := clock.NewFakeEventClock(now, 0, nil) + clk, counter := testclock.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) qCfg := fq.QueuingConfig{ Name: "TestContextCancel", @@ -638,18 +640,26 @@ func TestContextCancel(t *testing.T) { HandSize: 1, RequestWaitLimit: 15 * time.Second, } - qsc, err := qsf.BeginConstruction(qCfg) + qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) if err != nil { t.Fatal(err) } qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1}) counter.Add(1) // account for the goroutine running this test ctx1 := context.Background() - req1, _ := qs.StartRequest(ctx1, 1, "", "fs1", "test", "one") + b2i := map[bool]int{false: 0, true: 1} + var qnc [2][2]int32 + req1, _ := qs.StartRequest(ctx1, 1, "", "fs1", "test", "one", func(inQueue bool) { atomic.AddInt32(&qnc[0][b2i[inQueue]], 1) }) if req1 == nil { t.Error("Request rejected") return } + if a := atomic.AddInt32(&qnc[0][0], 0); a != 1 { + t.Errorf("Got %d calls to queueNoteFn1(false), expected 1", a) + } + if a := atomic.AddInt32(&qnc[0][1], 0); a != 1 { + t.Errorf("Got %d calls to queueNoteFn1(true), expected 1", a) + } var executed1 bool idle1 := req1.Finish(func() { executed1 = true @@ -657,11 +667,17 @@ func TestContextCancel(t *testing.T) { tBefore := time.Now() go func() { time.Sleep(time.Second) + if a := atomic.AddInt32(&qnc[1][0], 0); a != 0 { + t.Errorf("Got %d calls to queueNoteFn2(false), expected 0", a) + } + if a := atomic.AddInt32(&qnc[1][1], 0); a != 1 { + t.Errorf("Got %d calls to queueNoteFn2(true), expected 1", a) + } // account for unblocking the goroutine that waits on cancelation 
counter.Add(1) cancel2() }() - req2, idle2a := qs.StartRequest(ctx2, 2, "", "fs2", "test", "two") + req2, idle2a := qs.StartRequest(ctx2, 2, "", "fs2", "test", "two", func(inQueue bool) { atomic.AddInt32(&qnc[1][b2i[inQueue]], 1) }) if idle2a { t.Error("2nd StartRequest returned idle") } @@ -672,6 +688,9 @@ func TestContextCancel(t *testing.T) { if idle2b { t.Error("2nd Finish returned idle") } + if a := atomic.AddInt32(&qnc[1][0], 0); a != 1 { + t.Errorf("Got %d calls to queueNoteFn2(false), expected 1", a) + } } tAfter := time.Now() dt := tAfter.Sub(tBefore) @@ -686,3 +705,7 @@ func TestContextCancel(t *testing.T) { t.Error("Not idle at the end") } } + +func newObserverPair(clk clock.PassiveClock) metrics.TimedObserverPair { + return metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"}) +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index 1bcb8cfb32c..a720230600e 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -22,6 +22,7 @@ import ( genericrequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/util/flowcontrol/debug" + fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise" ) @@ -58,6 +59,8 @@ type request struct { // Indicates whether client has called Request::Wait() waitStarted bool + + queueNoteFn fq.QueueNoteFn } // queue is an array of requests with additional metadata required for diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/BUILD index 558001431b2..82157b36ba6 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/BUILD @@ -2,17 +2,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", - srcs = [ - "integrator.go", - "no-restraint.go", - ], + srcs = ["no-restraint.go"], importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing", importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing", visibility = ["//visibility:public"], deps = [ - "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/integrator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/integrator.go deleted file mode 100644 index e7ab1532242..00000000000 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/integrator.go +++ /dev/null @@ -1,116 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package testing - -import ( - "math" - "sync" - "time" - - "k8s.io/apimachinery/pkg/util/clock" -) - -// Integrator computes the integral of some variable X over time as -// read from a particular clock. The integral starts when the -// Integrator is created, and ends at the latest operation on the -// Integrator. -type Integrator interface { - Set(float64) // set the value of X - Add(float64) // add the given quantity to X - GetResults() IntegratorResults - Reset() IntegratorResults // restart the integration from now -} - -// IntegratorResults holds statistical abstracts of the integration -type IntegratorResults struct { - Duration float64 //seconds - Average float64 - Deviation float64 //sqrt(avg((value-avg)^2)) -} - -type integrator struct { - sync.Mutex - clk clock.PassiveClock - lastTime time.Time - x float64 - integrals [3]float64 // integral of x^0, x^1, and x^2 -} - -// NewIntegrator makes one that uses the given clock -func NewIntegrator(clk clock.PassiveClock) Integrator { - return &integrator{ - clk: clk, - lastTime: clk.Now(), - } -} - -func (igr *integrator) Set(x float64) { - igr.Lock() - igr.updateLocked() - igr.x = x - igr.Unlock() -} - -func (igr *integrator) Add(deltaX float64) { - igr.Lock() - igr.updateLocked() - igr.x += deltaX - igr.Unlock() -} - -func (igr *integrator) updateLocked() { - now := igr.clk.Now() - dt := now.Sub(igr.lastTime).Seconds() - igr.lastTime = now - igr.integrals[0] += dt - igr.integrals[1] += dt * igr.x - igr.integrals[2] += dt * igr.x * igr.x -} - -func (igr *integrator) GetResults() (results IntegratorResults) { - igr.Lock() - defer func() { igr.Unlock() }() - return igr.getResultsLocked() -} - -func (igr *integrator) getResultsLocked() (results IntegratorResults) { - igr.updateLocked() - results.Duration = igr.integrals[0] - if results.Duration <= 0 { - results.Average = math.NaN() - results.Deviation = math.NaN() - return - } - results.Average = igr.integrals[1] / igr.integrals[0] - // Deviation is sqrt( Integral( (x - xbar)^2 dt) / Duration ) - // = sqrt( Integral( x^2 + xbar^2 -2*x*xbar dt ) / Duration ) - // = sqrt( ( Integral( x^2 dt ) + Duration * xbar^2 - 2*xbar*Integral(x dt) ) / Duration) - // = sqrt( Integral(x^2 dt)/Duration - xbar^2 ) - variance := igr.integrals[2]/igr.integrals[0] - results.Average*results.Average - if variance > 0 { - results.Deviation = math.Sqrt(variance) - } - return -} - -func (igr *integrator) Reset() (results IntegratorResults) { - igr.Lock() - defer func() { igr.Unlock() }() - results = igr.getResultsLocked() - igr.integrals = [3]float64{0, 0, 0} - return -} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go index 72e7f5706ef..53d3795d05f 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go @@ -21,6 +21,7 @@ import ( "k8s.io/apiserver/pkg/util/flowcontrol/debug" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" + 
"k8s.io/apiserver/pkg/util/flowcontrol/metrics" ) // NewNoRestraintFactory makes a QueueSetFactory that produces @@ -38,7 +39,7 @@ type noRestraint struct{} type noRestraintRequest struct{} -func (noRestraintFactory) BeginConstruction(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) { +func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.TimedObserverPair) (fq.QueueSetCompleter, error) { return noRestraintCompleter{}, nil } @@ -54,7 +55,7 @@ func (noRestraint) IsIdle() bool { return false } -func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (fq.Request, bool) { +func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) { return noRestraintRequest{}, false } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go index 64551ee2eb1..ed0f236b614 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/gen_test.go @@ -31,6 +31,7 @@ import ( "k8s.io/apiserver/pkg/endpoints/request" fqtesting "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing" fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format" + "k8s.io/apiserver/pkg/util/flowcontrol/metrics" ) var noRestraintQSF = fqtesting.NewNoRestraintFactory() @@ -55,7 +56,7 @@ func genPL(rng *rand.Rand, name string) *fcv1a1.PriorityLevelConfiguration { HandSize: hs, QueueLengthLimit: 5} } - _, err := qscOfPL(noRestraintQSF, nil, plc, time.Minute) + _, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"})) if err != nil { panic(err) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/BUILD index 1074189822a..cf4d2f2427b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/BUILD @@ -2,14 +2,20 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", - srcs = ["metrics.go"], + srcs = [ + "metrics.go", + "sample_and_watermark.go", + "timed_observer.go", + ], importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/metrics", importpath = "k8s.io/apiserver/pkg/util/flowcontrol/metrics", visibility = ["//visibility:public"], deps = [ + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/component-base/metrics:go_default_library", "//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library", "//staging/src/k8s.io/component-base/metrics/testutil:go_default_library", + "//vendor/k8s.io/klog/v2:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go index b2b6dab845b..940dac48104 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "k8s.io/apimachinery/pkg/util/clock" compbasemetrics "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/legacyregistry" basemetricstestutil "k8s.io/component-base/metrics/testutil" @@ 
-32,8 +33,11 @@ const ( ) const ( + requestKind = "request_kind" priorityLevel = "priorityLevel" flowSchema = "flowSchema" + phase = "phase" + mark = "mark" ) var ( @@ -69,6 +73,14 @@ func GatherAndCompare(expected string, metricNames ...string) error { return basemetricstestutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...) } +// Registerables is a slice of Registerable +type Registerables []compbasemetrics.Registerable + +// Append adds more +func (rs Registerables) Append(more ...compbasemetrics.Registerable) Registerables { + return append(rs, more...) +} + var ( apiserverRejectedRequestsTotal = compbasemetrics.NewCounterVec( &compbasemetrics.CounterOpts{ @@ -88,6 +100,47 @@ var ( }, []string{priorityLevel, flowSchema}, ) + + // PriorityLevelConcurrencyObserverPairGenerator creates pairs that observe concurrency for priority levels + PriorityLevelConcurrencyObserverPairGenerator = NewSampleAndWaterMarkHistogramsPairGenerator(clock.RealClock{}, time.Millisecond, + &compbasemetrics.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "priority_level_request_count_samples", + Help: "Periodic observations of the number of requests", + Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1}, + StabilityLevel: compbasemetrics.ALPHA, + }, + &compbasemetrics.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "priority_level_request_count_watermarks", + Help: "Watermarks of the number of requests", + Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1}, + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{priorityLevel}) + + // ReadWriteConcurrencyObserverPairGenerator creates pairs that observe concurrency broken down by mutating vs readonly + ReadWriteConcurrencyObserverPairGenerator = NewSampleAndWaterMarkHistogramsPairGenerator(clock.RealClock{}, time.Millisecond, + &compbasemetrics.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "read_vs_write_request_count_samples", + Help: "Periodic observations of the number of requests", + Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1}, + StabilityLevel: compbasemetrics.ALPHA, + }, + &compbasemetrics.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "read_vs_write_request_count_watermarks", + Help: "Watermarks of the number of requests", + Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1}, + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{requestKind}) + apiserverCurrentInqueueRequests = compbasemetrics.NewGaugeVec( &compbasemetrics.GaugeOpts{ Namespace: namespace, @@ -145,7 +198,7 @@ var ( }, []string{priorityLevel, flowSchema}, ) - metrics = []compbasemetrics.Registerable{ + metrics = Registerables{ apiserverRejectedRequestsTotal, apiserverDispatchedRequestsTotal, apiserverCurrentInqueueRequests, @@ -154,7 +207,9 @@ var ( apiserverCurrentExecutingRequests, apiserverRequestWaitingSeconds, apiserverRequestExecutionSeconds, - } + }. + Append(PriorityLevelConcurrencyObserverPairGenerator.metrics()...). + Append(ReadWriteConcurrencyObserverPairGenerator.metrics()...) 
) // AddRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark.go new file mode 100644 index 00000000000..f5df0e6492e --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/sample_and_watermark.go @@ -0,0 +1,192 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/clock" + compbasemetrics "k8s.io/component-base/metrics" + "k8s.io/klog/v2" +) + +const ( + labelNameMark = "mark" + labelValueLo = "low" + labelValueHi = "high" + labelNamePhase = "phase" + labelValueWaiting = "waiting" + labelValueExecuting = "executing" +) + +// SampleAndWaterMarkPairGenerator makes pairs of TimedObservers that +// track samples and watermarks. +type SampleAndWaterMarkPairGenerator struct { + urGenerator SampleAndWaterMarkObserverGenerator +} + +var _ TimedObserverPairGenerator = SampleAndWaterMarkPairGenerator{} + +// NewSampleAndWaterMarkHistogramsPairGenerator makes a new pair generator +func NewSampleAndWaterMarkHistogramsPairGenerator(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkPairGenerator { + return SampleAndWaterMarkPairGenerator{ + urGenerator: NewSampleAndWaterMarkHistogramsGenerator(clock, samplePeriod, sampleOpts, waterMarkOpts, append([]string{labelNamePhase}, labelNames...)), + } +} + +// Generate makes a new pair +func (spg SampleAndWaterMarkPairGenerator) Generate(waiting1, executing1 float64, labelValues []string) TimedObserverPair { + return TimedObserverPair{ + RequestsWaiting: spg.urGenerator.Generate(0, waiting1, append([]string{labelValueWaiting}, labelValues...)), + RequestsExecuting: spg.urGenerator.Generate(0, executing1, append([]string{labelValueExecuting}, labelValues...)), + } +} + +func (spg SampleAndWaterMarkPairGenerator) metrics() Registerables { + return spg.urGenerator.metrics() +} + +// SampleAndWaterMarkObserverGenerator creates TimedObservers that +// populate histograms of samples and low- and high-water-marks. The +// generator has a samplePeriod, and the histograms get an observation +// every samplePeriod. 
+type SampleAndWaterMarkObserverGenerator struct { + *sampleAndWaterMarkObserverGenerator +} + +type sampleAndWaterMarkObserverGenerator struct { + clock clock.PassiveClock + samplePeriod time.Duration + samples *compbasemetrics.HistogramVec + waterMarks *compbasemetrics.HistogramVec +} + +var _ TimedObserverGenerator = (*sampleAndWaterMarkObserverGenerator)(nil) + +// NewSampleAndWaterMarkHistogramsGenerator makes a new one +func NewSampleAndWaterMarkHistogramsGenerator(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkObserverGenerator { + return SampleAndWaterMarkObserverGenerator{ + &sampleAndWaterMarkObserverGenerator{ + clock: clock, + samplePeriod: samplePeriod, + samples: compbasemetrics.NewHistogramVec(sampleOpts, labelNames), + waterMarks: compbasemetrics.NewHistogramVec(waterMarkOpts, append([]string{labelNameMark}, labelNames...)), + }} +} + +func (swg *sampleAndWaterMarkObserverGenerator) quantize(when time.Time) int64 { + return when.UnixNano() / int64(swg.samplePeriod) +} + +// Generate makes a new TimedObserver +func (swg *sampleAndWaterMarkObserverGenerator) Generate(x, x1 float64, labelValues []string) TimedObserver { + relX := x / x1 + when := swg.clock.Now() + return &sampleAndWaterMarkHistograms{ + sampleAndWaterMarkObserverGenerator: swg, + labelValues: labelValues, + loLabelValues: append([]string{labelValueLo}, labelValues...), + hiLabelValues: append([]string{labelValueHi}, labelValues...), + x1: x1, + sampleAndWaterMarkAccumulator: sampleAndWaterMarkAccumulator{ + lastSet: when, + lastSetInt: swg.quantize(when), + x: x, + relX: relX, + loRelX: relX, + hiRelX: relX, + }} +} + +func (swg *sampleAndWaterMarkObserverGenerator) metrics() Registerables { + return Registerables{swg.samples, swg.waterMarks} +} + +type sampleAndWaterMarkHistograms struct { + *sampleAndWaterMarkObserverGenerator + labelValues []string + loLabelValues, hiLabelValues []string + + sync.Mutex + x1 float64 + sampleAndWaterMarkAccumulator +} + +type sampleAndWaterMarkAccumulator struct { + lastSet time.Time + lastSetInt int64 // lastSet / samplePeriod + x float64 + relX float64 // x / x1 + loRelX, hiRelX float64 +} + +var _ TimedObserver = (*sampleAndWaterMarkHistograms)(nil) + +func (saw *sampleAndWaterMarkHistograms) Add(deltaX float64) { + saw.innerSet(func() { + saw.x += deltaX + }) +} + +func (saw *sampleAndWaterMarkHistograms) Set(x float64) { + saw.innerSet(func() { + saw.x = x + }) +} + +func (saw *sampleAndWaterMarkHistograms) SetX1(x1 float64) { + saw.innerSet(func() { + saw.x1 = x1 + }) +} + +func (saw *sampleAndWaterMarkHistograms) innerSet(updateXOrX1 func()) { + saw.Lock() + when := saw.clock.Now() + whenInt := saw.quantize(when) + acc := saw.sampleAndWaterMarkAccumulator + wellOrdered := !when.Before(acc.lastSet) + if wellOrdered { + updateXOrX1() + saw.relX = saw.x / saw.x1 + if acc.lastSetInt < whenInt { + saw.loRelX, saw.hiRelX = acc.relX, acc.relX + saw.lastSetInt = whenInt + } + if saw.relX < saw.loRelX { + saw.loRelX = saw.relX + } else if saw.relX > saw.hiRelX { + saw.hiRelX = saw.relX + } + saw.lastSet = when + } + saw.Unlock() + if !wellOrdered { + lastSetS := acc.lastSet.Format(time.RFC3339Nano) + whenS := when.Format(time.RFC3339Nano) + klog.Fatalf("Time went backwards from %s to %s for labelValues=%#+v", lastSetS, whenS, saw.labelValues) + panic(append([]string{lastSetS, whenS}, saw.labelValues...)) + } + for acc.lastSetInt < whenInt { + 
saw.samples.WithLabelValues(saw.labelValues...).Observe(acc.relX) + saw.waterMarks.WithLabelValues(saw.loLabelValues...).Observe(acc.loRelX) + saw.waterMarks.WithLabelValues(saw.hiLabelValues...).Observe(acc.hiRelX) + acc.lastSetInt++ + acc.loRelX, acc.hiRelX = acc.relX, acc.relX + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/timed_observer.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/timed_observer.go new file mode 100644 index 00000000000..25f41493c3e --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/timed_observer.go @@ -0,0 +1,52 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +// TimedObserver gets informed about the values assigned to a variable +// `X float64` over time, and reports on the ratio `X/X1`. +type TimedObserver interface { + // Add notes a change to the variable + Add(deltaX float64) + + // Set notes a setting of the variable + Set(x float64) + + // SetX1 changes the value to use for X1 + SetX1(x1 float64) +} + +// TimedObserverGenerator creates related observers that are +// differentiated by a series of label values +type TimedObserverGenerator interface { + Generate(x, x1 float64, labelValues []string) TimedObserver +} + +// TimedObserverPair is a corresponding pair of observers, one for the +// number of requests waiting in queue(s) and one for the number of +// requests being executed +type TimedObserverPair struct { + // RequestsWaiting is given observations of the number of currently queued requests + RequestsWaiting TimedObserver + + // RequestsExecuting is given observations of the number of requests currently executing + RequestsExecuting TimedObserver +} + +// TimedObserverPairGenerator generates pairs +type TimedObserverPairGenerator interface { + Generate(waiting1, executing1 float64, labelValues []string) TimedObserverPair +}
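Taken together, the new plumbing works like this: a pair generator produces one TimedObserverPair per priority level (or per request kind), the queue-set sets the denominators with SetX1 once its queue-length and concurrency limits are known, and every enqueue, dispatch, and finish calls Add so the sample-and-watermark histograms record occupancy as a ratio X/X1. Below is a minimal usage sketch, not code from the patch; it assumes metrics.Register() has been called first (as the tests above do), and the label value and limits are purely illustrative:

package main

import (
	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
)

func main() {
	metrics.Register() // histograms only record once registered, as in the tests

	// One observer pair per priority level; the initial X1 values of 1
	// are placeholders until the real limits are configured.
	pair := metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"example-level"})

	// Mirror setConfiguration: ratios are reported against these limits.
	pair.RequestsWaiting.SetX1(50)   // queue length limit (illustrative)
	pair.RequestsExecuting.SetX1(10) // concurrency limit (illustrative)

	// Mirror enqueueLocked / dispatchLocked / finishRequestLocked.
	pair.RequestsWaiting.Add(1)    // a request is enqueued
	pair.RequestsWaiting.Add(-1)   // it leaves the queue...
	pair.RequestsExecuting.Add(1)  // ...and starts executing
	pair.RequestsExecuting.Add(-1) // the request finishes
}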