diff --git a/staging/src/k8s.io/apiserver/pkg/audit/metrics.go b/staging/src/k8s.io/apiserver/pkg/audit/metrics.go index 46b480eeaf5..9b81b30cc26 100644 --- a/staging/src/k8s.io/apiserver/pkg/audit/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/audit/metrics.go @@ -52,12 +52,22 @@ var ( }, []string{"level"}, ) + + ApiserverAuditDroppedCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Subsystem: subsystem, + Name: "requests_rejected_total", + Help: "Counter of apiserver requests rejected due to an error " + + "in audit logging backend.", + }, + ) ) func init() { prometheus.MustRegister(eventCounter) prometheus.MustRegister(errorCounter) prometheus.MustRegister(levelCounter) + prometheus.MustRegister(ApiserverAuditDroppedCounter) } // ObserveEvent updates the relevant prometheus metrics for the generated audit event. diff --git a/staging/src/k8s.io/apiserver/pkg/audit/types.go b/staging/src/k8s.io/apiserver/pkg/audit/types.go index dbf03b0f51c..b78bd086b05 100644 --- a/staging/src/k8s.io/apiserver/pkg/audit/types.go +++ b/staging/src/k8s.io/apiserver/pkg/audit/types.go @@ -25,7 +25,8 @@ type Sink interface { // Errors might be logged by the sink itself. If an error should be fatal, leading to an internal // error, ProcessEvents is supposed to panic. The event must not be mutated and is reused by the caller // after the call returns, i.e. the sink has to make a deepcopy to keep a copy around if necessary. - ProcessEvents(events ...*auditinternal.Event) + // Returns true on success, may return false on error. + ProcessEvents(events ...*auditinternal.Event) bool } type Backend interface { diff --git a/staging/src/k8s.io/apiserver/pkg/audit/union.go b/staging/src/k8s.io/apiserver/pkg/audit/union.go index 6ee441533a7..39dd74f740e 100644 --- a/staging/src/k8s.io/apiserver/pkg/audit/union.go +++ b/staging/src/k8s.io/apiserver/pkg/audit/union.go @@ -37,10 +37,12 @@ type union struct { backends []Backend } -func (u union) ProcessEvents(events ...*auditinternal.Event) { +func (u union) ProcessEvents(events ...*auditinternal.Event) bool { + success := true for _, backend := range u.backends { - backend.ProcessEvents(events...) + success = backend.ProcessEvents(events...) && success } + return success } func (u union) Run(stopCh <-chan struct{}) error { diff --git a/staging/src/k8s.io/apiserver/pkg/audit/union_test.go b/staging/src/k8s.io/apiserver/pkg/audit/union_test.go index 3d474a4743c..4ab1de0c814 100644 --- a/staging/src/k8s.io/apiserver/pkg/audit/union_test.go +++ b/staging/src/k8s.io/apiserver/pkg/audit/union_test.go @@ -28,8 +28,9 @@ type fakeBackend struct { events []*auditinternal.Event } -func (f *fakeBackend) ProcessEvents(events ...*auditinternal.Event) { +func (f *fakeBackend) ProcessEvents(events ...*auditinternal.Event) bool { f.events = append(f.events, events...) + return true } func (f *fakeBackend) Run(stopCh <-chan struct{}) error { diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/audit_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/audit_test.go index e0c2de2d439..0ebff74e538 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/audit_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/audit_test.go @@ -39,13 +39,14 @@ type fakeAuditSink struct { events []*auditinternal.Event } -func (s *fakeAuditSink) ProcessEvents(evs ...*auditinternal.Event) { +func (s *fakeAuditSink) ProcessEvents(evs ...*auditinternal.Event) bool { s.lock.Lock() defer s.lock.Unlock() for _, ev := range evs { e := ev.DeepCopy() s.events = append(s.events, e) } + return true } func (s *fakeAuditSink) Events() []*auditinternal.Event { diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit.go index 49463410789..458c8a67c92 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit.go @@ -56,7 +56,11 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon } ev.Stage = auditinternal.StageRequestReceived - processAuditEvent(sink, ev, omitStages) + if processed := processAuditEvent(sink, ev, omitStages); !processed { + audit.ApiserverAuditDroppedCounter.Inc() + responsewriters.InternalError(w, req, errors.New("failed to store audit event")) + return + } // intercept the status code var longRunningSink audit.Sink @@ -137,10 +141,10 @@ func createAuditEventAndAttachToContext(req *http.Request, policy policy.Checker return req, ev, omitStages, nil } -func processAuditEvent(sink audit.Sink, ev *auditinternal.Event, omitStages []auditinternal.Stage) { +func processAuditEvent(sink audit.Sink, ev *auditinternal.Event, omitStages []auditinternal.Stage) bool { for _, stage := range omitStages { if ev.Stage == stage { - return + return true } } @@ -150,7 +154,7 @@ func processAuditEvent(sink audit.Sink, ev *auditinternal.Event, omitStages []au ev.StageTimestamp = metav1.NewMicroTime(time.Now()) } audit.ObserveEvent() - sink.ProcessEvents(ev) + return sink.ProcessEvents(ev) } func decorateResponseWriter(responseWriter http.ResponseWriter, ev *auditinternal.Event, sink audit.Sink, omitStages []auditinternal.Stage) http.ResponseWriter { diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit_test.go index df721dbb65f..3d3edd68f8f 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit_test.go @@ -42,13 +42,14 @@ type fakeAuditSink struct { events []*auditinternal.Event } -func (s *fakeAuditSink) ProcessEvents(evs ...*auditinternal.Event) { +func (s *fakeAuditSink) ProcessEvents(evs ...*auditinternal.Event) bool { s.lock.Lock() defer s.lock.Unlock() for _, e := range evs { event := e.DeepCopy() s.events = append(s.events, event) } + return true } func (s *fakeAuditSink) Events() []*auditinternal.Event { diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/BUILD b/staging/src/k8s.io/apiserver/pkg/server/options/BUILD index 4125ba1343b..87318a1f053 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/options/BUILD @@ -42,6 +42,7 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/apiserver/v1alpha1:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/audit/v1:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/audit.go b/staging/src/k8s.io/apiserver/pkg/server/options/audit.go index 240f0a7cbf7..f579399cf8e 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/audit.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/audit.go @@ -29,6 +29,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/schema" + auditinternal "k8s.io/apiserver/pkg/apis/audit" auditv1 "k8s.io/apiserver/pkg/apis/audit/v1" auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1" auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1" @@ -89,12 +90,17 @@ const ( // a set of events. This causes requests to the API server to wait for the // flush before sending a response. ModeBlocking = "blocking" + // ModeBlockingStrict is the same as ModeBlocking, except when there is + // a failure during audit logging at RequestReceived stage, the whole + // request to apiserver will fail. + ModeBlockingStrict = "blocking-strict" ) // AllowedModes is the modes known for audit backends. var AllowedModes = []string{ ModeBatch, ModeBlocking, + ModeBlockingStrict, } type AuditBatchOptions struct { @@ -393,10 +399,26 @@ func (o *AuditBatchOptions) AddFlags(pluginName string, fs *pflag.FlagSet) { "moment if ThrottleQPS was not utilized before. Only used in batch mode.") } +type ignoreErrorsBackend struct { + audit.Backend +} + +func (i *ignoreErrorsBackend) ProcessEvents(ev ...*auditinternal.Event) bool { + i.Backend.ProcessEvents(ev...) + return true +} + +func (i *ignoreErrorsBackend) String() string { + return fmt.Sprintf("ignoreErrors<%s>", i.Backend) +} + func (o *AuditBatchOptions) wrapBackend(delegate audit.Backend) audit.Backend { - if o.Mode == ModeBlocking { + if o.Mode == ModeBlockingStrict { return delegate } + if o.Mode == ModeBlocking { + return &ignoreErrorsBackend{Backend: delegate} + } return pluginbuffered.NewBackend(delegate, o.BatchConfig) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/audit_test.go b/staging/src/k8s.io/apiserver/pkg/server/options/audit_test.go index b1014ef54b0..7137c521b10 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/audit_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/audit_test.go @@ -67,7 +67,7 @@ func TestAuditValidOptions(t *testing.T) { o.PolicyFile = policy return o }, - expected: "log", + expected: "ignoreErrors", }, { name: "default log no policy", options: func() *AuditOptions { @@ -93,6 +93,16 @@ func TestAuditValidOptions(t *testing.T) { return o }, expected: "", + }, { + name: "strict webhook", + options: func() *AuditOptions { + o := NewAuditOptions() + o.WebhookOptions.ConfigFile = webhookConfig + o.WebhookOptions.BatchOptions.Mode = ModeBlockingStrict + o.PolicyFile = policy + return o + }, + expected: "webhook", }, { name: "default union", options: func() *AuditOptions { @@ -102,7 +112,7 @@ func TestAuditValidOptions(t *testing.T) { o.PolicyFile = policy return o }, - expected: "union[log,buffered]", + expected: "union[ignoreErrors,buffered]", }, { name: "custom", options: func() *AuditOptions { @@ -114,7 +124,7 @@ func TestAuditValidOptions(t *testing.T) { o.PolicyFile = policy return o }, - expected: "union[buffered,webhook]", + expected: "union[buffered,ignoreErrors]", }, { name: "default webhook with truncating", options: func() *AuditOptions { @@ -151,7 +161,7 @@ func TestAuditValidOptions(t *testing.T) { o.PolicyFile = policy return o }, - expected: "union[enforced,dynamic[]]", + expected: "union[enforced>,dynamic[]]", }, { name: "dynamic with truncating and webhook", options: func() *AuditOptions { @@ -174,7 +184,7 @@ func TestAuditValidOptions(t *testing.T) { o.LogOptions.Path = "/audit" return o }, - expected: "union[enforced,truncate>,dynamic[]]>]", + expected: "union[enforced>,truncate>,dynamic[]]>]", }, } for _, tc := range testCases { diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered.go index 66165915fcc..a96d9bea30f 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered.go @@ -251,7 +251,7 @@ func (b *bufferedBackend) processEvents(events []*auditinternal.Event) { } } -func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) { +func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) bool { // The following mechanism is in place to support the situation when audit // events are still coming after the backend was stopped. var sendErr error @@ -279,9 +279,10 @@ func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) { case b.buffer <- event: default: sendErr = fmt.Errorf("audit buffer queue blocked") - return + return true } } + return true } func (b *bufferedBackend) String() string { diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic.go index a8347aa3e72..393a205e164 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic.go @@ -176,10 +176,13 @@ func (s syncedDelegates) Names() []string { } // ProcessEvents proccesses the given events per current delegate map -func (b *backend) ProcessEvents(events ...*auditinternal.Event) { +func (b *backend) ProcessEvents(events ...*auditinternal.Event) bool { for _, d := range b.GetDelegates() { d.ProcessEvents(events...) } + // Returning true regardless of results, since dynamic audit backends + // can never cause apiserver request to fail. + return true } // Run starts a goroutine that propagates the shutdown signal, diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced/enforced.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced/enforced.go index 1263a59ce61..8feb523bedf 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced/enforced.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced/enforced.go @@ -56,7 +56,7 @@ func (b Backend) Shutdown() { // ProcessEvents enforces policy on a shallow copy of the given event // dropping any sections that don't conform -func (b Backend) ProcessEvents(events ...*auditinternal.Event) { +func (b Backend) ProcessEvents(events ...*auditinternal.Event) bool { for _, event := range events { if event == nil { continue @@ -82,6 +82,9 @@ func (b Backend) ProcessEvents(events ...*auditinternal.Event) { } b.delegateBackend.ProcessEvents(e) } + // Returning true regardless of results, since dynamic audit backends + // can never cause apiserver request to fail. + return true } // String returns a string representation of the backend diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/fake/fake.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/fake/fake.go index a886529ec68..53e56ddf418 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/fake/fake.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/fake/fake.go @@ -39,10 +39,11 @@ func (b *Backend) Shutdown() { } // ProcessEvents calls a callback on a batch, if present. -func (b *Backend) ProcessEvents(ev ...*auditinternal.Event) { +func (b *Backend) ProcessEvents(ev ...*auditinternal.Event) bool { if b.OnRequest != nil { b.OnRequest(ev) } + return true } func (b *Backend) String() string { diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/backend.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/backend.go index 293cdd3c9f7..e1c948f62ae 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/backend.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/backend.go @@ -59,13 +59,15 @@ func NewBackend(out io.Writer, format string, groupVersion schema.GroupVersion) } } -func (b *backend) ProcessEvents(events ...*auditinternal.Event) { +func (b *backend) ProcessEvents(events ...*auditinternal.Event) bool { + success := true for _, ev := range events { - b.logEvent(ev) + success = b.logEvent(ev) && success } + return success } -func (b *backend) logEvent(ev *auditinternal.Event) { +func (b *backend) logEvent(ev *auditinternal.Event) bool { line := "" switch b.format { case FormatLegacy: @@ -74,17 +76,19 @@ func (b *backend) logEvent(ev *auditinternal.Event) { bs, err := runtime.Encode(audit.Codecs.LegacyCodec(b.groupVersion), ev) if err != nil { audit.HandlePluginError(PluginName, err, ev) - return + return false } line = string(bs[:]) default: audit.HandlePluginError(PluginName, fmt.Errorf("log format %q is not in list of known formats (%s)", b.format, strings.Join(AllowedFormats, ",")), ev) - return + return false } if _, err := fmt.Fprint(b.out, line); err != nil { audit.HandlePluginError(PluginName, err, ev) + return false } + return true } func (b *backend) Run(stopCh <-chan struct{}) error { diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate/truncate.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate/truncate.go index e06f1f2f057..de1c2d9f74b 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate/truncate.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate/truncate.go @@ -71,11 +71,12 @@ func NewBackend(delegateBackend audit.Backend, config Config, groupVersion schem } } -func (b *backend) ProcessEvents(events ...*auditinternal.Event) { +func (b *backend) ProcessEvents(events ...*auditinternal.Event) bool { var errors []error var impacted []*auditinternal.Event var batch []*auditinternal.Event var batchSize int64 + success := true for _, event := range events { size, err := b.calcSize(event) // If event was correctly serialized, but the size is more than allowed @@ -97,7 +98,7 @@ func (b *backend) ProcessEvents(events ...*auditinternal.Event) { } if len(batch) > 0 && batchSize+size > b.c.MaxBatchSize { - b.delegateBackend.ProcessEvents(batch...) + success = b.delegateBackend.ProcessEvents(batch...) && success batch = []*auditinternal.Event{} batchSize = 0 } @@ -107,12 +108,13 @@ func (b *backend) ProcessEvents(events ...*auditinternal.Event) { } if len(batch) > 0 { - b.delegateBackend.ProcessEvents(batch...) + success = b.delegateBackend.ProcessEvents(batch...) && success } if len(impacted) > 0 { audit.HandlePluginError(PluginName, utilerrors.NewAggregate(errors), impacted...) } + return success } // truncate removed request and response objects from the audit events, diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go index 5f5f7169dc3..9b44e8e85d7 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go @@ -81,10 +81,12 @@ func (b *backend) Shutdown() { // nothing to do here } -func (b *backend) ProcessEvents(ev ...*auditinternal.Event) { +func (b *backend) ProcessEvents(ev ...*auditinternal.Event) bool { if err := b.processEvents(ev...); err != nil { audit.HandlePluginError(b.String(), err, ev...) + return false } + return true } func (b *backend) processEvents(ev ...*auditinternal.Event) error { diff --git a/test/integration/examples/webhook_test.go b/test/integration/examples/webhook_test.go index eb4d04fd30b..4d538043078 100644 --- a/test/integration/examples/webhook_test.go +++ b/test/integration/examples/webhook_test.go @@ -108,8 +108,9 @@ func (f auditChecker) LevelAndStages(attrs authorizer.Attributes) (auditinternal type auditSinkFunc func(events ...*auditinternal.Event) -func (f auditSinkFunc) ProcessEvents(events ...*auditinternal.Event) { +func (f auditSinkFunc) ProcessEvents(events ...*auditinternal.Event) bool { f(events...) + return true } func (auditSinkFunc) Run(stopCh <-chan struct{}) error {