Instrument advanced auditing

This commit is contained in:
Tim St. Clair 2017-05-31 13:38:33 -07:00
parent 68dd748ba1
commit b77c8198f0
No known key found for this signature in database
GPG Key ID: 434D16BCEF479EAB
7 changed files with 186 additions and 48 deletions

View File

@ -11,6 +11,8 @@ load(
go_library(
name = "go_default_library",
srcs = [
"format.go",
"metrics.go",
"request.go",
"scheme.go",
"types.go",
@ -20,6 +22,7 @@ go_library(
deps = [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/pborman/uuid:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",

View File

@ -0,0 +1,73 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package audit
import (
"fmt"
"strconv"
"strings"
"time"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
)
// EventString creates a 1-line text representation of an audit event, using a subset of the
// information in the event struct.
func EventString(ev *auditinternal.Event) string {
username := "<none>"
groups := "<none>"
if len(ev.User.Username) > 0 {
username = ev.User.Username
if len(ev.User.Groups) > 0 {
groups = auditStringSlice(ev.User.Groups)
}
}
asuser := "<self>"
asgroups := "<lookup>"
if ev.ImpersonatedUser != nil {
asuser = ev.ImpersonatedUser.Username
if ev.ImpersonatedUser.Groups != nil {
asgroups = auditStringSlice(ev.ImpersonatedUser.Groups)
}
}
namespace := "<none>"
if ev.ObjectRef != nil && len(ev.ObjectRef.Namespace) != 0 {
namespace = ev.ObjectRef.Namespace
}
response := "<deferred>"
if ev.ResponseStatus != nil {
response = strconv.Itoa(int(ev.ResponseStatus.Code))
}
ip := "<unknown>"
if len(ev.SourceIPs) > 0 {
ip = ev.SourceIPs[0]
}
return fmt.Sprintf("%s AUDIT: id=%q stage=%q ip=%q method=%q user=%q groups=%q as=%q asgroups=%q namespace=%q uri=%q response=\"%s\"\n",
ev.Timestamp.Format(time.RFC3339Nano), ev.AuditID, ev.Stage, ip, ev.Verb, username, groups, asuser, asgroups, namespace, ev.RequestURI, response)
}
func auditStringSlice(inList []string) string {
quotedElements := make([]string, len(inList))
for i, in := range inList {
quotedElements[i] = fmt.Sprintf("%q", in)
}
return strings.Join(quotedElements, ",")
}

View File

@ -0,0 +1,87 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package audit
import (
"fmt"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
)
const (
subsystem = "apiserver_audit"
)
var (
eventCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Subsystem: subsystem,
Name: "event_count",
Help: "Counter of audit events generated and sent to the audit backend.",
})
errorCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: subsystem,
Name: "error_count",
Help: "Counter of audit events that failed to be audited properly. " +
"Plugin identifies the plugin affected by the error.",
},
[]string{"plugin"},
)
levelCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: subsystem,
Name: "level_count",
Help: "Counter of policy levels for audit events (1 per request).",
},
[]string{"level"},
)
)
func init() {
prometheus.MustRegister(eventCounter)
prometheus.MustRegister(errorCounter)
prometheus.MustRegister(levelCounter)
}
// ObserveEvent updates the relevant prometheus metrics for the generated audit event.
func ObserveEvent() {
eventCounter.Inc()
}
// ObservePolicyLevel updates the relevant prometheus metrics with the audit level for a request.
func ObservePolicyLevel(level auditinternal.Level) {
levelCounter.WithLabelValues(string(level)).Inc()
}
// HandlePluginError handles an error that occurred in an audit plugin. This method should only be
// used if the error may have prevented the audit event from being properly recorded. The events are
// modified.
func HandlePluginError(plugin string, err error, impacted ...*auditinternal.Event) {
// Count the error.
errorCounter.WithLabelValues(plugin).Add(float64(len(impacted)))
// Log the audit events to the debug log.
msg := fmt.Sprintf("Error in audit plugin '%s' affecting %d audit events: %v\nImpacted events:\n",
plugin, len(impacted), err)
for _, ev := range impacted {
msg = msg + EventString(ev) + "\n"
}
glog.Error(msg)
}

View File

@ -68,6 +68,7 @@ func WithAudit(handler http.Handler, requestContextMapper request.RequestContext
}
level := policy.Level(attribs)
audit.ObservePolicyLevel(level)
if level == auditinternal.LevelNone {
// Don't audit.
handler.ServeHTTP(w, req)
@ -89,7 +90,7 @@ func WithAudit(handler http.Handler, requestContextMapper request.RequestContext
}
ev.Stage = auditinternal.StageRequestReceived
sink.ProcessEvents(ev)
processEvent(sink, ev)
// intercept the status code
var longRunningSink audit.Sink
@ -113,7 +114,7 @@ func WithAudit(handler http.Handler, requestContextMapper request.RequestContext
Reason: metav1.StatusReasonInternalError,
Message: fmt.Sprintf("APIServer panic'd: %v", r),
}
sink.ProcessEvents(ev)
processEvent(sink, ev)
return
}
@ -126,19 +127,24 @@ func WithAudit(handler http.Handler, requestContextMapper request.RequestContext
if ev.ResponseStatus == nil && longRunningSink != nil {
ev.ResponseStatus = fakedSuccessStatus
ev.Stage = auditinternal.StageResponseStarted
longRunningSink.ProcessEvents(ev)
processEvent(longRunningSink, ev)
}
ev.Stage = auditinternal.StageResponseComplete
if ev.ResponseStatus == nil {
ev.ResponseStatus = fakedSuccessStatus
}
sink.ProcessEvents(ev)
processEvent(sink, ev)
}()
handler.ServeHTTP(respWriter, req)
})
}
func processEvent(sink audit.Sink, ev *auditinternal.Event) {
audit.ObserveEvent()
sink.ProcessEvents(ev)
}
func decorateResponseWriter(responseWriter http.ResponseWriter, ev *auditinternal.Event, sink audit.Sink) http.ResponseWriter {
delegate := &auditResponseWriter{
ResponseWriter: responseWriter,
@ -177,7 +183,7 @@ func (a *auditResponseWriter) processCode(code int) {
a.event.Stage = auditinternal.StageResponseStarted
if a.sink != nil {
a.sink.ProcessEvents(a.event)
processEvent(a.sink, a.event)
}
})
}

View File

@ -12,7 +12,6 @@ go_library(
srcs = ["backend.go"],
tags = ["automanaged"],
deps = [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
],

View File

@ -19,11 +19,7 @@ package log
import (
"fmt"
"io"
"strconv"
"strings"
"time"
"github.com/golang/glog"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
@ -50,42 +46,9 @@ func (b *backend) ProcessEvents(events ...*auditinternal.Event) {
}
func (b *backend) logEvent(ev *auditinternal.Event) {
username := "<none>"
groups := "<none>"
if len(ev.User.Username) > 0 {
username = ev.User.Username
if len(ev.User.Groups) > 0 {
groups = auditStringSlice(ev.User.Groups)
}
}
asuser := "<self>"
asgroups := "<lookup>"
if ev.ImpersonatedUser != nil {
asuser = ev.ImpersonatedUser.Username
if ev.ImpersonatedUser.Groups != nil {
asgroups = auditStringSlice(ev.ImpersonatedUser.Groups)
}
}
namespace := "<none>"
if ev.ObjectRef != nil && len(ev.ObjectRef.Namespace) != 0 {
namespace = ev.ObjectRef.Namespace
}
response := "<deferred>"
if ev.ResponseStatus != nil {
response = strconv.Itoa(int(ev.ResponseStatus.Code))
}
ip := "<unknown>"
if len(ev.SourceIPs) > 0 {
ip = ev.SourceIPs[0]
}
line := fmt.Sprintf("%s AUDIT: id=%q stage=%q ip=%q method=%q user=%q groups=%q as=%q asgroups=%q namespace=%q uri=%q response=\"%s\"\n",
ev.Timestamp.Format(time.RFC3339Nano), ev.AuditID, ev.Stage, ip, ev.Verb, username, groups, asuser, asgroups, namespace, ev.RequestURI, response)
line := audit.EventString(ev)
if _, err := fmt.Fprint(b.out, line); err != nil {
glog.Errorf("Unable to write audit log: %s, the error is: %v", line, err)
audit.HandlePluginError("log", err, ev)
}
}

View File

@ -65,6 +65,9 @@ const (
defaultBatchMaxWait = time.Minute // Send events at least once a minute.
)
// The plugin name reported in error metrics.
const pluginName = "webhook"
// NewBackend returns an audit backend that sends events over HTTP to an external service.
// The mode indicates the caching behavior of the webhook. Either blocking (ModeBlocking)
// or buffered with batch POSTs (ModeBatch).
@ -119,7 +122,7 @@ func (b *blockingBackend) Run(stopCh <-chan struct{}) error {
func (b *blockingBackend) ProcessEvents(ev ...*auditinternal.Event) {
if err := b.processEvents(ev...); err != nil {
glog.Errorf("failed to POST webhook events: %v", err)
audit.HandlePluginError(pluginName, err, ev...)
}
}
@ -259,7 +262,11 @@ L:
return b.w.RestClient.Post().Body(&list).Do().Error()
})
if err != nil {
glog.Errorf("failed to POST webhook events: %v", err)
impacted := make([]*auditinternal.Event, len(events))
for i := range events {
impacted[i] = &events[i]
}
audit.HandlePluginError(pluginName, err, impacted...)
}
}()
return
@ -278,7 +285,7 @@ func (b *batchBackend) ProcessEvents(ev ...*auditinternal.Event) {
select {
case b.buffer <- event:
default:
glog.Errorf("audit webhook queue blocked, failed to send %d event(s)", len(ev)-i)
audit.HandlePluginError(pluginName, fmt.Errorf("audit webhook queue blocked"), ev[i:]...)
return
}
}