diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go index 61f53c18eab..450a6653da6 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go @@ -106,7 +106,7 @@ var ( &compbasemetrics.HistogramOpts{ Subsystem: APIServerComponent, Name: "request_slo_duration_seconds", - Help: "Response latency distribution (not counting webhook duration) in seconds for each verb, group, version, resource, subresource, scope and component.", + Help: "Response latency distribution (not counting webhook duration and priority & fairness queue wait times) in seconds for each verb, group, version, resource, subresource, scope and component.", // This metric is supplementary to the requestLatencies metric. // It measures request duration excluding webhooks as they are mostly // dependant on user configuration. @@ -121,7 +121,7 @@ var ( &compbasemetrics.HistogramOpts{ Subsystem: APIServerComponent, Name: "request_sli_duration_seconds", - Help: "Response latency distribution (not counting webhook duration) in seconds for each verb, group, version, resource, subresource, scope and component.", + Help: "Response latency distribution (not counting webhook duration and priority & fairness queue wait times) in seconds for each verb, group, version, resource, subresource, scope and component.", // This metric is supplementary to the requestLatencies metric. // It measures request duration excluding webhooks as they are mostly // dependant on user configuration. @@ -544,7 +544,7 @@ func MonitorRequest(req *http.Request, verb, group, version, resource, subresour fieldValidationRequestLatencies.WithContext(req.Context()).WithLabelValues(fieldValidation) if wd, ok := request.LatencyTrackersFrom(req.Context()); ok { - sliLatency := elapsedSeconds - (wd.MutatingWebhookTracker.GetLatency() + wd.ValidatingWebhookTracker.GetLatency()).Seconds() + sliLatency := elapsedSeconds - (wd.MutatingWebhookTracker.GetLatency() + wd.ValidatingWebhookTracker.GetLatency() + wd.APFQueueWaitTracker.GetLatency()).Seconds() requestSloLatencies.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(sliLatency) requestSliLatencies.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(sliLatency) } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/request/webhook_duration.go b/staging/src/k8s.io/apiserver/pkg/endpoints/request/webhook_duration.go index 120bc46bf8b..612c671d85b 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/request/webhook_duration.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/request/webhook_duration.go @@ -116,6 +116,10 @@ type LatencyTrackers struct { // Validate webhooks are done in parallel, so max function is used. ValidatingWebhookTracker DurationTracker + // APFQueueWaitTracker tracks the latency incurred by queue wait times + // from priority & fairness. + APFQueueWaitTracker DurationTracker + // StorageTracker tracks the latency incurred inside the storage layer, // it accounts for the time it takes to send data to the underlying // storage layer (etcd) and get the complete response back. @@ -168,6 +172,7 @@ func WithLatencyTrackersAndCustomClock(parent context.Context, c clock.Clock) co return WithValue(parent, latencyTrackersKey, &LatencyTrackers{ MutatingWebhookTracker: newSumLatencyTracker(c), ValidatingWebhookTracker: newMaxLatencyTracker(c), + APFQueueWaitTracker: newMaxLatencyTracker(c), StorageTracker: newSumLatencyTracker(c), TransformTracker: newSumLatencyTracker(c), SerializationTracker: newSumLatencyTracker(c), @@ -230,6 +235,14 @@ func TrackResponseWriteLatency(ctx context.Context, d time.Duration) { } } +// TrackAPFQueueWaitLatency is used to track latency incurred +// by priority and fairness queues. +func TrackAPFQueueWaitLatency(ctx context.Context, d time.Duration) { + if tracker, ok := LatencyTrackersFrom(ctx); ok { + tracker.APFQueueWaitTracker.TrackDuration(d) + } +} + // AuditAnnotationsFromLatencyTrackers will inspect each latency tracker // associated with the request context and return a set of audit // annotations that can be added to the API audit entry. diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/request/webhook_duration_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/request/webhook_duration_test.go index 9119047c12a..0008b21cc14 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/request/webhook_duration_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/request/webhook_duration_test.go @@ -35,7 +35,7 @@ func TestLatencyTrackersFrom(t *testing.T) { SumDurations: 1600, MaxDuration: 400, } - t.Run("TestWebhookDurationFrom", func(t *testing.T) { + t.Run("TestLatencyTrackersFrom", func(t *testing.T) { parent := context.TODO() _, ok := LatencyTrackersFrom(parent) if ok { @@ -48,13 +48,14 @@ func TestLatencyTrackersFrom(t *testing.T) { if !ok { t.Error("expected LatencyTrackersFrom to be initialized") } - if wd.MutatingWebhookTracker.GetLatency() != 0 || wd.ValidatingWebhookTracker.GetLatency() != 0 { + if wd.MutatingWebhookTracker.GetLatency() != 0 || wd.ValidatingWebhookTracker.GetLatency() != 0 || wd.APFQueueWaitTracker.GetLatency() != 0 { t.Error("expected values to be initialized to 0") } for _, d := range tc.Durations { wd.MutatingWebhookTracker.Track(func() { clk.Step(d) }) wd.ValidatingWebhookTracker.Track(func() { clk.Step(d) }) + wd.APFQueueWaitTracker.Track(func() { clk.Step(d) }) } wd, ok = LatencyTrackersFrom(ctx) @@ -69,5 +70,9 @@ func TestLatencyTrackersFrom(t *testing.T) { if wd.ValidatingWebhookTracker.GetLatency() != tc.MaxDuration { t.Errorf("expected validate duration: %q, but got: %q", tc.MaxDuration, wd.ValidatingWebhookTracker.GetLatency()) } + + if wd.APFQueueWaitTracker.GetLatency() != tc.MaxDuration { + t.Errorf("expected priority & fairness duration: %q, but got: %q", tc.MaxDuration, wd.APFQueueWaitTracker.GetLatency()) + } }) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index 5718dd9e866..2048a6ef6b0 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -1026,7 +1026,7 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig noteFn(selectedFlowSchema, plState.pl, flowDistinguisher) workEstimate := workEstimator() - startWaitingTime = time.Now() + startWaitingTime = cfgCtlr.clock.Now() klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues) req, idle := plState.queues.StartRequest(ctx, &workEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn) if idle { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go index f93e6a828ee..2929048ecc7 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go @@ -21,6 +21,7 @@ import ( "strconv" "time" + endpointsrequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/server/httplog" "k8s.io/apiserver/pkg/server/mux" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" @@ -161,7 +162,7 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque queued := startWaitingTime != time.Time{} if req == nil { if queued { - metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) + observeQueueWaitTime(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), cfgCtlr.clock.Since(startWaitingTime)) } klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, reject", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt) return @@ -178,21 +179,26 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque }() idle = req.Finish(func() { if queued { - metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) + observeQueueWaitTime(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), cfgCtlr.clock.Since(startWaitingTime)) } metrics.AddDispatch(ctx, pl.Name, fs.Name) fqs.OnRequestDispatched(req) executed = true - startExecutionTime := time.Now() + startExecutionTime := cfgCtlr.clock.Now() defer func() { - executionTime := time.Since(startExecutionTime) + executionTime := cfgCtlr.clock.Since(startExecutionTime) httplog.AddKeyValue(ctx, "apf_execution_time", executionTime) metrics.ObserveExecutionDuration(ctx, pl.Name, fs.Name, executionTime) }() execFn() }) if queued && !executed { - metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) + observeQueueWaitTime(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), cfgCtlr.clock.Since(startWaitingTime)) } panicking = false } + +func observeQueueWaitTime(ctx context.Context, priorityLevelName, flowSchemaName, execute string, waitTime time.Duration) { + metrics.ObserveWaitingDuration(ctx, priorityLevelName, flowSchemaName, execute, waitTime) + endpointsrequest.TrackAPFQueueWaitLatency(ctx, waitTime) +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter_test.go new file mode 100644 index 00000000000..742ada095f6 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter_test.go @@ -0,0 +1,167 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flowcontrol + +import ( + "context" + "testing" + "time" + + flowcontrol "k8s.io/api/flowcontrol/v1beta3" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/endpoints/request" + fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" + fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset" + "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/eventclock" + "k8s.io/apiserver/pkg/util/flowcontrol/metrics" + fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" + "k8s.io/client-go/informers" + clientsetfake "k8s.io/client-go/kubernetes/fake" +) + +// TestQueueWaitTimeLatencyTracker tests the queue wait times recorded by the P&F latency tracker +// when calling Handle. +func TestQueueWaitTimeLatencyTracker(t *testing.T) { + metrics.Register() + + var fsObj *flowcontrol.FlowSchema + var plcObj *flowcontrol.PriorityLevelConfiguration + cfgObjs := []runtime.Object{} + + plName := "test-pl" + username := "test-user" + fsName := "test-fs" + lendable := int32(0) + borrowingLimit := int32(0) + fsObj = &flowcontrol.FlowSchema{ + ObjectMeta: metav1.ObjectMeta{ + Name: fsName, + }, + Spec: flowcontrol.FlowSchemaSpec{ + MatchingPrecedence: 100, + PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{ + Name: plName, + }, + DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{ + Type: flowcontrol.FlowDistinguisherMethodByUserType, + }, + Rules: []flowcontrol.PolicyRulesWithSubjects{{ + Subjects: []flowcontrol.Subject{{ + Kind: flowcontrol.SubjectKindUser, + User: &flowcontrol.UserSubject{Name: username}, + }}, + NonResourceRules: []flowcontrol.NonResourcePolicyRule{{ + Verbs: []string{"*"}, + NonResourceURLs: []string{"*"}, + }}, + }}, + }, + } + plcObj = &flowcontrol.PriorityLevelConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: plName, + }, + Spec: flowcontrol.PriorityLevelConfigurationSpec{ + Type: flowcontrol.PriorityLevelEnablementLimited, + Limited: &flowcontrol.LimitedPriorityLevelConfiguration{ + NominalConcurrencyShares: 100, + LendablePercent: &lendable, + BorrowingLimitPercent: &borrowingLimit, + LimitResponse: flowcontrol.LimitResponse{ + Type: flowcontrol.LimitResponseTypeQueue, + Queuing: &flowcontrol.QueuingConfiguration{ + Queues: 10, + HandSize: 2, + QueueLengthLimit: 10, + }, + }, + }, + }, + } + cfgObjs = append(cfgObjs, fsObj, plcObj) + + clientset := clientsetfake.NewSimpleClientset(cfgObjs...) + informerFactory := informers.NewSharedInformerFactory(clientset, time.Second) + flowcontrolClient := clientset.FlowcontrolV1beta3() + startTime := time.Now() + clk, _ := eventclock.NewFake(startTime, 0, nil) + controller := newTestableController(TestableConfig{ + Name: "Controller", + Clock: clk, + AsFieldManager: ConfigConsumerAsFieldManager, + FoundToDangling: func(found bool) bool { return !found }, + InformerFactory: informerFactory, + FlowcontrolClient: flowcontrolClient, + ServerConcurrencyLimit: 24, + RequestWaitLimit: time.Minute, + ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec, + ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, + QueueSetFactory: fqs.NewQueueSetFactory(clk), + }) + + informerFactory.Start(nil) + + status := informerFactory.WaitForCacheSync(nil) + if names := unsynced(status); len(names) > 0 { + t.Fatalf("WaitForCacheSync did not successfully complete, resources=%#v", names) + } + + go func() { + controller.Run(nil) + }() + + // ensure that the controller has run its first loop. + err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) { + return controller.hasPriorityLevelState(plcObj.Name), nil + }) + if err != nil { + t.Errorf("expected the controller to reconcile the priority level configuration object: %s, error: %s", plcObj.Name, err) + } + + reqInfo := &request.RequestInfo{ + IsResourceRequest: false, + Path: "/foobar", + Verb: "GET", + } + noteFn := func(fs *flowcontrol.FlowSchema, plc *flowcontrol.PriorityLevelConfiguration, fd string) {} + workEstr := func() fcrequest.WorkEstimate { return fcrequest.WorkEstimate{InitialSeats: 1} } + + flowUser := testUser{name: "test-user"} + rd := RequestDigest{ + RequestInfo: reqInfo, + User: flowUser, + } + + // Add 1 second to the fake clock during QueueNoteFn + newTime := startTime.Add(time.Second) + qnf := fq.QueueNoteFn(func(bool) { clk.FakePassiveClock.SetTime(newTime) }) + ctx := request.WithLatencyTrackers(context.Background()) + controller.Handle(ctx, rd, noteFn, workEstr, qnf, func() {}) + + latencyTracker, ok := request.LatencyTrackersFrom(ctx) + if !ok { + t.Fatalf("error getting latency tracker: %v", err) + } + + expectedLatency := time.Second // newTime - startTime + latency := latencyTracker.APFQueueWaitTracker.GetLatency() + if latency != expectedLatency { + t.Errorf("unexpected latency, got %s, expected %s", latency, expectedLatency) + } +}