From 772b1f4cd84a738f632716e28d4067c00f0b7f13 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Tue, 19 Sep 2023 03:05:00 +0200 Subject: [PATCH 1/2] handlers/watch: calculate and record WatchList latency metric. --- .../apiserver/pkg/endpoints/handlers/get.go | 2 +- .../apiserver/pkg/endpoints/handlers/watch.go | 29 ++++++++++++++++++- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go index c110964fc42..d3b501cf52a 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go @@ -267,7 +267,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc } requestInfo, _ := request.RequestInfoFrom(ctx) metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() { - serveWatch(watcher, scope, outputMediaType, req, w, timeout) + serveWatch(watcher, scope, outputMediaType, req, w, timeout, metrics.CleanListScope(ctx, &opts)) }) return } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go index d15819f1145..e8eb0bfc263 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go @@ -34,6 +34,9 @@ import ( "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/metrics" apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/features" + "k8s.io/apiserver/pkg/storage" + utilfeature "k8s.io/apiserver/pkg/util/feature" ) // nothing will ever be sent down this channel @@ -61,7 +64,7 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) { // serveWatch will serve a watch response. // TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled. 
-func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration) { +func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration, metricsScope string) { defer watcher.Stop() options, err := optionsForTransform(mediaTypeOptions, req) @@ -153,6 +156,8 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n TimeoutFactory: &realTimeoutFactory{timeout}, ServerShuttingDownCh: serverShuttingDownCh, + + metricsScope: metricsScope, } server.ServeHTTP(w, req) @@ -176,6 +181,8 @@ type WatchServer struct { TimeoutFactory TimeoutFactory ServerShuttingDownCh <-chan struct{} + + metricsScope string } // ServeHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked @@ -247,6 +254,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc() + isWatchListLatencyRecordingRequired := shouldRecordWatchListLatency(event) if err := s.EmbeddedEncoder.Encode(event.Object, buf); err != nil { // unexpected error @@ -280,6 +288,9 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { if len(ch) == 0 { flusher.Flush() } + if isWatchListLatencyRecordingRequired { + metrics.RecordWatchListLatency(req.Context(), s.Scope.Resource, s.metricsScope) + } buf.Reset() } @@ -360,3 +371,19 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) { } } } + +func shouldRecordWatchListLatency(event watch.Event) bool { + if event.Type != watch.Bookmark || !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) { + return false + } + // as of today the initial-events-end annotation is added only to a single event + // by the watch cache and only when certain conditions are met + // + 
// for more please read https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list + hasAnnotation, err := storage.HasInitialEventsEndBookmarkAnnotation(event.Object) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to determine if the obj has the required annotation for measuring watchlist latency, obj %T: %v", event.Object, err)) + return false + } + return hasAnnotation +} From a97f4b7a3123c9768ec7136b6ca32be926e16cd6 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Tue, 19 Sep 2023 03:05:37 +0200 Subject: [PATCH 2/2] endpoints/metrics: define watchListLatencies metric and associated functions --- .../pkg/endpoints/metrics/metrics.go | 73 +++++++++++++++++++ .../pkg/endpoints/metrics/metrics_test.go | 61 ++++++++++++++++ 2 files changed, 134 insertions(+) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go index ba2aed69d44..48fc951adee 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go @@ -18,6 +18,7 @@ package metrics import ( "context" + "fmt" "net/http" "net/url" "strconv" @@ -26,8 +27,12 @@ import ( "time" restful "github.com/emicklei/go-restful/v3" + + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" "k8s.io/apimachinery/pkg/apis/meta/v1/validation" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilsets "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/authentication/user" @@ -280,6 +285,17 @@ var ( []string{"code_path"}, ) + watchListLatencies = compbasemetrics.NewHistogramVec( + &compbasemetrics.HistogramOpts{ + Subsystem: APIServerComponent, + Name: "watch_list_duration_seconds", + Help: "Response latency distribution in seconds for watch list requests broken by group, version, 
resource and scope.", + Buckets: []float64{0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 2, 4, 6, 8, 10, 15, 20, 30, 45, 60}, + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{"group", "version", "resource", "scope"}, + ) + metrics = []resettableCollector{ deprecatedRequestGauge, requestCounter, @@ -300,6 +316,7 @@ var ( requestAbortsTotal, requestPostTimeoutTotal, requestTimestampComparisonDuration, + watchListLatencies, } // these are the valid request methods which we report in our metrics. Any other request methods @@ -511,6 +528,18 @@ func RecordLongRunning(req *http.Request, requestInfo *request.RequestInfo, comp fn() } +// RecordWatchListLatency simply records response latency for watch list requests. +func RecordWatchListLatency(ctx context.Context, gvr schema.GroupVersionResource, metricsScope string) { + requestReceivedTimestamp, ok := request.ReceivedTimestampFrom(ctx) + if !ok { + utilruntime.HandleError(fmt.Errorf("unable to measure watchlist latency because no received ts found in the ctx, gvr: %s", gvr)) + return + } + elapsedSeconds := time.Since(requestReceivedTimestamp).Seconds() + + watchListLatencies.WithContext(ctx).WithLabelValues(gvr.Group, gvr.Version, gvr.Resource, metricsScope).Observe(elapsedSeconds) +} + // MonitorRequest handles standard transformations for client and the reported verb and then invokes Monitor to record // a request. verb must be uppercase to be backwards compatible with existing monitoring tooling. func MonitorRequest(req *http.Request, verb, group, version, resource, subresource, scope, component string, deprecated bool, removedRelease string, httpCode, respSize int, elapsed time.Duration) { @@ -621,6 +650,26 @@ func CleanScope(requestInfo *request.RequestInfo) string { return "" } +// CleanListScope computes the request scope for metrics. +// +// Note that normally we would use CleanScope for computation. +// But due to the same reasons mentioned in determineRequestNamespaceAndName we cannot. 
+func CleanListScope(ctx context.Context, opts *metainternalversion.ListOptions) string { + namespace, name := determineRequestNamespaceAndName(ctx, opts) + if len(name) > 0 { + return "resource" + } + if len(namespace) > 0 { + return "namespace" + } + if requestInfo, ok := request.RequestInfoFrom(ctx); ok && requestInfo != nil { + if requestInfo.IsResourceRequest { + return "cluster" + } + } + return "" +} + // CanonicalVerb distinguishes LISTs from GETs (and HEADs). It assumes verb is // UPPERCASE. func CanonicalVerb(verb string, scope string) string { @@ -655,6 +704,30 @@ func CleanVerb(verb string, request *http.Request, requestInfo *request.RequestI return reportedVerb } +// determineRequestNamespaceAndName computes name and namespace for the given request +// +// note that the logic of this function was copy&pasted from cacher.go +// after an unsuccessful attempt of moving it to RequestInfo +// +// see: https://github.com/kubernetes/kubernetes/pull/120520 +func determineRequestNamespaceAndName(ctx context.Context, opts *metainternalversion.ListOptions) (namespace, name string) { + if requestNamespace, ok := request.NamespaceFrom(ctx); ok && len(requestNamespace) > 0 { + namespace = requestNamespace + } else if opts != nil && opts.FieldSelector != nil { + if selectorNamespace, ok := opts.FieldSelector.RequiresExactMatch("metadata.namespace"); ok { + namespace = selectorNamespace + } + } + if requestInfo, ok := request.RequestInfoFrom(ctx); ok && requestInfo != nil && len(requestInfo.Name) > 0 { + name = requestInfo.Name + } else if opts != nil && opts.FieldSelector != nil { + if selectorName, ok := opts.FieldSelector.RequiresExactMatch("metadata.name"); ok { + name = selectorName + } + } + return +} + // cleanVerb additionally ensures that unknown verbs don't clog up the metrics.
func cleanVerb(verb, suggestedVerb string, request *http.Request, requestInfo *request.RequestInfo) string { // CanonicalVerb (being an input for this function) doesn't handle correctly the diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics_test.go index 562230f2137..d640dd4c62c 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics_test.go @@ -17,11 +17,14 @@ limitations under the License. package metrics import ( + "context" "net/http" "net/url" "strings" "testing" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/endpoints/responsewriter" "k8s.io/component-base/metrics/legacyregistry" @@ -466,3 +469,61 @@ func TestRecordDroppedRequests(t *testing.T) { }) } } + +func TestCleanListScope(t *testing.T) { + scenarios := []struct { + name string + ctx context.Context + opts *metainternalversion.ListOptions + expectedScope string + }{ + { + name: "empty scope", + }, + { + name: "empty scope with empty request info", + ctx: request.WithRequestInfo(context.TODO(), &request.RequestInfo{}), + }, + { + name: "namespace from ctx", + ctx: request.WithNamespace(context.TODO(), "foo"), + expectedScope: "namespace", + }, + { + name: "namespace from field selector", + opts: &metainternalversion.ListOptions{ + FieldSelector: fields.ParseSelectorOrDie("metadata.namespace=foo"), + }, + expectedScope: "namespace", + }, + { + name: "name from request info", + ctx: request.WithRequestInfo(context.TODO(), &request.RequestInfo{Name: "bar"}), + expectedScope: "resource", + }, + { + name: "name from field selector", + opts: &metainternalversion.ListOptions{ + FieldSelector: fields.ParseSelectorOrDie("metadata.name=bar"), + }, + expectedScope: "resource", + }, + { + name: "cluster scope 
request", + ctx: request.WithRequestInfo(context.TODO(), &request.RequestInfo{IsResourceRequest: true}), + expectedScope: "cluster", + }, + } + + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + if scenario.ctx == nil { + scenario.ctx = context.TODO() + } + actualScope := CleanListScope(scenario.ctx, scenario.opts) + if actualScope != scenario.expectedScope { + t.Errorf("unexpected scope = %s, expected = %s", actualScope, scenario.expectedScope) + } + }) + } +}