diff --git a/staging/src/k8s.io/apiserver/pkg/audit/metrics.go b/staging/src/k8s.io/apiserver/pkg/audit/metrics.go index 96166e65454..dc1eb5a7f15 100644 --- a/staging/src/k8s.io/apiserver/pkg/audit/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/audit/metrics.go @@ -17,6 +17,7 @@ limitations under the License. package audit import ( + "context" "fmt" auditinternal "k8s.io/apiserver/pkg/apis/audit" @@ -84,13 +85,13 @@ func init() { } // ObserveEvent updates the relevant prometheus metrics for the generated audit event. -func ObserveEvent() { - eventCounter.Inc() +func ObserveEvent(ctx context.Context) { + eventCounter.WithContext(ctx).Inc() } // ObservePolicyLevel updates the relevant prometheus metrics with the audit level for a request. -func ObservePolicyLevel(level auditinternal.Level) { - levelCounter.WithLabelValues(string(level)).Inc() +func ObservePolicyLevel(ctx context.Context, level auditinternal.Level) { + levelCounter.WithContext(ctx).WithLabelValues(string(level)).Inc() } // HandlePluginError handles an error that occurred in an audit plugin. This method should only be diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit.go index 891d6093549..2f78ff1de64 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit.go @@ -18,6 +18,7 @@ package filters import ( "bufio" + "context" "errors" "fmt" "net" @@ -56,8 +57,8 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon } ev.Stage = auditinternal.StageRequestReceived - if processed := processAuditEvent(sink, ev, omitStages); !processed { - audit.ApiserverAuditDroppedCounter.Inc() + if processed := processAuditEvent(ctx, sink, ev, omitStages); !processed { + audit.ApiserverAuditDroppedCounter.WithContext(ctx).Inc() responsewriters.InternalError(w, req, errors.New("failed to store audit event")) return } @@ -70,7 +71,7 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon longRunningSink = sink } } - respWriter := decorateResponseWriter(w, ev, longRunningSink, omitStages) + respWriter := decorateResponseWriter(ctx, w, ev, longRunningSink, omitStages) // send audit event when we leave this func, either via a panic or cleanly. In the case of long // running requests, this will be the second audit event. @@ -84,7 +85,7 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon Reason: metav1.StatusReasonInternalError, Message: fmt.Sprintf("APIServer panic'd: %v", r), } - processAuditEvent(sink, ev, omitStages) + processAuditEvent(ctx, sink, ev, omitStages) return } @@ -98,14 +99,14 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon if ev.ResponseStatus == nil && longRunningSink != nil { ev.ResponseStatus = fakedSuccessStatus ev.Stage = auditinternal.StageResponseStarted - processAuditEvent(longRunningSink, ev, omitStages) + processAuditEvent(ctx, longRunningSink, ev, omitStages) } ev.Stage = auditinternal.StageResponseComplete if ev.ResponseStatus == nil { ev.ResponseStatus = fakedSuccessStatus } - processAuditEvent(sink, ev, omitStages) + processAuditEvent(ctx, sink, ev, omitStages) }() handler.ServeHTTP(respWriter, req) }) @@ -125,7 +126,7 @@ func createAuditEventAndAttachToContext(req *http.Request, policy policy.Checker } level, omitStages := policy.LevelAndStages(attribs) - audit.ObservePolicyLevel(level) + audit.ObservePolicyLevel(ctx, level) if level == auditinternal.LevelNone { // Don't audit. return req, nil, nil, nil @@ -145,7 +146,7 @@ func createAuditEventAndAttachToContext(req *http.Request, policy policy.Checker return req, ev, omitStages, nil } -func processAuditEvent(sink audit.Sink, ev *auditinternal.Event, omitStages []auditinternal.Stage) bool { +func processAuditEvent(ctx context.Context, sink audit.Sink, ev *auditinternal.Event, omitStages []auditinternal.Stage) bool { for _, stage := range omitStages { if ev.Stage == stage { return true @@ -157,12 +158,13 @@ func processAuditEvent(sink audit.Sink, ev *auditinternal.Event, omitStages []au } else { ev.StageTimestamp = metav1.NewMicroTime(time.Now()) } - audit.ObserveEvent() + audit.ObserveEvent(ctx) return sink.ProcessEvents(ev) } -func decorateResponseWriter(responseWriter http.ResponseWriter, ev *auditinternal.Event, sink audit.Sink, omitStages []auditinternal.Stage) http.ResponseWriter { +func decorateResponseWriter(ctx context.Context, responseWriter http.ResponseWriter, ev *auditinternal.Event, sink audit.Sink, omitStages []auditinternal.Stage) http.ResponseWriter { delegate := &auditResponseWriter{ + ctx: ctx, ResponseWriter: responseWriter, event: ev, sink: sink, @@ -186,6 +188,7 @@ var _ http.ResponseWriter = &auditResponseWriter{} // create immediately an event (for long running requests). type auditResponseWriter struct { http.ResponseWriter + ctx context.Context event *auditinternal.Event once sync.Once sink audit.Sink @@ -205,7 +208,7 @@ func (a *auditResponseWriter) processCode(code int) { a.event.Stage = auditinternal.StageResponseStarted if a.sink != nil { - processAuditEvent(a.sink, a.event, a.omitStages) + processAuditEvent(a.ctx, a.sink, a.event, a.omitStages) } }) } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit_test.go index e885644f8bd..d2bb0facc6e 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit_test.go @@ -18,6 +18,7 @@ package filters import ( "bufio" + "context" "net" "net/http" "net/http/httptest" @@ -92,14 +93,14 @@ func (*fancyResponseWriter) Flush() {} func (*fancyResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { return nil, nil, nil } func TestConstructResponseWriter(t *testing.T) { - actual := decorateResponseWriter(&simpleResponseWriter{}, nil, nil, nil) + actual := decorateResponseWriter(context.TODO(), &simpleResponseWriter{}, nil, nil, nil) switch v := actual.(type) { case *auditResponseWriter: default: t.Errorf("Expected auditResponseWriter, got %v", reflect.TypeOf(v)) } - actual = decorateResponseWriter(&fancyResponseWriter{}, nil, nil, nil) + actual = decorateResponseWriter(context.TODO(), &fancyResponseWriter{}, nil, nil, nil) switch v := actual.(type) { case *fancyResponseWriterDelegator: default: @@ -109,7 +110,7 @@ func TestConstructResponseWriter(t *testing.T) { func TestDecorateResponseWriterWithoutChannel(t *testing.T) { ev := &auditinternal.Event{} - actual := decorateResponseWriter(&simpleResponseWriter{}, ev, nil, nil) + actual := decorateResponseWriter(context.TODO(), &simpleResponseWriter{}, ev, nil, nil) // write status. This will not block because firstEventSentCh is nil actual.WriteHeader(42) @@ -123,7 +124,7 @@ func TestDecorateResponseWriterWithoutChannel(t *testing.T) { func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) { ev := &auditinternal.Event{} - actual := decorateResponseWriter(&simpleResponseWriter{}, ev, nil, nil) + actual := decorateResponseWriter(context.TODO(), &simpleResponseWriter{}, ev, nil, nil) // write status. This will not block because firstEventSentCh is nil actual.Write([]byte("foo")) @@ -138,7 +139,7 @@ func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) { func TestDecorateResponseWriterChannel(t *testing.T) { sink := &fakeAuditSink{} ev := &auditinternal.Event{} - actual := decorateResponseWriter(&simpleResponseWriter{}, ev, sink, nil) + actual := decorateResponseWriter(context.TODO(), &simpleResponseWriter{}, ev, sink, nil) done := make(chan struct{}) go func() { diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/authn_audit.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/authn_audit.go index 09d7db8cc90..2de13f74706 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/authn_audit.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/authn_audit.go @@ -52,7 +52,7 @@ func WithFailedAuthenticationAudit(failedHandler http.Handler, sink audit.Sink, ev.ResponseStatus.Message = getAuthMethods(req) ev.Stage = auditinternal.StageResponseStarted - rw := decorateResponseWriter(w, ev, sink, omitStages) + rw := decorateResponseWriter(req.Context(), w, ev, sink, omitStages) failedHandler.ServeHTTP(rw, req) }) }