From b77c8198f002f9a9c7bdca11d28cac1710bbb185 Mon Sep 17 00:00:00 2001 From: "Tim St. Clair" Date: Wed, 31 May 2017 13:38:33 -0700 Subject: [PATCH] Instrument advanced auditing --- staging/src/k8s.io/apiserver/pkg/audit/BUILD | 3 + .../src/k8s.io/apiserver/pkg/audit/format.go | 73 ++++++++++++++++ .../src/k8s.io/apiserver/pkg/audit/metrics.go | 87 +++++++++++++++++++ .../apiserver/pkg/endpoints/filters/audit.go | 16 ++-- .../apiserver/plugin/pkg/audit/log/BUILD | 1 - .../apiserver/plugin/pkg/audit/log/backend.go | 41 +-------- .../plugin/pkg/audit/webhook/webhook.go | 13 ++- 7 files changed, 186 insertions(+), 48 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/audit/format.go create mode 100644 staging/src/k8s.io/apiserver/pkg/audit/metrics.go diff --git a/staging/src/k8s.io/apiserver/pkg/audit/BUILD b/staging/src/k8s.io/apiserver/pkg/audit/BUILD index 4e67f429935..7a644aea223 100644 --- a/staging/src/k8s.io/apiserver/pkg/audit/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/audit/BUILD @@ -11,6 +11,8 @@ load( go_library( name = "go_default_library", srcs = [ + "format.go", + "metrics.go", "request.go", "scheme.go", "types.go", @@ -20,6 +22,7 @@ go_library( deps = [ "//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/pborman/uuid:go_default_library", + "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/audit/format.go b/staging/src/k8s.io/apiserver/pkg/audit/format.go new file mode 100644 index 00000000000..79c6ae7f04c --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/audit/format.go @@ -0,0 +1,73 @@ +/* +Copyright 2017 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package audit + +import ( + "fmt" + "strconv" + "strings" + "time" + + auditinternal "k8s.io/apiserver/pkg/apis/audit" +) + +// EventString creates a 1-line text representation of an audit event, using a subset of the +// information in the event struct. +func EventString(ev *auditinternal.Event) string { + username := "" + groups := "" + if len(ev.User.Username) > 0 { + username = ev.User.Username + if len(ev.User.Groups) > 0 { + groups = auditStringSlice(ev.User.Groups) + } + } + asuser := "" + asgroups := "" + if ev.ImpersonatedUser != nil { + asuser = ev.ImpersonatedUser.Username + if ev.ImpersonatedUser.Groups != nil { + asgroups = auditStringSlice(ev.ImpersonatedUser.Groups) + } + } + + namespace := "" + if ev.ObjectRef != nil && len(ev.ObjectRef.Namespace) != 0 { + namespace = ev.ObjectRef.Namespace + } + + response := "" + if ev.ResponseStatus != nil { + response = strconv.Itoa(int(ev.ResponseStatus.Code)) + } + + ip := "" + if len(ev.SourceIPs) > 0 { + ip = ev.SourceIPs[0] + } + + return fmt.Sprintf("%s AUDIT: id=%q stage=%q ip=%q method=%q user=%q groups=%q as=%q asgroups=%q namespace=%q uri=%q response=\"%s\"\n", + ev.Timestamp.Format(time.RFC3339Nano), ev.AuditID, ev.Stage, ip, ev.Verb, username, groups, asuser, asgroups, namespace, ev.RequestURI, response) +} + +func auditStringSlice(inList []string) string { + quotedElements := make([]string, len(inList)) + for i, in := range inList { + quotedElements[i] = 
fmt.Sprintf("%q", in) + } + return strings.Join(quotedElements, ",") +} diff --git a/staging/src/k8s.io/apiserver/pkg/audit/metrics.go b/staging/src/k8s.io/apiserver/pkg/audit/metrics.go new file mode 100644 index 00000000000..1701332fab6 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/audit/metrics.go @@ -0,0 +1,87 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package audit + +import ( + "fmt" + + "github.com/golang/glog" + "github.com/prometheus/client_golang/prometheus" + auditinternal "k8s.io/apiserver/pkg/apis/audit" +) + +const ( + subsystem = "apiserver_audit" +) + +var ( + eventCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Subsystem: subsystem, + Name: "event_count", + Help: "Counter of audit events generated and sent to the audit backend.", + }) + errorCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: subsystem, + Name: "error_count", + Help: "Counter of audit events that failed to be audited properly. 
" + + "Plugin identifies the plugin affected by the error.", + }, + []string{"plugin"}, + ) + levelCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: subsystem, + Name: "level_count", + Help: "Counter of policy levels for audit events (1 per request).", + }, + []string{"level"}, + ) +) + +func init() { + prometheus.MustRegister(eventCounter) + prometheus.MustRegister(errorCounter) + prometheus.MustRegister(levelCounter) +} + +// ObserveEvent updates the relevant prometheus metrics for the generated audit event. +func ObserveEvent() { + eventCounter.Inc() +} + +// ObservePolicyLevel updates the relevant prometheus metrics with the audit level for a request. +func ObservePolicyLevel(level auditinternal.Level) { + levelCounter.WithLabelValues(string(level)).Inc() +} + +// HandlePluginError handles an error that occurred in an audit plugin. This method should only be +// used if the error may have prevented the audit event from being properly recorded. The events are +// logged to the debug log and are not modified. +func HandlePluginError(plugin string, err error, impacted ...*auditinternal.Event) { + // Count the error. + errorCounter.WithLabelValues(plugin).Add(float64(len(impacted))) + + // Log the audit events to the debug log. + msg := fmt.Sprintf("Error in audit plugin '%s' affecting %d audit events: %v\nImpacted events:\n", + plugin, len(impacted), err) + for _, ev := range impacted { + msg = msg + EventString(ev) + "\n" + } + glog.Error(msg) +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit.go index 02b32a00745..441a38a6116 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit.go @@ -68,6 +68,7 @@ func WithAudit(handler http.Handler, requestContextMapper request.RequestContext } level := policy.Level(attribs) + audit.ObservePolicyLevel(level) if level == auditinternal.LevelNone { // Don't audit. 
handler.ServeHTTP(w, req) @@ -89,7 +90,7 @@ func WithAudit(handler http.Handler, requestContextMapper request.RequestContext } ev.Stage = auditinternal.StageRequestReceived - sink.ProcessEvents(ev) + processEvent(sink, ev) // intercept the status code var longRunningSink audit.Sink @@ -113,7 +114,7 @@ func WithAudit(handler http.Handler, requestContextMapper request.RequestContext Reason: metav1.StatusReasonInternalError, Message: fmt.Sprintf("APIServer panic'd: %v", r), } - sink.ProcessEvents(ev) + processEvent(sink, ev) return } @@ -126,19 +127,24 @@ func WithAudit(handler http.Handler, requestContextMapper request.RequestContext if ev.ResponseStatus == nil && longRunningSink != nil { ev.ResponseStatus = fakedSuccessStatus ev.Stage = auditinternal.StageResponseStarted - longRunningSink.ProcessEvents(ev) + processEvent(longRunningSink, ev) } ev.Stage = auditinternal.StageResponseComplete if ev.ResponseStatus == nil { ev.ResponseStatus = fakedSuccessStatus } - sink.ProcessEvents(ev) + processEvent(sink, ev) }() handler.ServeHTTP(respWriter, req) }) } +func processEvent(sink audit.Sink, ev *auditinternal.Event) { + audit.ObserveEvent() + sink.ProcessEvents(ev) +} + func decorateResponseWriter(responseWriter http.ResponseWriter, ev *auditinternal.Event, sink audit.Sink) http.ResponseWriter { delegate := &auditResponseWriter{ ResponseWriter: responseWriter, @@ -177,7 +183,7 @@ func (a *auditResponseWriter) processCode(code int) { a.event.Stage = auditinternal.StageResponseStarted if a.sink != nil { - a.sink.ProcessEvents(a.event) + processEvent(a.sink, a.event) } }) } diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/BUILD b/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/BUILD index 97d92e33cf8..82005d6ab32 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/BUILD +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/BUILD @@ -12,7 +12,6 @@ go_library( srcs = ["backend.go"], tags = ["automanaged"], deps = [ - 
"//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library", "//vendor/k8s.io/apiserver/pkg/audit:go_default_library", ], diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/backend.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/backend.go index 3712d728c7a..702599785ef 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/backend.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/backend.go @@ -19,11 +19,7 @@ package log import ( "fmt" "io" - "strconv" "strings" - "time" - - "github.com/golang/glog" auditinternal "k8s.io/apiserver/pkg/apis/audit" "k8s.io/apiserver/pkg/audit" @@ -50,42 +46,9 @@ func (b *backend) ProcessEvents(events ...*auditinternal.Event) { } func (b *backend) logEvent(ev *auditinternal.Event) { - username := "" - groups := "" - if len(ev.User.Username) > 0 { - username = ev.User.Username - if len(ev.User.Groups) > 0 { - groups = auditStringSlice(ev.User.Groups) - } - } - asuser := "" - asgroups := "" - if ev.ImpersonatedUser != nil { - asuser = ev.ImpersonatedUser.Username - if ev.ImpersonatedUser.Groups != nil { - asgroups = auditStringSlice(ev.ImpersonatedUser.Groups) - } - } - - namespace := "" - if ev.ObjectRef != nil && len(ev.ObjectRef.Namespace) != 0 { - namespace = ev.ObjectRef.Namespace - } - - response := "" - if ev.ResponseStatus != nil { - response = strconv.Itoa(int(ev.ResponseStatus.Code)) - } - - ip := "" - if len(ev.SourceIPs) > 0 { - ip = ev.SourceIPs[0] - } - - line := fmt.Sprintf("%s AUDIT: id=%q stage=%q ip=%q method=%q user=%q groups=%q as=%q asgroups=%q namespace=%q uri=%q response=\"%s\"\n", - ev.Timestamp.Format(time.RFC3339Nano), ev.AuditID, ev.Stage, ip, ev.Verb, username, groups, asuser, asgroups, namespace, ev.RequestURI, response) + line := audit.EventString(ev) if _, err := fmt.Fprint(b.out, line); err != nil { - glog.Errorf("Unable to write audit log: %s, the error is: %v", line, err) + audit.HandlePluginError("log", err, 
ev) } } diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go index 7dfc1b7158a..aad5de01675 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go @@ -65,6 +65,9 @@ const ( defaultBatchMaxWait = time.Minute // Send events at least once a minute. ) +// The plugin name reported in error metrics. +const pluginName = "webhook" + // NewBackend returns an audit backend that sends events over HTTP to an external service. // The mode indicates the caching behavior of the webhook. Either blocking (ModeBlocking) // or buffered with batch POSTs (ModeBatch). @@ -119,7 +122,7 @@ func (b *blockingBackend) Run(stopCh <-chan struct{}) error { func (b *blockingBackend) ProcessEvents(ev ...*auditinternal.Event) { if err := b.processEvents(ev...); err != nil { - glog.Errorf("failed to POST webhook events: %v", err) + audit.HandlePluginError(pluginName, err, ev...) } } @@ -259,7 +262,11 @@ L: return b.w.RestClient.Post().Body(&list).Do().Error() }) if err != nil { - glog.Errorf("failed to POST webhook events: %v", err) + impacted := make([]*auditinternal.Event, len(events)) + for i := range events { + impacted[i] = &events[i] + } + audit.HandlePluginError(pluginName, err, impacted...) } }() return @@ -278,7 +285,7 @@ func (b *batchBackend) ProcessEvents(ev ...*auditinternal.Event) { select { case b.buffer <- event: default: - glog.Errorf("audit webhook queue blocked, failed to send %d event(s)", len(ev)-i) + audit.HandlePluginError(pluginName, fmt.Errorf("audit webhook queue blocked"), ev[i:]...) return } }