apiserver: exclude APF queue wait time from SLO latency metrics (#116420)

* apiserver: add latency tracker for priority & fairness queue wait time

Signed-off-by: Andrew Sy Kim <andrewsy@google.com>

* apiserver: exclude priority & fairness wait times from SLO/SLI latency metrics

Signed-off-by: Andrew Sy Kim <andrewsy@google.com>

* apiserver: update TestLatencyTrackersFrom to check latency from PriorityAndFairnessTracker

Signed-off-by: Andrew Sy Kim <andrewsy@google.com>

* flowcontrol: add helper function observeQueueWaitTime to consolidate metric and latency tracker calls

Signed-off-by: Andrew Sy Kim <andrewsy@google.com>

* flowcontrol: replace time.Now() / time.Since() with clock.Now() / clock.Since() for better testability

Signed-off-by: Andrew Sy Kim <andrewsy@google.com>

* flowcontrol: add unit test TestQueueWaitTimeLatencyTracker to validate queue wait times recorded by latency tracker

Signed-off-by: Andrew Sy Kim <andrewsy@google.com>

---------

Signed-off-by: Andrew Sy Kim <andrewsy@google.com>
Andrew Sy Kim, 2023-03-14 05:15:20 -04:00 (committed by GitHub)
parent 0e06be57a6
commit ee18f60252
6 changed files with 202 additions and 11 deletions

View File

@@ -106,7 +106,7 @@ var (
&compbasemetrics.HistogramOpts{
Subsystem: APIServerComponent,
Name: "request_slo_duration_seconds",
Help: "Response latency distribution (not counting webhook duration) in seconds for each verb, group, version, resource, subresource, scope and component.",
Help: "Response latency distribution (not counting webhook duration and priority & fairness queue wait times) in seconds for each verb, group, version, resource, subresource, scope and component.",
// This metric is supplementary to the requestLatencies metric.
// It measures request duration excluding webhooks as they are mostly
// dependant on user configuration.
@@ -121,7 +121,7 @@ var (
&compbasemetrics.HistogramOpts{
Subsystem: APIServerComponent,
Name: "request_sli_duration_seconds",
Help: "Response latency distribution (not counting webhook duration) in seconds for each verb, group, version, resource, subresource, scope and component.",
Help: "Response latency distribution (not counting webhook duration and priority & fairness queue wait times) in seconds for each verb, group, version, resource, subresource, scope and component.",
// This metric is supplementary to the requestLatencies metric.
// It measures request duration excluding webhooks as they are mostly
// dependant on user configuration.
@@ -544,7 +544,7 @@ func MonitorRequest(req *http.Request, verb, group, version, resource, subresour
fieldValidationRequestLatencies.WithContext(req.Context()).WithLabelValues(fieldValidation)
if wd, ok := request.LatencyTrackersFrom(req.Context()); ok {
sliLatency := elapsedSeconds - (wd.MutatingWebhookTracker.GetLatency() + wd.ValidatingWebhookTracker.GetLatency()).Seconds()
sliLatency := elapsedSeconds - (wd.MutatingWebhookTracker.GetLatency() + wd.ValidatingWebhookTracker.GetLatency() + wd.APFQueueWaitTracker.GetLatency()).Seconds()
requestSloLatencies.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(sliLatency)
requestSliLatencies.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(sliLatency)
}
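For readability, here is the exclusion logic from the hunk above as a small self-contained sketch (the helper name sliSeconds and its package are illustrative, not part of the patch): the SLI/SLO observation is the total request duration minus webhook time and, with this change, minus the APF queue wait as well.

package metricsexample

import (
	"k8s.io/apiserver/pkg/endpoints/request"
)

// sliSeconds mirrors the computation in MonitorRequest: subtract all tracked
// exclusions (webhooks plus the new APF queue wait) from the elapsed seconds.
func sliSeconds(elapsedSeconds float64, wd *request.LatencyTrackers) float64 {
	excluded := wd.MutatingWebhookTracker.GetLatency() +
		wd.ValidatingWebhookTracker.GetLatency() +
		wd.APFQueueWaitTracker.GetLatency()
	return elapsedSeconds - excluded.Seconds()
}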

View File

@@ -116,6 +116,10 @@ type LatencyTrackers struct {
// Validate webhooks are done in parallel, so max function is used.
ValidatingWebhookTracker DurationTracker
// APFQueueWaitTracker tracks the latency incurred by queue wait times
// from priority & fairness.
APFQueueWaitTracker DurationTracker
// StorageTracker tracks the latency incurred inside the storage layer,
// it accounts for the time it takes to send data to the underlying
// storage layer (etcd) and get the complete response back.
@@ -168,6 +172,7 @@ func WithLatencyTrackersAndCustomClock(parent context.Context, c clock.Clock) co
return WithValue(parent, latencyTrackersKey, &LatencyTrackers{
MutatingWebhookTracker: newSumLatencyTracker(c),
ValidatingWebhookTracker: newMaxLatencyTracker(c),
APFQueueWaitTracker: newMaxLatencyTracker(c),
StorageTracker: newSumLatencyTracker(c),
TransformTracker: newSumLatencyTracker(c),
SerializationTracker: newSumLatencyTracker(c),
@@ -230,6 +235,14 @@ func TrackResponseWriteLatency(ctx context.Context, d time.Duration) {
}
}
// TrackAPFQueueWaitLatency is used to track latency incurred
// by priority and fairness queues.
func TrackAPFQueueWaitLatency(ctx context.Context, d time.Duration) {
if tracker, ok := LatencyTrackersFrom(ctx); ok {
tracker.APFQueueWaitTracker.TrackDuration(d)
}
}
// AuditAnnotationsFromLatencyTrackers will inspect each latency tracker
// associated with the request context and return a set of audit
// annotations that can be added to the API audit entry.
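TrackAPFQueueWaitLatency is the hook the flow-control code calls once a request leaves its queue; it is a no-op on a context that carries no LatencyTrackers, and because APFQueueWaitTracker is wired up with newMaxLatencyTracker, repeated observations keep the largest value rather than accumulating. A minimal sketch of the round trip, using only the functions shown in this file:

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apiserver/pkg/endpoints/request"
)

func main() {
	// A bare context has no trackers attached, so this call is a no-op.
	request.TrackAPFQueueWaitLatency(context.Background(), time.Second)

	// With trackers attached, the wait is recorded and can be read back later,
	// which is what MonitorRequest does when computing the SLI/SLO latency.
	ctx := request.WithLatencyTrackers(context.Background())
	request.TrackAPFQueueWaitLatency(ctx, 250*time.Millisecond)
	request.TrackAPFQueueWaitLatency(ctx, 100*time.Millisecond) // max tracker: 250ms wins

	if lt, ok := request.LatencyTrackersFrom(ctx); ok {
		fmt.Println(lt.APFQueueWaitTracker.GetLatency()) // 250ms
	}
}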

View File

@@ -35,7 +35,7 @@ func TestLatencyTrackersFrom(t *testing.T) {
SumDurations: 1600,
MaxDuration: 400,
}
t.Run("TestWebhookDurationFrom", func(t *testing.T) {
t.Run("TestLatencyTrackersFrom", func(t *testing.T) {
parent := context.TODO()
_, ok := LatencyTrackersFrom(parent)
if ok {
@@ -48,13 +48,14 @@ func TestLatencyTrackersFrom(t *testing.T) {
if !ok {
t.Error("expected LatencyTrackersFrom to be initialized")
}
if wd.MutatingWebhookTracker.GetLatency() != 0 || wd.ValidatingWebhookTracker.GetLatency() != 0 {
if wd.MutatingWebhookTracker.GetLatency() != 0 || wd.ValidatingWebhookTracker.GetLatency() != 0 || wd.APFQueueWaitTracker.GetLatency() != 0 {
t.Error("expected values to be initialized to 0")
}
for _, d := range tc.Durations {
wd.MutatingWebhookTracker.Track(func() { clk.Step(d) })
wd.ValidatingWebhookTracker.Track(func() { clk.Step(d) })
wd.APFQueueWaitTracker.Track(func() { clk.Step(d) })
}
wd, ok = LatencyTrackersFrom(ctx)
@@ -69,5 +70,9 @@ func TestLatencyTrackersFrom(t *testing.T) {
if wd.ValidatingWebhookTracker.GetLatency() != tc.MaxDuration {
t.Errorf("expected validate duration: %q, but got: %q", tc.MaxDuration, wd.ValidatingWebhookTracker.GetLatency())
}
if wd.APFQueueWaitTracker.GetLatency() != tc.MaxDuration {
t.Errorf("expected priority & fairness duration: %q, but got: %q", tc.MaxDuration, wd.APFQueueWaitTracker.GetLatency())
}
})
}
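The test drives the trackers with a fake clock: Track times the supplied callback using the injected clock, so stepping the clock inside the callback simulates a wait of exactly d without any real sleeping. The same pattern as a standalone sketch (assuming k8s.io/utils/clock/testing for the fake clock):

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apiserver/pkg/endpoints/request"
	testingclock "k8s.io/utils/clock/testing"
)

func main() {
	clk := testingclock.NewFakeClock(time.Now())
	ctx := request.WithLatencyTrackersAndCustomClock(context.Background(), clk)
	lt, _ := request.LatencyTrackersFrom(ctx)

	// Track measures how long the callback takes according to clk, so stepping
	// the fake clock by 300ms records a 300ms observation instantly.
	lt.APFQueueWaitTracker.Track(func() { clk.Step(300 * time.Millisecond) })

	fmt.Println(lt.APFQueueWaitTracker.GetLatency()) // 300ms
}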

View File

@@ -1026,7 +1026,7 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
noteFn(selectedFlowSchema, plState.pl, flowDistinguisher)
workEstimate := workEstimator()
startWaitingTime = time.Now()
startWaitingTime = cfgCtlr.clock.Now()
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
req, idle := plState.queues.StartRequest(ctx, &workEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
if idle {
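Taking startWaitingTime from the controller's injected clock instead of time.Now() is what makes queue wait measurable in unit tests: a fake clock can be advanced deterministically while the request sits in the queue. A generic sketch of the pattern, assuming k8s.io/utils/clock (the waiter type and its method are illustrative, not part of the patch):

package main

import (
	"fmt"
	"time"

	"k8s.io/utils/clock"
	testingclock "k8s.io/utils/clock/testing"
)

// waiter is a hypothetical stand-in for a component that measures queue wait
// through an injected clock rather than the wall clock.
type waiter struct {
	clock clock.PassiveClock // production code would use clock.RealClock{}
}

func (w *waiter) timeInQueue(start time.Time) time.Duration {
	return w.clock.Since(start)
}

func main() {
	start := time.Now()
	fake := testingclock.NewFakePassiveClock(start)
	w := &waiter{clock: fake}

	fake.SetTime(start.Add(time.Second)) // advance time without sleeping
	fmt.Println(w.timeInQueue(start))    // 1s
}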

View File

@@ -21,6 +21,7 @@ import (
"strconv"
"time"
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/httplog"
"k8s.io/apiserver/pkg/server/mux"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
@@ -161,7 +162,7 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque
queued := startWaitingTime != time.Time{}
if req == nil {
if queued {
metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
observeQueueWaitTime(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), cfgCtlr.clock.Since(startWaitingTime))
}
klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, reject", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt)
return
@@ -178,21 +179,26 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque
}()
idle = req.Finish(func() {
if queued {
metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
observeQueueWaitTime(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), cfgCtlr.clock.Since(startWaitingTime))
}
metrics.AddDispatch(ctx, pl.Name, fs.Name)
fqs.OnRequestDispatched(req)
executed = true
startExecutionTime := time.Now()
startExecutionTime := cfgCtlr.clock.Now()
defer func() {
executionTime := time.Since(startExecutionTime)
executionTime := cfgCtlr.clock.Since(startExecutionTime)
httplog.AddKeyValue(ctx, "apf_execution_time", executionTime)
metrics.ObserveExecutionDuration(ctx, pl.Name, fs.Name, executionTime)
}()
execFn()
})
if queued && !executed {
metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
observeQueueWaitTime(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), cfgCtlr.clock.Since(startWaitingTime))
}
panicking = false
}
func observeQueueWaitTime(ctx context.Context, priorityLevelName, flowSchemaName, execute string, waitTime time.Duration) {
metrics.ObserveWaitingDuration(ctx, priorityLevelName, flowSchemaName, execute, waitTime)
endpointsrequest.TrackAPFQueueWaitLatency(ctx, waitTime)
}
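The new observeQueueWaitTime helper fans a single measured wait out to both sinks: the aggregate Prometheus waiting-duration histogram and the per-request latency tracker added above. Consolidating the two calls keeps the three call sites in Handle (reject before dispatch, the queued finish path, and the queued-but-never-executed path) consistent, and the tracker side is what later lets MonitorRequest exclude the queue wait from the SLI/SLO histograms.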

View File

@@ -0,0 +1,167 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"context"
"testing"
"time"
flowcontrol "k8s.io/api/flowcontrol/v1beta3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/endpoints/request"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/eventclock"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
)
// TestQueueWaitTimeLatencyTracker tests the queue wait times recorded by the P&F latency tracker
// when calling Handle.
func TestQueueWaitTimeLatencyTracker(t *testing.T) {
metrics.Register()
var fsObj *flowcontrol.FlowSchema
var plcObj *flowcontrol.PriorityLevelConfiguration
cfgObjs := []runtime.Object{}
plName := "test-pl"
username := "test-user"
fsName := "test-fs"
lendable := int32(0)
borrowingLimit := int32(0)
fsObj = &flowcontrol.FlowSchema{
ObjectMeta: metav1.ObjectMeta{
Name: fsName,
},
Spec: flowcontrol.FlowSchemaSpec{
MatchingPrecedence: 100,
PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{
Name: plName,
},
DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{
Type: flowcontrol.FlowDistinguisherMethodByUserType,
},
Rules: []flowcontrol.PolicyRulesWithSubjects{{
Subjects: []flowcontrol.Subject{{
Kind: flowcontrol.SubjectKindUser,
User: &flowcontrol.UserSubject{Name: username},
}},
NonResourceRules: []flowcontrol.NonResourcePolicyRule{{
Verbs: []string{"*"},
NonResourceURLs: []string{"*"},
}},
}},
},
}
plcObj = &flowcontrol.PriorityLevelConfiguration{
ObjectMeta: metav1.ObjectMeta{
Name: plName,
},
Spec: flowcontrol.PriorityLevelConfigurationSpec{
Type: flowcontrol.PriorityLevelEnablementLimited,
Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
NominalConcurrencyShares: 100,
LendablePercent: &lendable,
BorrowingLimitPercent: &borrowingLimit,
LimitResponse: flowcontrol.LimitResponse{
Type: flowcontrol.LimitResponseTypeQueue,
Queuing: &flowcontrol.QueuingConfiguration{
Queues: 10,
HandSize: 2,
QueueLengthLimit: 10,
},
},
},
},
}
cfgObjs = append(cfgObjs, fsObj, plcObj)
clientset := clientsetfake.NewSimpleClientset(cfgObjs...)
informerFactory := informers.NewSharedInformerFactory(clientset, time.Second)
flowcontrolClient := clientset.FlowcontrolV1beta3()
startTime := time.Now()
clk, _ := eventclock.NewFake(startTime, 0, nil)
controller := newTestableController(TestableConfig{
Name: "Controller",
Clock: clk,
AsFieldManager: ConfigConsumerAsFieldManager,
FoundToDangling: func(found bool) bool { return !found },
InformerFactory: informerFactory,
FlowcontrolClient: flowcontrolClient,
ServerConcurrencyLimit: 24,
RequestWaitLimit: time.Minute,
ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec,
ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
QueueSetFactory: fqs.NewQueueSetFactory(clk),
})
informerFactory.Start(nil)
status := informerFactory.WaitForCacheSync(nil)
if names := unsynced(status); len(names) > 0 {
t.Fatalf("WaitForCacheSync did not successfully complete, resources=%#v", names)
}
go func() {
controller.Run(nil)
}()
// ensure that the controller has run its first loop.
err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
return controller.hasPriorityLevelState(plcObj.Name), nil
})
if err != nil {
t.Errorf("expected the controller to reconcile the priority level configuration object: %s, error: %s", plcObj.Name, err)
}
reqInfo := &request.RequestInfo{
IsResourceRequest: false,
Path: "/foobar",
Verb: "GET",
}
noteFn := func(fs *flowcontrol.FlowSchema, plc *flowcontrol.PriorityLevelConfiguration, fd string) {}
workEstr := func() fcrequest.WorkEstimate { return fcrequest.WorkEstimate{InitialSeats: 1} }
flowUser := testUser{name: "test-user"}
rd := RequestDigest{
RequestInfo: reqInfo,
User: flowUser,
}
// Add 1 second to the fake clock during QueueNoteFn
newTime := startTime.Add(time.Second)
qnf := fq.QueueNoteFn(func(bool) { clk.FakePassiveClock.SetTime(newTime) })
ctx := request.WithLatencyTrackers(context.Background())
controller.Handle(ctx, rd, noteFn, workEstr, qnf, func() {})
latencyTracker, ok := request.LatencyTrackersFrom(ctx)
if !ok {
t.Fatalf("error getting latency tracker: %v", err)
}
expectedLatency := time.Second // newTime - startTime
latency := latencyTracker.APFQueueWaitTracker.GetLatency()
if latency != expectedLatency {
t.Errorf("unexpected latency, got %s, expected %s", latency, expectedLatency)
}
}