add context to metric in apiserver/audit

This commit is contained in:
yoyinzyc 2021-01-20 12:04:41 -08:00
parent f384925847
commit 4ba3f1a982
4 changed files with 26 additions and 21 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package audit package audit
import ( import (
"context"
"fmt" "fmt"
auditinternal "k8s.io/apiserver/pkg/apis/audit" auditinternal "k8s.io/apiserver/pkg/apis/audit"
@ -84,13 +85,13 @@ func init() {
} }
// ObserveEvent updates the relevant prometheus metrics for the generated audit event. // ObserveEvent updates the relevant prometheus metrics for the generated audit event.
func ObserveEvent() { func ObserveEvent(ctx context.Context) {
eventCounter.Inc() eventCounter.WithContext(ctx).Inc()
} }
// ObservePolicyLevel updates the relevant prometheus metrics with the audit level for a request. // ObservePolicyLevel updates the relevant prometheus metrics with the audit level for a request.
func ObservePolicyLevel(level auditinternal.Level) { func ObservePolicyLevel(ctx context.Context, level auditinternal.Level) {
levelCounter.WithLabelValues(string(level)).Inc() levelCounter.WithContext(ctx).WithLabelValues(string(level)).Inc()
} }
// HandlePluginError handles an error that occurred in an audit plugin. This method should only be // HandlePluginError handles an error that occurred in an audit plugin. This method should only be

View File

@ -18,6 +18,7 @@ package filters
import ( import (
"bufio" "bufio"
"context"
"errors" "errors"
"fmt" "fmt"
"net" "net"
@ -56,8 +57,8 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon
} }
ev.Stage = auditinternal.StageRequestReceived ev.Stage = auditinternal.StageRequestReceived
if processed := processAuditEvent(sink, ev, omitStages); !processed { if processed := processAuditEvent(ctx, sink, ev, omitStages); !processed {
audit.ApiserverAuditDroppedCounter.Inc() audit.ApiserverAuditDroppedCounter.WithContext(ctx).Inc()
responsewriters.InternalError(w, req, errors.New("failed to store audit event")) responsewriters.InternalError(w, req, errors.New("failed to store audit event"))
return return
} }
@ -70,7 +71,7 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon
longRunningSink = sink longRunningSink = sink
} }
} }
respWriter := decorateResponseWriter(w, ev, longRunningSink, omitStages) respWriter := decorateResponseWriter(ctx, w, ev, longRunningSink, omitStages)
// send audit event when we leave this func, either via a panic or cleanly. In the case of long // send audit event when we leave this func, either via a panic or cleanly. In the case of long
// running requests, this will be the second audit event. // running requests, this will be the second audit event.
@ -84,7 +85,7 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon
Reason: metav1.StatusReasonInternalError, Reason: metav1.StatusReasonInternalError,
Message: fmt.Sprintf("APIServer panic'd: %v", r), Message: fmt.Sprintf("APIServer panic'd: %v", r),
} }
processAuditEvent(sink, ev, omitStages) processAuditEvent(ctx, sink, ev, omitStages)
return return
} }
@ -98,14 +99,14 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon
if ev.ResponseStatus == nil && longRunningSink != nil { if ev.ResponseStatus == nil && longRunningSink != nil {
ev.ResponseStatus = fakedSuccessStatus ev.ResponseStatus = fakedSuccessStatus
ev.Stage = auditinternal.StageResponseStarted ev.Stage = auditinternal.StageResponseStarted
processAuditEvent(longRunningSink, ev, omitStages) processAuditEvent(ctx, longRunningSink, ev, omitStages)
} }
ev.Stage = auditinternal.StageResponseComplete ev.Stage = auditinternal.StageResponseComplete
if ev.ResponseStatus == nil { if ev.ResponseStatus == nil {
ev.ResponseStatus = fakedSuccessStatus ev.ResponseStatus = fakedSuccessStatus
} }
processAuditEvent(sink, ev, omitStages) processAuditEvent(ctx, sink, ev, omitStages)
}() }()
handler.ServeHTTP(respWriter, req) handler.ServeHTTP(respWriter, req)
}) })
@ -125,7 +126,7 @@ func createAuditEventAndAttachToContext(req *http.Request, policy policy.Checker
} }
level, omitStages := policy.LevelAndStages(attribs) level, omitStages := policy.LevelAndStages(attribs)
audit.ObservePolicyLevel(level) audit.ObservePolicyLevel(ctx, level)
if level == auditinternal.LevelNone { if level == auditinternal.LevelNone {
// Don't audit. // Don't audit.
return req, nil, nil, nil return req, nil, nil, nil
@ -145,7 +146,7 @@ func createAuditEventAndAttachToContext(req *http.Request, policy policy.Checker
return req, ev, omitStages, nil return req, ev, omitStages, nil
} }
func processAuditEvent(sink audit.Sink, ev *auditinternal.Event, omitStages []auditinternal.Stage) bool { func processAuditEvent(ctx context.Context, sink audit.Sink, ev *auditinternal.Event, omitStages []auditinternal.Stage) bool {
for _, stage := range omitStages { for _, stage := range omitStages {
if ev.Stage == stage { if ev.Stage == stage {
return true return true
@ -157,12 +158,13 @@ func processAuditEvent(sink audit.Sink, ev *auditinternal.Event, omitStages []au
} else { } else {
ev.StageTimestamp = metav1.NewMicroTime(time.Now()) ev.StageTimestamp = metav1.NewMicroTime(time.Now())
} }
audit.ObserveEvent() audit.ObserveEvent(ctx)
return sink.ProcessEvents(ev) return sink.ProcessEvents(ev)
} }
func decorateResponseWriter(responseWriter http.ResponseWriter, ev *auditinternal.Event, sink audit.Sink, omitStages []auditinternal.Stage) http.ResponseWriter { func decorateResponseWriter(ctx context.Context, responseWriter http.ResponseWriter, ev *auditinternal.Event, sink audit.Sink, omitStages []auditinternal.Stage) http.ResponseWriter {
delegate := &auditResponseWriter{ delegate := &auditResponseWriter{
ctx: ctx,
ResponseWriter: responseWriter, ResponseWriter: responseWriter,
event: ev, event: ev,
sink: sink, sink: sink,
@ -186,6 +188,7 @@ var _ http.ResponseWriter = &auditResponseWriter{}
// create immediately an event (for long running requests). // create immediately an event (for long running requests).
type auditResponseWriter struct { type auditResponseWriter struct {
http.ResponseWriter http.ResponseWriter
ctx context.Context
event *auditinternal.Event event *auditinternal.Event
once sync.Once once sync.Once
sink audit.Sink sink audit.Sink
@ -205,7 +208,7 @@ func (a *auditResponseWriter) processCode(code int) {
a.event.Stage = auditinternal.StageResponseStarted a.event.Stage = auditinternal.StageResponseStarted
if a.sink != nil { if a.sink != nil {
processAuditEvent(a.sink, a.event, a.omitStages) processAuditEvent(a.ctx, a.sink, a.event, a.omitStages)
} }
}) })
} }

View File

@ -18,6 +18,7 @@ package filters
import ( import (
"bufio" "bufio"
"context"
"net" "net"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
@ -92,14 +93,14 @@ func (*fancyResponseWriter) Flush() {}
func (*fancyResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { return nil, nil, nil } func (*fancyResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { return nil, nil, nil }
func TestConstructResponseWriter(t *testing.T) { func TestConstructResponseWriter(t *testing.T) {
actual := decorateResponseWriter(&simpleResponseWriter{}, nil, nil, nil) actual := decorateResponseWriter(context.TODO(), &simpleResponseWriter{}, nil, nil, nil)
switch v := actual.(type) { switch v := actual.(type) {
case *auditResponseWriter: case *auditResponseWriter:
default: default:
t.Errorf("Expected auditResponseWriter, got %v", reflect.TypeOf(v)) t.Errorf("Expected auditResponseWriter, got %v", reflect.TypeOf(v))
} }
actual = decorateResponseWriter(&fancyResponseWriter{}, nil, nil, nil) actual = decorateResponseWriter(context.TODO(), &fancyResponseWriter{}, nil, nil, nil)
switch v := actual.(type) { switch v := actual.(type) {
case *fancyResponseWriterDelegator: case *fancyResponseWriterDelegator:
default: default:
@ -109,7 +110,7 @@ func TestConstructResponseWriter(t *testing.T) {
func TestDecorateResponseWriterWithoutChannel(t *testing.T) { func TestDecorateResponseWriterWithoutChannel(t *testing.T) {
ev := &auditinternal.Event{} ev := &auditinternal.Event{}
actual := decorateResponseWriter(&simpleResponseWriter{}, ev, nil, nil) actual := decorateResponseWriter(context.TODO(), &simpleResponseWriter{}, ev, nil, nil)
// write status. This will not block because firstEventSentCh is nil // write status. This will not block because firstEventSentCh is nil
actual.WriteHeader(42) actual.WriteHeader(42)
@ -123,7 +124,7 @@ func TestDecorateResponseWriterWithoutChannel(t *testing.T) {
func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) { func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) {
ev := &auditinternal.Event{} ev := &auditinternal.Event{}
actual := decorateResponseWriter(&simpleResponseWriter{}, ev, nil, nil) actual := decorateResponseWriter(context.TODO(), &simpleResponseWriter{}, ev, nil, nil)
// write status. This will not block because firstEventSentCh is nil // write status. This will not block because firstEventSentCh is nil
actual.Write([]byte("foo")) actual.Write([]byte("foo"))
@ -138,7 +139,7 @@ func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) {
func TestDecorateResponseWriterChannel(t *testing.T) { func TestDecorateResponseWriterChannel(t *testing.T) {
sink := &fakeAuditSink{} sink := &fakeAuditSink{}
ev := &auditinternal.Event{} ev := &auditinternal.Event{}
actual := decorateResponseWriter(&simpleResponseWriter{}, ev, sink, nil) actual := decorateResponseWriter(context.TODO(), &simpleResponseWriter{}, ev, sink, nil)
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {

View File

@ -52,7 +52,7 @@ func WithFailedAuthenticationAudit(failedHandler http.Handler, sink audit.Sink,
ev.ResponseStatus.Message = getAuthMethods(req) ev.ResponseStatus.Message = getAuthMethods(req)
ev.Stage = auditinternal.StageResponseStarted ev.Stage = auditinternal.StageResponseStarted
rw := decorateResponseWriter(w, ev, sink, omitStages) rw := decorateResponseWriter(req.Context(), w, ev, sink, omitStages)
failedHandler.ServeHTTP(rw, req) failedHandler.ServeHTTP(rw, req)
}) })
} }