add context to metrics in apiserver/endpoint

This commit is contained in:
yoyinzyc 2021-01-20 13:13:00 -08:00
parent 5311d711ec
commit 266d67bd51
10 changed files with 40 additions and 37 deletions

View File

@ -56,8 +56,8 @@ func TrackStarted(handler http.Handler, name string) http.Handler {
// TrackCompleted measures the timestamp the given handler has completed execution and then // TrackCompleted measures the timestamp the given handler has completed execution and then
// it updates the corresponding metric with the filter latency duration. // it updates the corresponding metric with the filter latency duration.
func TrackCompleted(handler http.Handler) http.Handler { func TrackCompleted(handler http.Handler) http.Handler {
return trackCompleted(handler, utilclock.RealClock{}, func(fr *requestFilterRecord, completedAt time.Time) { return trackCompleted(handler, utilclock.RealClock{}, func(ctx context.Context, fr *requestFilterRecord, completedAt time.Time) {
metrics.RecordFilterLatency(fr.name, completedAt.Sub(fr.startedTimestamp)) metrics.RecordFilterLatency(ctx, fr.name, completedAt.Sub(fr.startedTimestamp))
}) })
} }
@ -81,7 +81,7 @@ func trackStarted(handler http.Handler, name string, clock utilclock.PassiveCloc
}) })
} }
func trackCompleted(handler http.Handler, clock utilclock.PassiveClock, action func(*requestFilterRecord, time.Time)) http.Handler { func trackCompleted(handler http.Handler, clock utilclock.PassiveClock, action func(context.Context, *requestFilterRecord, time.Time)) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// The previous filter has just completed. // The previous filter has just completed.
completedAt := clock.Now() completedAt := clock.Now()
@ -90,7 +90,7 @@ func trackCompleted(handler http.Handler, clock utilclock.PassiveClock, action f
ctx := r.Context() ctx := r.Context()
if fr := requestFilterRecordFrom(ctx); fr != nil { if fr := requestFilterRecordFrom(ctx); fr != nil {
action(fr, completedAt) action(ctx, fr, completedAt)
} }
}) })
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package filterlatency package filterlatency
import ( import (
"context"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"testing" "testing"
@ -120,7 +121,7 @@ func TestTrackCompletedContextHasFilterRecord(t *testing.T) {
}) })
requestFilterEndedAt := time.Now() requestFilterEndedAt := time.Now()
wrapped := trackCompleted(handler, utilclock.NewFakeClock(requestFilterEndedAt), func(fr *requestFilterRecord, completedAt time.Time) { wrapped := trackCompleted(handler, utilclock.NewFakeClock(requestFilterEndedAt), func(_ context.Context, fr *requestFilterRecord, completedAt time.Time) {
actionCallCount++ actionCallCount++
filterRecordGot = fr filterRecordGot = fr
filterCompletedAtGot = completedAt filterCompletedAtGot = completedAt
@ -156,7 +157,7 @@ func TestTrackCompletedContextDoesNotHaveFilterRecord(t *testing.T) {
handlerCallCount++ handlerCallCount++
}) })
wrapped := trackCompleted(handler, utilclock.NewFakeClock(time.Now()), func(_ *requestFilterRecord, _ time.Time) { wrapped := trackCompleted(handler, utilclock.NewFakeClock(time.Now()), func(_ context.Context, _ *requestFilterRecord, _ time.Time) {
actionCallCount++ actionCallCount++
}) })

View File

@ -93,14 +93,14 @@ func (*fancyResponseWriter) Flush() {}
func (*fancyResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { return nil, nil, nil } func (*fancyResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { return nil, nil, nil }
func TestConstructResponseWriter(t *testing.T) { func TestConstructResponseWriter(t *testing.T) {
actual := decorateResponseWriter(context.TODO(), &simpleResponseWriter{}, nil, nil, nil) actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, nil, nil, nil)
switch v := actual.(type) { switch v := actual.(type) {
case *auditResponseWriter: case *auditResponseWriter:
default: default:
t.Errorf("Expected auditResponseWriter, got %v", reflect.TypeOf(v)) t.Errorf("Expected auditResponseWriter, got %v", reflect.TypeOf(v))
} }
actual = decorateResponseWriter(context.TODO(), &fancyResponseWriter{}, nil, nil, nil) actual = decorateResponseWriter(context.Background(), &fancyResponseWriter{}, nil, nil, nil)
switch v := actual.(type) { switch v := actual.(type) {
case *fancyResponseWriterDelegator: case *fancyResponseWriterDelegator:
default: default:
@ -110,7 +110,7 @@ func TestConstructResponseWriter(t *testing.T) {
func TestDecorateResponseWriterWithoutChannel(t *testing.T) { func TestDecorateResponseWriterWithoutChannel(t *testing.T) {
ev := &auditinternal.Event{} ev := &auditinternal.Event{}
actual := decorateResponseWriter(context.TODO(), &simpleResponseWriter{}, ev, nil, nil) actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, ev, nil, nil)
// write status. This will not block because firstEventSentCh is nil // write status. This will not block because firstEventSentCh is nil
actual.WriteHeader(42) actual.WriteHeader(42)
@ -124,7 +124,7 @@ func TestDecorateResponseWriterWithoutChannel(t *testing.T) {
func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) { func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) {
ev := &auditinternal.Event{} ev := &auditinternal.Event{}
actual := decorateResponseWriter(context.TODO(), &simpleResponseWriter{}, ev, nil, nil) actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, ev, nil, nil)
// write status. This will not block because firstEventSentCh is nil // write status. This will not block because firstEventSentCh is nil
actual.Write([]byte("foo")) actual.Write([]byte("foo"))
@ -139,7 +139,7 @@ func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) {
func TestDecorateResponseWriterChannel(t *testing.T) { func TestDecorateResponseWriterChannel(t *testing.T) {
sink := &fakeAuditSink{} sink := &fakeAuditSink{}
ev := &auditinternal.Event{} ev := &auditinternal.Event{}
actual := decorateResponseWriter(context.TODO(), &simpleResponseWriter{}, ev, sink, nil) actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, ev, sink, nil)
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {

View File

@ -47,7 +47,7 @@ func WithAuthentication(handler http.Handler, auth authenticator.Request, failed
req = req.WithContext(authenticator.WithAudiences(req.Context(), apiAuds)) req = req.WithContext(authenticator.WithAudiences(req.Context(), apiAuds))
} }
resp, ok, err := auth.AuthenticateRequest(req) resp, ok, err := auth.AuthenticateRequest(req)
defer recordAuthMetrics(resp, ok, err, apiAuds, authenticationStart) defer recordAuthMetrics(req.Context(), resp, ok, err, apiAuds, authenticationStart)
if err != nil || !ok { if err != nil || !ok {
if err != nil { if err != nil {
klog.ErrorS(err, "Unable to authenticate the request") klog.ErrorS(err, "Unable to authenticate the request")

View File

@ -17,6 +17,7 @@ limitations under the License.
package filters package filters
import ( import (
"context"
"strings" "strings"
"time" "time"
@ -75,7 +76,7 @@ func init() {
legacyregistry.MustRegister(authenticationLatency) legacyregistry.MustRegister(authenticationLatency)
} }
func recordAuthMetrics(resp *authenticator.Response, ok bool, err error, apiAudiences authenticator.Audiences, authStart time.Time) { func recordAuthMetrics(ctx context.Context, resp *authenticator.Response, ok bool, err error, apiAudiences authenticator.Audiences, authStart time.Time) {
var resultLabel string var resultLabel string
switch { switch {
@ -85,11 +86,11 @@ func recordAuthMetrics(resp *authenticator.Response, ok bool, err error, apiAudi
resultLabel = failureLabel resultLabel = failureLabel
default: default:
resultLabel = successLabel resultLabel = successLabel
authenticatedUserCounter.WithLabelValues(compressUsername(resp.User.GetName())).Inc() authenticatedUserCounter.WithContext(ctx).WithLabelValues(compressUsername(resp.User.GetName())).Inc()
} }
authenticatedAttemptsCounter.WithLabelValues(resultLabel).Inc() authenticatedAttemptsCounter.WithContext(ctx).WithLabelValues(resultLabel).Inc()
authenticationLatency.WithLabelValues(resultLabel).Observe(time.Since(authStart).Seconds()) authenticationLatency.WithContext(ctx).WithLabelValues(resultLabel).Observe(time.Since(authStart).Seconds())
} }
// compressUsername maps all possible usernames onto a small set of categories // compressUsername maps all possible usernames onto a small set of categories

View File

@ -126,7 +126,7 @@ func withFailedRequestAudit(failedHandler http.Handler, statusErr *apierrors.Sta
ev.ResponseStatus.Message = statusErr.Error() ev.ResponseStatus.Message = statusErr.Error()
} }
rw := decorateResponseWriter(w, ev, sink, omitStages) rw := decorateResponseWriter(req.Context(), w, ev, sink, omitStages)
failedHandler.ServeHTTP(rw, req) failedHandler.ServeHTTP(rw, req)
}) })
} }

View File

@ -163,8 +163,8 @@ type WatchServer struct {
// or over a websocket connection. // or over a websocket connection.
func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
kind := s.Scope.Kind kind := s.Scope.Kind
metrics.RegisteredWatchers.WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc() metrics.RegisteredWatchers.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
defer metrics.RegisteredWatchers.WithLabelValues(kind.Group, kind.Version, kind.Kind).Dec() defer metrics.RegisteredWatchers.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Dec()
w = httplog.Unlogged(req, w) w = httplog.Unlogged(req, w)
@ -220,7 +220,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// End of results. // End of results.
return return
} }
metrics.WatchEvents.WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc() metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
obj := s.Fixup(event.Object) obj := s.Fixup(event.Object)
if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil { if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
@ -233,7 +233,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// type // type
unknown.Raw = buf.Bytes() unknown.Raw = buf.Bytes()
event.Object = &unknown event.Object = &unknown
metrics.WatchEventsSizes.WithLabelValues(kind.Group, kind.Version, kind.Kind).Observe(float64(len(unknown.Raw))) metrics.WatchEventsSizes.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Observe(float64(len(unknown.Raw)))
*outEvent = metav1.WatchEvent{} *outEvent = metav1.WatchEvent{}

View File

@ -18,6 +18,7 @@ package metrics
import ( import (
"bufio" "bufio"
"context"
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
@ -324,8 +325,8 @@ func UpdateInflightRequestMetrics(phase string, nonmutating, mutating int) {
} }
} }
func RecordFilterLatency(name string, elapsed time.Duration) { func RecordFilterLatency(ctx context.Context, name string, elapsed time.Duration) {
requestFilterDuration.WithLabelValues(name).Observe(elapsed.Seconds()) requestFilterDuration.WithContext(ctx).WithLabelValues(name).Observe(elapsed.Seconds())
} }
// RecordRequestAbort records that the request was aborted possibly due to a timeout. // RecordRequestAbort records that the request was aborted possibly due to a timeout.
@ -341,7 +342,7 @@ func RecordRequestAbort(req *http.Request, requestInfo *request.RequestInfo) {
group := requestInfo.APIGroup group := requestInfo.APIGroup
version := requestInfo.APIVersion version := requestInfo.APIVersion
requestAbortsTotal.WithLabelValues(reportedVerb, group, version, resource, subresource, scope).Inc() requestAbortsTotal.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope).Inc()
} }
// RecordRequestTermination records that the request was terminated early as part of a resource // RecordRequestTermination records that the request was terminated early as part of a resource
@ -361,9 +362,9 @@ func RecordRequestTermination(req *http.Request, requestInfo *request.RequestInf
reportedVerb := cleanVerb(canonicalVerb(strings.ToUpper(req.Method), scope), req) reportedVerb := cleanVerb(canonicalVerb(strings.ToUpper(req.Method), scope), req)
if requestInfo.IsResourceRequest { if requestInfo.IsResourceRequest {
requestTerminationsTotal.WithLabelValues(reportedVerb, requestInfo.APIGroup, requestInfo.APIVersion, requestInfo.Resource, requestInfo.Subresource, scope, component, codeToString(code)).Inc() requestTerminationsTotal.WithContext(req.Context()).WithLabelValues(reportedVerb, requestInfo.APIGroup, requestInfo.APIVersion, requestInfo.Resource, requestInfo.Subresource, scope, component, codeToString(code)).Inc()
} else { } else {
requestTerminationsTotal.WithLabelValues(reportedVerb, "", "", "", requestInfo.Path, scope, component, codeToString(code)).Inc() requestTerminationsTotal.WithContext(req.Context()).WithLabelValues(reportedVerb, "", "", "", requestInfo.Path, scope, component, codeToString(code)).Inc()
} }
} }
@ -383,9 +384,9 @@ func RecordLongRunning(req *http.Request, requestInfo *request.RequestInfo, comp
reportedVerb := cleanVerb(canonicalVerb(strings.ToUpper(req.Method), scope), req) reportedVerb := cleanVerb(canonicalVerb(strings.ToUpper(req.Method), scope), req)
if requestInfo.IsResourceRequest { if requestInfo.IsResourceRequest {
g = longRunningRequestGauge.WithLabelValues(reportedVerb, requestInfo.APIGroup, requestInfo.APIVersion, requestInfo.Resource, requestInfo.Subresource, scope, component) g = longRunningRequestGauge.WithContext(req.Context()).WithLabelValues(reportedVerb, requestInfo.APIGroup, requestInfo.APIVersion, requestInfo.Resource, requestInfo.Subresource, scope, component)
} else { } else {
g = longRunningRequestGauge.WithLabelValues(reportedVerb, "", "", "", requestInfo.Path, scope, component) g = longRunningRequestGauge.WithContext(req.Context()).WithLabelValues(reportedVerb, "", "", "", requestInfo.Path, scope, component)
} }
g.Inc() g.Inc()
defer g.Dec() defer g.Dec()
@ -404,23 +405,23 @@ func MonitorRequest(req *http.Request, verb, group, version, resource, subresour
dryRun := cleanDryRun(req.URL) dryRun := cleanDryRun(req.URL)
elapsedSeconds := elapsed.Seconds() elapsedSeconds := elapsed.Seconds()
cleanContentType := cleanContentType(contentType) cleanContentType := cleanContentType(contentType)
requestCounter.WithLabelValues(reportedVerb, dryRun, group, version, resource, subresource, scope, component, cleanContentType, codeToString(httpCode)).Inc() requestCounter.WithContext(req.Context()).WithLabelValues(reportedVerb, dryRun, group, version, resource, subresource, scope, component, cleanContentType, codeToString(httpCode)).Inc()
// MonitorRequest happens after authentication, so we can trust the username given by the request // MonitorRequest happens after authentication, so we can trust the username given by the request
info, ok := request.UserFrom(req.Context()) info, ok := request.UserFrom(req.Context())
if ok && info.GetName() == user.APIServerUser { if ok && info.GetName() == user.APIServerUser {
apiSelfRequestCounter.WithLabelValues(reportedVerb, resource, subresource).Inc() apiSelfRequestCounter.WithContext(req.Context()).WithLabelValues(reportedVerb, resource, subresource).Inc()
} }
if deprecated { if deprecated {
deprecatedRequestGauge.WithLabelValues(group, version, resource, subresource, removedRelease).Set(1) deprecatedRequestGauge.WithContext(req.Context()).WithLabelValues(group, version, resource, subresource, removedRelease).Set(1)
audit.AddAuditAnnotation(req.Context(), deprecatedAnnotationKey, "true") audit.AddAuditAnnotation(req.Context(), deprecatedAnnotationKey, "true")
if len(removedRelease) > 0 { if len(removedRelease) > 0 {
audit.AddAuditAnnotation(req.Context(), removedReleaseAnnotationKey, removedRelease) audit.AddAuditAnnotation(req.Context(), removedReleaseAnnotationKey, removedRelease)
} }
} }
requestLatencies.WithLabelValues(reportedVerb, dryRun, group, version, resource, subresource, scope, component).Observe(elapsedSeconds) requestLatencies.WithContext(req.Context()).WithLabelValues(reportedVerb, dryRun, group, version, resource, subresource, scope, component).Observe(elapsedSeconds)
// We are only interested in response sizes of read requests. // We are only interested in response sizes of read requests.
if verb == "GET" || verb == "LIST" { if verb == "GET" || verb == "LIST" {
responseSizes.WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(float64(respSize)) responseSizes.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(float64(respSize))
} }
} }

View File

@ -196,9 +196,9 @@ func WithMaxInFlightLimit(
} }
// We need to split this data between buckets used for throttling. // We need to split this data between buckets used for throttling.
if isMutatingRequest { if isMutatingRequest {
metrics.DroppedRequests.WithLabelValues(metrics.MutatingKind).Inc() metrics.DroppedRequests.WithContext(ctx).WithLabelValues(metrics.MutatingKind).Inc()
} else { } else {
metrics.DroppedRequests.WithLabelValues(metrics.ReadOnlyKind).Inc() metrics.DroppedRequests.WithContext(ctx).WithLabelValues(metrics.ReadOnlyKind).Inc()
} }
metrics.RecordRequestTermination(r, requestInfo, metrics.APIServerComponent, http.StatusTooManyRequests) metrics.RecordRequestTermination(r, requestInfo, metrics.APIServerComponent, http.StatusTooManyRequests)
tooManyRequests(r, w) tooManyRequests(r, w)

View File

@ -141,9 +141,9 @@ func WithPriorityAndFairness(
}, execute) }, execute)
if !served { if !served {
if isMutatingRequest { if isMutatingRequest {
epmetrics.DroppedRequests.WithLabelValues(epmetrics.MutatingKind).Inc() epmetrics.DroppedRequests.WithContext(ctx).WithLabelValues(epmetrics.MutatingKind).Inc()
} else { } else {
epmetrics.DroppedRequests.WithLabelValues(epmetrics.ReadOnlyKind).Inc() epmetrics.DroppedRequests.WithContext(ctx).WithLabelValues(epmetrics.ReadOnlyKind).Inc()
} }
epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests) epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests)
tooManyRequests(r, w) tooManyRequests(r, w)