diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency.go index 04264230d8d..d42e18233f1 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency.go @@ -56,8 +56,8 @@ func TrackStarted(handler http.Handler, name string) http.Handler { // TrackCompleted measures the timestamp the given handler has completed execution and then // it updates the corresponding metric with the filter latency duration. func TrackCompleted(handler http.Handler) http.Handler { - return trackCompleted(handler, utilclock.RealClock{}, func(fr *requestFilterRecord, completedAt time.Time) { - metrics.RecordFilterLatency(fr.name, completedAt.Sub(fr.startedTimestamp)) + return trackCompleted(handler, utilclock.RealClock{}, func(ctx context.Context, fr *requestFilterRecord, completedAt time.Time) { + metrics.RecordFilterLatency(ctx, fr.name, completedAt.Sub(fr.startedTimestamp)) }) } @@ -81,7 +81,7 @@ func trackStarted(handler http.Handler, name string, clock utilclock.PassiveCloc }) } -func trackCompleted(handler http.Handler, clock utilclock.PassiveClock, action func(*requestFilterRecord, time.Time)) http.Handler { +func trackCompleted(handler http.Handler, clock utilclock.PassiveClock, action func(context.Context, *requestFilterRecord, time.Time)) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // The previous filter has just completed. completedAt := clock.Now() @@ -90,7 +90,7 @@ func trackCompleted(handler http.Handler, clock utilclock.PassiveClock, action f ctx := r.Context() if fr := requestFilterRecordFrom(ctx); fr != nil { - action(fr, completedAt) + action(ctx, fr, completedAt) } }) } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency_test.go index 41437407ae6..f12e9456489 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency_test.go @@ -17,6 +17,7 @@ limitations under the License. package filterlatency import ( + "context" "net/http" "net/http/httptest" "testing" @@ -120,7 +121,7 @@ func TestTrackCompletedContextHasFilterRecord(t *testing.T) { }) requestFilterEndedAt := time.Now() - wrapped := trackCompleted(handler, utilclock.NewFakeClock(requestFilterEndedAt), func(fr *requestFilterRecord, completedAt time.Time) { + wrapped := trackCompleted(handler, utilclock.NewFakeClock(requestFilterEndedAt), func(_ context.Context, fr *requestFilterRecord, completedAt time.Time) { actionCallCount++ filterRecordGot = fr filterCompletedAtGot = completedAt @@ -156,7 +157,7 @@ func TestTrackCompletedContextDoesNotHaveFilterRecord(t *testing.T) { handlerCallCount++ }) - wrapped := trackCompleted(handler, utilclock.NewFakeClock(time.Now()), func(_ *requestFilterRecord, _ time.Time) { + wrapped := trackCompleted(handler, utilclock.NewFakeClock(time.Now()), func(_ context.Context, _ *requestFilterRecord, _ time.Time) { actionCallCount++ }) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit_test.go index d2bb0facc6e..e1531b42103 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit_test.go @@ -93,14 +93,14 @@ func (*fancyResponseWriter) Flush() {} func (*fancyResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { return nil, nil, nil } func TestConstructResponseWriter(t *testing.T) { - actual := decorateResponseWriter(context.TODO(), &simpleResponseWriter{}, nil, nil, nil) + actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, nil, nil, nil) switch v := actual.(type) { case *auditResponseWriter: default: t.Errorf("Expected auditResponseWriter, got %v", reflect.TypeOf(v)) } - actual = decorateResponseWriter(context.TODO(), &fancyResponseWriter{}, nil, nil, nil) + actual = decorateResponseWriter(context.Background(), &fancyResponseWriter{}, nil, nil, nil) switch v := actual.(type) { case *fancyResponseWriterDelegator: default: @@ -110,7 +110,7 @@ func TestConstructResponseWriter(t *testing.T) { func TestDecorateResponseWriterWithoutChannel(t *testing.T) { ev := &auditinternal.Event{} - actual := decorateResponseWriter(context.TODO(), &simpleResponseWriter{}, ev, nil, nil) + actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, ev, nil, nil) // write status. This will not block because firstEventSentCh is nil actual.WriteHeader(42) @@ -124,7 +124,7 @@ func TestDecorateResponseWriterWithoutChannel(t *testing.T) { func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) { ev := &auditinternal.Event{} - actual := decorateResponseWriter(context.TODO(), &simpleResponseWriter{}, ev, nil, nil) + actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, ev, nil, nil) // write status. This will not block because firstEventSentCh is nil actual.Write([]byte("foo")) @@ -139,7 +139,7 @@ func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) { func TestDecorateResponseWriterChannel(t *testing.T) { sink := &fakeAuditSink{} ev := &auditinternal.Event{} - actual := decorateResponseWriter(context.TODO(), &simpleResponseWriter{}, ev, sink, nil) + actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, ev, sink, nil) done := make(chan struct{}) go func() { diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/authentication.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/authentication.go index c133c66721b..54e77467c0b 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/authentication.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/authentication.go @@ -47,7 +47,7 @@ func WithAuthentication(handler http.Handler, auth authenticator.Request, failed req = req.WithContext(authenticator.WithAudiences(req.Context(), apiAuds)) } resp, ok, err := auth.AuthenticateRequest(req) - defer recordAuthMetrics(resp, ok, err, apiAuds, authenticationStart) + defer recordAuthMetrics(req.Context(), resp, ok, err, apiAuds, authenticationStart) if err != nil || !ok { if err != nil { klog.ErrorS(err, "Unable to authenticate the request") diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/metrics.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/metrics.go index 421c0e0a2ba..c983b562331 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/metrics.go @@ -17,6 +17,7 @@ limitations under the License. package filters import ( + "context" "strings" "time" @@ -75,7 +76,7 @@ func init() { legacyregistry.MustRegister(authenticationLatency) } -func recordAuthMetrics(resp *authenticator.Response, ok bool, err error, apiAudiences authenticator.Audiences, authStart time.Time) { +func recordAuthMetrics(ctx context.Context, resp *authenticator.Response, ok bool, err error, apiAudiences authenticator.Audiences, authStart time.Time) { var resultLabel string switch { @@ -85,11 +86,11 @@ func recordAuthMetrics(resp *authenticator.Response, ok bool, err error, apiAudi resultLabel = failureLabel default: resultLabel = successLabel - authenticatedUserCounter.WithLabelValues(compressUsername(resp.User.GetName())).Inc() + authenticatedUserCounter.WithContext(ctx).WithLabelValues(compressUsername(resp.User.GetName())).Inc() } - authenticatedAttemptsCounter.WithLabelValues(resultLabel).Inc() - authenticationLatency.WithLabelValues(resultLabel).Observe(time.Since(authStart).Seconds()) + authenticatedAttemptsCounter.WithContext(ctx).WithLabelValues(resultLabel).Inc() + authenticationLatency.WithContext(ctx).WithLabelValues(resultLabel).Observe(time.Since(authStart).Seconds()) } // compressUsername maps all possible usernames onto a small set of categories diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/request_deadline.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/request_deadline.go index 1e43cdab20f..bba40b8f7bb 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/request_deadline.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/request_deadline.go @@ -126,7 +126,7 @@ func withFailedRequestAudit(failedHandler http.Handler, statusErr *apierrors.Sta ev.ResponseStatus.Message = statusErr.Error() } - rw := decorateResponseWriter(w, ev, sink, omitStages) + rw := decorateResponseWriter(req.Context(), w, ev, sink, omitStages) failedHandler.ServeHTTP(rw, req) }) } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go index 22945ccf6b8..47fe44b78d7 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go @@ -163,8 +163,8 @@ type WatchServer struct { // or over a websocket connection. func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { kind := s.Scope.Kind - metrics.RegisteredWatchers.WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc() - defer metrics.RegisteredWatchers.WithLabelValues(kind.Group, kind.Version, kind.Kind).Dec() + metrics.RegisteredWatchers.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc() + defer metrics.RegisteredWatchers.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Dec() w = httplog.Unlogged(req, w) @@ -220,7 +220,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { // End of results. return } - metrics.WatchEvents.WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc() + metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc() obj := s.Fixup(event.Object) if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil { @@ -233,7 +233,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { // type unknown.Raw = buf.Bytes() event.Object = &unknown - metrics.WatchEventsSizes.WithLabelValues(kind.Group, kind.Version, kind.Kind).Observe(float64(len(unknown.Raw))) + metrics.WatchEventsSizes.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Observe(float64(len(unknown.Raw))) *outEvent = metav1.WatchEvent{} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go index d4f6068b40e..75f244564ba 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go @@ -18,6 +18,7 @@ package metrics import ( "bufio" + "context" "net" "net/http" "net/url" @@ -324,8 +325,8 @@ func UpdateInflightRequestMetrics(phase string, nonmutating, mutating int) { } } -func RecordFilterLatency(name string, elapsed time.Duration) { - requestFilterDuration.WithLabelValues(name).Observe(elapsed.Seconds()) +func RecordFilterLatency(ctx context.Context, name string, elapsed time.Duration) { + requestFilterDuration.WithContext(ctx).WithLabelValues(name).Observe(elapsed.Seconds()) } // RecordRequestAbort records that the request was aborted possibly due to a timeout. @@ -341,7 +342,7 @@ func RecordRequestAbort(req *http.Request, requestInfo *request.RequestInfo) { group := requestInfo.APIGroup version := requestInfo.APIVersion - requestAbortsTotal.WithLabelValues(reportedVerb, group, version, resource, subresource, scope).Inc() + requestAbortsTotal.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope).Inc() } // RecordRequestTermination records that the request was terminated early as part of a resource @@ -361,9 +362,9 @@ func RecordRequestTermination(req *http.Request, requestInfo *request.RequestInf reportedVerb := cleanVerb(canonicalVerb(strings.ToUpper(req.Method), scope), req) if requestInfo.IsResourceRequest { - requestTerminationsTotal.WithLabelValues(reportedVerb, requestInfo.APIGroup, requestInfo.APIVersion, requestInfo.Resource, requestInfo.Subresource, scope, component, codeToString(code)).Inc() + requestTerminationsTotal.WithContext(req.Context()).WithLabelValues(reportedVerb, requestInfo.APIGroup, requestInfo.APIVersion, requestInfo.Resource, requestInfo.Subresource, scope, component, codeToString(code)).Inc() } else { - requestTerminationsTotal.WithLabelValues(reportedVerb, "", "", "", requestInfo.Path, scope, component, codeToString(code)).Inc() + requestTerminationsTotal.WithContext(req.Context()).WithLabelValues(reportedVerb, "", "", "", requestInfo.Path, scope, component, codeToString(code)).Inc() } } @@ -383,9 +384,9 @@ func RecordLongRunning(req *http.Request, requestInfo *request.RequestInfo, comp reportedVerb := cleanVerb(canonicalVerb(strings.ToUpper(req.Method), scope), req) if requestInfo.IsResourceRequest { - g = longRunningRequestGauge.WithLabelValues(reportedVerb, requestInfo.APIGroup, requestInfo.APIVersion, requestInfo.Resource, requestInfo.Subresource, scope, component) + g = longRunningRequestGauge.WithContext(req.Context()).WithLabelValues(reportedVerb, requestInfo.APIGroup, requestInfo.APIVersion, requestInfo.Resource, requestInfo.Subresource, scope, component) } else { - g = longRunningRequestGauge.WithLabelValues(reportedVerb, "", "", "", requestInfo.Path, scope, component) + g = longRunningRequestGauge.WithContext(req.Context()).WithLabelValues(reportedVerb, "", "", "", requestInfo.Path, scope, component) } g.Inc() defer g.Dec() @@ -404,23 +405,23 @@ func MonitorRequest(req *http.Request, verb, group, version, resource, subresour dryRun := cleanDryRun(req.URL) elapsedSeconds := elapsed.Seconds() cleanContentType := cleanContentType(contentType) - requestCounter.WithLabelValues(reportedVerb, dryRun, group, version, resource, subresource, scope, component, cleanContentType, codeToString(httpCode)).Inc() + requestCounter.WithContext(req.Context()).WithLabelValues(reportedVerb, dryRun, group, version, resource, subresource, scope, component, cleanContentType, codeToString(httpCode)).Inc() // MonitorRequest happens after authentication, so we can trust the username given by the request info, ok := request.UserFrom(req.Context()) if ok && info.GetName() == user.APIServerUser { - apiSelfRequestCounter.WithLabelValues(reportedVerb, resource, subresource).Inc() + apiSelfRequestCounter.WithContext(req.Context()).WithLabelValues(reportedVerb, resource, subresource).Inc() } if deprecated { - deprecatedRequestGauge.WithLabelValues(group, version, resource, subresource, removedRelease).Set(1) + deprecatedRequestGauge.WithContext(req.Context()).WithLabelValues(group, version, resource, subresource, removedRelease).Set(1) audit.AddAuditAnnotation(req.Context(), deprecatedAnnotationKey, "true") if len(removedRelease) > 0 { audit.AddAuditAnnotation(req.Context(), removedReleaseAnnotationKey, removedRelease) } } - requestLatencies.WithLabelValues(reportedVerb, dryRun, group, version, resource, subresource, scope, component).Observe(elapsedSeconds) + requestLatencies.WithContext(req.Context()).WithLabelValues(reportedVerb, dryRun, group, version, resource, subresource, scope, component).Observe(elapsedSeconds) // We are only interested in response sizes of read requests. if verb == "GET" || verb == "LIST" { - responseSizes.WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(float64(respSize)) + responseSizes.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(float64(respSize)) } } diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go index e873351c70b..2484bfc76c8 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go @@ -196,9 +196,9 @@ func WithMaxInFlightLimit( } // We need to split this data between buckets used for throttling. if isMutatingRequest { - metrics.DroppedRequests.WithLabelValues(metrics.MutatingKind).Inc() + metrics.DroppedRequests.WithContext(ctx).WithLabelValues(metrics.MutatingKind).Inc() } else { - metrics.DroppedRequests.WithLabelValues(metrics.ReadOnlyKind).Inc() + metrics.DroppedRequests.WithContext(ctx).WithLabelValues(metrics.ReadOnlyKind).Inc() } metrics.RecordRequestTermination(r, requestInfo, metrics.APIServerComponent, http.StatusTooManyRequests) tooManyRequests(r, w) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go index 6b17d4c8462..6f604243112 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go @@ -141,9 +141,9 @@ func WithPriorityAndFairness( }, execute) if !served { if isMutatingRequest { - epmetrics.DroppedRequests.WithLabelValues(epmetrics.MutatingKind).Inc() + epmetrics.DroppedRequests.WithContext(ctx).WithLabelValues(epmetrics.MutatingKind).Inc() } else { - epmetrics.DroppedRequests.WithLabelValues(epmetrics.ReadOnlyKind).Inc() + epmetrics.DroppedRequests.WithContext(ctx).WithLabelValues(epmetrics.ReadOnlyKind).Inc() } epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests) tooManyRequests(r, w)