mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-20 18:31:15 +00:00
Add option to k8s apiserver to reject incoming requests upon audit failure
This commit is contained in:
parent
b1a52a38e9
commit
7a10f4eda7
@ -52,12 +52,22 @@ var (
|
|||||||
},
|
},
|
||||||
[]string{"level"},
|
[]string{"level"},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
ApiserverAuditDroppedCounter = prometheus.NewCounter(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "requests_rejected_total",
|
||||||
|
Help: "Counter of apiserver requests rejected due to an error " +
|
||||||
|
"in audit logging backend.",
|
||||||
|
},
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
prometheus.MustRegister(eventCounter)
|
prometheus.MustRegister(eventCounter)
|
||||||
prometheus.MustRegister(errorCounter)
|
prometheus.MustRegister(errorCounter)
|
||||||
prometheus.MustRegister(levelCounter)
|
prometheus.MustRegister(levelCounter)
|
||||||
|
prometheus.MustRegister(ApiserverAuditDroppedCounter)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObserveEvent updates the relevant prometheus metrics for the generated audit event.
|
// ObserveEvent updates the relevant prometheus metrics for the generated audit event.
|
||||||
|
@ -25,7 +25,8 @@ type Sink interface {
|
|||||||
// Errors might be logged by the sink itself. If an error should be fatal, leading to an internal
|
// Errors might be logged by the sink itself. If an error should be fatal, leading to an internal
|
||||||
// error, ProcessEvents is supposed to panic. The event must not be mutated and is reused by the caller
|
// error, ProcessEvents is supposed to panic. The event must not be mutated and is reused by the caller
|
||||||
// after the call returns, i.e. the sink has to make a deepcopy to keep a copy around if necessary.
|
// after the call returns, i.e. the sink has to make a deepcopy to keep a copy around if necessary.
|
||||||
ProcessEvents(events ...*auditinternal.Event)
|
// Returns true on success, may return false on error.
|
||||||
|
ProcessEvents(events ...*auditinternal.Event) bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type Backend interface {
|
type Backend interface {
|
||||||
|
@ -37,10 +37,12 @@ type union struct {
|
|||||||
backends []Backend
|
backends []Backend
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u union) ProcessEvents(events ...*auditinternal.Event) {
|
func (u union) ProcessEvents(events ...*auditinternal.Event) bool {
|
||||||
|
success := true
|
||||||
for _, backend := range u.backends {
|
for _, backend := range u.backends {
|
||||||
backend.ProcessEvents(events...)
|
success = backend.ProcessEvents(events...) && success
|
||||||
}
|
}
|
||||||
|
return success
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u union) Run(stopCh <-chan struct{}) error {
|
func (u union) Run(stopCh <-chan struct{}) error {
|
||||||
|
@ -28,8 +28,9 @@ type fakeBackend struct {
|
|||||||
events []*auditinternal.Event
|
events []*auditinternal.Event
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *fakeBackend) ProcessEvents(events ...*auditinternal.Event) {
|
func (f *fakeBackend) ProcessEvents(events ...*auditinternal.Event) bool {
|
||||||
f.events = append(f.events, events...)
|
f.events = append(f.events, events...)
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *fakeBackend) Run(stopCh <-chan struct{}) error {
|
func (f *fakeBackend) Run(stopCh <-chan struct{}) error {
|
||||||
|
@ -39,13 +39,14 @@ type fakeAuditSink struct {
|
|||||||
events []*auditinternal.Event
|
events []*auditinternal.Event
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *fakeAuditSink) ProcessEvents(evs ...*auditinternal.Event) {
|
func (s *fakeAuditSink) ProcessEvents(evs ...*auditinternal.Event) bool {
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
defer s.lock.Unlock()
|
defer s.lock.Unlock()
|
||||||
for _, ev := range evs {
|
for _, ev := range evs {
|
||||||
e := ev.DeepCopy()
|
e := ev.DeepCopy()
|
||||||
s.events = append(s.events, e)
|
s.events = append(s.events, e)
|
||||||
}
|
}
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *fakeAuditSink) Events() []*auditinternal.Event {
|
func (s *fakeAuditSink) Events() []*auditinternal.Event {
|
||||||
|
@ -56,7 +56,11 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon
|
|||||||
}
|
}
|
||||||
|
|
||||||
ev.Stage = auditinternal.StageRequestReceived
|
ev.Stage = auditinternal.StageRequestReceived
|
||||||
processAuditEvent(sink, ev, omitStages)
|
if processed := processAuditEvent(sink, ev, omitStages); !processed {
|
||||||
|
audit.ApiserverAuditDroppedCounter.Inc()
|
||||||
|
responsewriters.InternalError(w, req, errors.New("failed to store audit event"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// intercept the status code
|
// intercept the status code
|
||||||
var longRunningSink audit.Sink
|
var longRunningSink audit.Sink
|
||||||
@ -137,10 +141,10 @@ func createAuditEventAndAttachToContext(req *http.Request, policy policy.Checker
|
|||||||
return req, ev, omitStages, nil
|
return req, ev, omitStages, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func processAuditEvent(sink audit.Sink, ev *auditinternal.Event, omitStages []auditinternal.Stage) {
|
func processAuditEvent(sink audit.Sink, ev *auditinternal.Event, omitStages []auditinternal.Stage) bool {
|
||||||
for _, stage := range omitStages {
|
for _, stage := range omitStages {
|
||||||
if ev.Stage == stage {
|
if ev.Stage == stage {
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -150,7 +154,7 @@ func processAuditEvent(sink audit.Sink, ev *auditinternal.Event, omitStages []au
|
|||||||
ev.StageTimestamp = metav1.NewMicroTime(time.Now())
|
ev.StageTimestamp = metav1.NewMicroTime(time.Now())
|
||||||
}
|
}
|
||||||
audit.ObserveEvent()
|
audit.ObserveEvent()
|
||||||
sink.ProcessEvents(ev)
|
return sink.ProcessEvents(ev)
|
||||||
}
|
}
|
||||||
|
|
||||||
func decorateResponseWriter(responseWriter http.ResponseWriter, ev *auditinternal.Event, sink audit.Sink, omitStages []auditinternal.Stage) http.ResponseWriter {
|
func decorateResponseWriter(responseWriter http.ResponseWriter, ev *auditinternal.Event, sink audit.Sink, omitStages []auditinternal.Stage) http.ResponseWriter {
|
||||||
|
@ -42,13 +42,14 @@ type fakeAuditSink struct {
|
|||||||
events []*auditinternal.Event
|
events []*auditinternal.Event
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *fakeAuditSink) ProcessEvents(evs ...*auditinternal.Event) {
|
func (s *fakeAuditSink) ProcessEvents(evs ...*auditinternal.Event) bool {
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
defer s.lock.Unlock()
|
defer s.lock.Unlock()
|
||||||
for _, e := range evs {
|
for _, e := range evs {
|
||||||
event := e.DeepCopy()
|
event := e.DeepCopy()
|
||||||
s.events = append(s.events, event)
|
s.events = append(s.events, event)
|
||||||
}
|
}
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *fakeAuditSink) Events() []*auditinternal.Event {
|
func (s *fakeAuditSink) Events() []*auditinternal.Event {
|
||||||
|
@ -42,6 +42,7 @@ go_library(
|
|||||||
"//staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver/v1alpha1:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver/v1alpha1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/apis/audit/v1:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/apis/audit/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library",
|
||||||
|
@ -29,6 +29,7 @@ import (
|
|||||||
|
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
auditinternal "k8s.io/apiserver/pkg/apis/audit"
|
||||||
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
|
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
|
||||||
auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1"
|
auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1"
|
||||||
auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1"
|
auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1"
|
||||||
@ -89,12 +90,17 @@ const (
|
|||||||
// a set of events. This causes requests to the API server to wait for the
|
// a set of events. This causes requests to the API server to wait for the
|
||||||
// flush before sending a response.
|
// flush before sending a response.
|
||||||
ModeBlocking = "blocking"
|
ModeBlocking = "blocking"
|
||||||
|
// ModeBlockingStrict is the same as ModeBlocking, except when there is
|
||||||
|
// a failure during audit logging at RequestReceived stage, the whole
|
||||||
|
// request to apiserver will fail.
|
||||||
|
ModeBlockingStrict = "blocking-strict"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AllowedModes is the modes known for audit backends.
|
// AllowedModes is the modes known for audit backends.
|
||||||
var AllowedModes = []string{
|
var AllowedModes = []string{
|
||||||
ModeBatch,
|
ModeBatch,
|
||||||
ModeBlocking,
|
ModeBlocking,
|
||||||
|
ModeBlockingStrict,
|
||||||
}
|
}
|
||||||
|
|
||||||
type AuditBatchOptions struct {
|
type AuditBatchOptions struct {
|
||||||
@ -393,10 +399,26 @@ func (o *AuditBatchOptions) AddFlags(pluginName string, fs *pflag.FlagSet) {
|
|||||||
"moment if ThrottleQPS was not utilized before. Only used in batch mode.")
|
"moment if ThrottleQPS was not utilized before. Only used in batch mode.")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ignoreErrorsBackend struct {
|
||||||
|
audit.Backend
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *ignoreErrorsBackend) ProcessEvents(ev ...*auditinternal.Event) bool {
|
||||||
|
i.Backend.ProcessEvents(ev...)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *ignoreErrorsBackend) String() string {
|
||||||
|
return fmt.Sprintf("ignoreErrors<%s>", i.Backend)
|
||||||
|
}
|
||||||
|
|
||||||
func (o *AuditBatchOptions) wrapBackend(delegate audit.Backend) audit.Backend {
|
func (o *AuditBatchOptions) wrapBackend(delegate audit.Backend) audit.Backend {
|
||||||
if o.Mode == ModeBlocking {
|
if o.Mode == ModeBlockingStrict {
|
||||||
return delegate
|
return delegate
|
||||||
}
|
}
|
||||||
|
if o.Mode == ModeBlocking {
|
||||||
|
return &ignoreErrorsBackend{Backend: delegate}
|
||||||
|
}
|
||||||
return pluginbuffered.NewBackend(delegate, o.BatchConfig)
|
return pluginbuffered.NewBackend(delegate, o.BatchConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,7 +67,7 @@ func TestAuditValidOptions(t *testing.T) {
|
|||||||
o.PolicyFile = policy
|
o.PolicyFile = policy
|
||||||
return o
|
return o
|
||||||
},
|
},
|
||||||
expected: "log",
|
expected: "ignoreErrors<log>",
|
||||||
}, {
|
}, {
|
||||||
name: "default log no policy",
|
name: "default log no policy",
|
||||||
options: func() *AuditOptions {
|
options: func() *AuditOptions {
|
||||||
@ -93,6 +93,16 @@ func TestAuditValidOptions(t *testing.T) {
|
|||||||
return o
|
return o
|
||||||
},
|
},
|
||||||
expected: "",
|
expected: "",
|
||||||
|
}, {
|
||||||
|
name: "strict webhook",
|
||||||
|
options: func() *AuditOptions {
|
||||||
|
o := NewAuditOptions()
|
||||||
|
o.WebhookOptions.ConfigFile = webhookConfig
|
||||||
|
o.WebhookOptions.BatchOptions.Mode = ModeBlockingStrict
|
||||||
|
o.PolicyFile = policy
|
||||||
|
return o
|
||||||
|
},
|
||||||
|
expected: "webhook",
|
||||||
}, {
|
}, {
|
||||||
name: "default union",
|
name: "default union",
|
||||||
options: func() *AuditOptions {
|
options: func() *AuditOptions {
|
||||||
@ -102,7 +112,7 @@ func TestAuditValidOptions(t *testing.T) {
|
|||||||
o.PolicyFile = policy
|
o.PolicyFile = policy
|
||||||
return o
|
return o
|
||||||
},
|
},
|
||||||
expected: "union[log,buffered<webhook>]",
|
expected: "union[ignoreErrors<log>,buffered<webhook>]",
|
||||||
}, {
|
}, {
|
||||||
name: "custom",
|
name: "custom",
|
||||||
options: func() *AuditOptions {
|
options: func() *AuditOptions {
|
||||||
@ -114,7 +124,7 @@ func TestAuditValidOptions(t *testing.T) {
|
|||||||
o.PolicyFile = policy
|
o.PolicyFile = policy
|
||||||
return o
|
return o
|
||||||
},
|
},
|
||||||
expected: "union[buffered<log>,webhook]",
|
expected: "union[buffered<log>,ignoreErrors<webhook>]",
|
||||||
}, {
|
}, {
|
||||||
name: "default webhook with truncating",
|
name: "default webhook with truncating",
|
||||||
options: func() *AuditOptions {
|
options: func() *AuditOptions {
|
||||||
@ -151,7 +161,7 @@ func TestAuditValidOptions(t *testing.T) {
|
|||||||
o.PolicyFile = policy
|
o.PolicyFile = policy
|
||||||
return o
|
return o
|
||||||
},
|
},
|
||||||
expected: "union[enforced<log>,dynamic[]]",
|
expected: "union[enforced<ignoreErrors<log>>,dynamic[]]",
|
||||||
}, {
|
}, {
|
||||||
name: "dynamic with truncating and webhook",
|
name: "dynamic with truncating and webhook",
|
||||||
options: func() *AuditOptions {
|
options: func() *AuditOptions {
|
||||||
@ -174,7 +184,7 @@ func TestAuditValidOptions(t *testing.T) {
|
|||||||
o.LogOptions.Path = "/audit"
|
o.LogOptions.Path = "/audit"
|
||||||
return o
|
return o
|
||||||
},
|
},
|
||||||
expected: "union[enforced<log>,truncate<union[enforced<buffered<webhook>>,dynamic[]]>]",
|
expected: "union[enforced<ignoreErrors<log>>,truncate<union[enforced<buffered<webhook>>,dynamic[]]>]",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
|
@ -251,7 +251,7 @@ func (b *bufferedBackend) processEvents(events []*auditinternal.Event) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) {
|
func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) bool {
|
||||||
// The following mechanism is in place to support the situation when audit
|
// The following mechanism is in place to support the situation when audit
|
||||||
// events are still coming after the backend was stopped.
|
// events are still coming after the backend was stopped.
|
||||||
var sendErr error
|
var sendErr error
|
||||||
@ -279,9 +279,10 @@ func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) {
|
|||||||
case b.buffer <- event:
|
case b.buffer <- event:
|
||||||
default:
|
default:
|
||||||
sendErr = fmt.Errorf("audit buffer queue blocked")
|
sendErr = fmt.Errorf("audit buffer queue blocked")
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *bufferedBackend) String() string {
|
func (b *bufferedBackend) String() string {
|
||||||
|
@ -176,10 +176,13 @@ func (s syncedDelegates) Names() []string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ProcessEvents proccesses the given events per current delegate map
|
// ProcessEvents proccesses the given events per current delegate map
|
||||||
func (b *backend) ProcessEvents(events ...*auditinternal.Event) {
|
func (b *backend) ProcessEvents(events ...*auditinternal.Event) bool {
|
||||||
for _, d := range b.GetDelegates() {
|
for _, d := range b.GetDelegates() {
|
||||||
d.ProcessEvents(events...)
|
d.ProcessEvents(events...)
|
||||||
}
|
}
|
||||||
|
// Returning true regardless of results, since dynamic audit backends
|
||||||
|
// can never cause apiserver request to fail.
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run starts a goroutine that propagates the shutdown signal,
|
// Run starts a goroutine that propagates the shutdown signal,
|
||||||
|
@ -56,7 +56,7 @@ func (b Backend) Shutdown() {
|
|||||||
|
|
||||||
// ProcessEvents enforces policy on a shallow copy of the given event
|
// ProcessEvents enforces policy on a shallow copy of the given event
|
||||||
// dropping any sections that don't conform
|
// dropping any sections that don't conform
|
||||||
func (b Backend) ProcessEvents(events ...*auditinternal.Event) {
|
func (b Backend) ProcessEvents(events ...*auditinternal.Event) bool {
|
||||||
for _, event := range events {
|
for _, event := range events {
|
||||||
if event == nil {
|
if event == nil {
|
||||||
continue
|
continue
|
||||||
@ -82,6 +82,9 @@ func (b Backend) ProcessEvents(events ...*auditinternal.Event) {
|
|||||||
}
|
}
|
||||||
b.delegateBackend.ProcessEvents(e)
|
b.delegateBackend.ProcessEvents(e)
|
||||||
}
|
}
|
||||||
|
// Returning true regardless of results, since dynamic audit backends
|
||||||
|
// can never cause apiserver request to fail.
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// String returns a string representation of the backend
|
// String returns a string representation of the backend
|
||||||
|
@ -39,10 +39,11 @@ func (b *Backend) Shutdown() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ProcessEvents calls a callback on a batch, if present.
|
// ProcessEvents calls a callback on a batch, if present.
|
||||||
func (b *Backend) ProcessEvents(ev ...*auditinternal.Event) {
|
func (b *Backend) ProcessEvents(ev ...*auditinternal.Event) bool {
|
||||||
if b.OnRequest != nil {
|
if b.OnRequest != nil {
|
||||||
b.OnRequest(ev)
|
b.OnRequest(ev)
|
||||||
}
|
}
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Backend) String() string {
|
func (b *Backend) String() string {
|
||||||
|
@ -59,13 +59,15 @@ func NewBackend(out io.Writer, format string, groupVersion schema.GroupVersion)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *backend) ProcessEvents(events ...*auditinternal.Event) {
|
func (b *backend) ProcessEvents(events ...*auditinternal.Event) bool {
|
||||||
|
success := true
|
||||||
for _, ev := range events {
|
for _, ev := range events {
|
||||||
b.logEvent(ev)
|
success = b.logEvent(ev) && success
|
||||||
}
|
}
|
||||||
|
return success
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *backend) logEvent(ev *auditinternal.Event) {
|
func (b *backend) logEvent(ev *auditinternal.Event) bool {
|
||||||
line := ""
|
line := ""
|
||||||
switch b.format {
|
switch b.format {
|
||||||
case FormatLegacy:
|
case FormatLegacy:
|
||||||
@ -74,17 +76,19 @@ func (b *backend) logEvent(ev *auditinternal.Event) {
|
|||||||
bs, err := runtime.Encode(audit.Codecs.LegacyCodec(b.groupVersion), ev)
|
bs, err := runtime.Encode(audit.Codecs.LegacyCodec(b.groupVersion), ev)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
audit.HandlePluginError(PluginName, err, ev)
|
audit.HandlePluginError(PluginName, err, ev)
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
line = string(bs[:])
|
line = string(bs[:])
|
||||||
default:
|
default:
|
||||||
audit.HandlePluginError(PluginName, fmt.Errorf("log format %q is not in list of known formats (%s)",
|
audit.HandlePluginError(PluginName, fmt.Errorf("log format %q is not in list of known formats (%s)",
|
||||||
b.format, strings.Join(AllowedFormats, ",")), ev)
|
b.format, strings.Join(AllowedFormats, ",")), ev)
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
if _, err := fmt.Fprint(b.out, line); err != nil {
|
if _, err := fmt.Fprint(b.out, line); err != nil {
|
||||||
audit.HandlePluginError(PluginName, err, ev)
|
audit.HandlePluginError(PluginName, err, ev)
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *backend) Run(stopCh <-chan struct{}) error {
|
func (b *backend) Run(stopCh <-chan struct{}) error {
|
||||||
|
@ -71,11 +71,12 @@ func NewBackend(delegateBackend audit.Backend, config Config, groupVersion schem
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *backend) ProcessEvents(events ...*auditinternal.Event) {
|
func (b *backend) ProcessEvents(events ...*auditinternal.Event) bool {
|
||||||
var errors []error
|
var errors []error
|
||||||
var impacted []*auditinternal.Event
|
var impacted []*auditinternal.Event
|
||||||
var batch []*auditinternal.Event
|
var batch []*auditinternal.Event
|
||||||
var batchSize int64
|
var batchSize int64
|
||||||
|
success := true
|
||||||
for _, event := range events {
|
for _, event := range events {
|
||||||
size, err := b.calcSize(event)
|
size, err := b.calcSize(event)
|
||||||
// If event was correctly serialized, but the size is more than allowed
|
// If event was correctly serialized, but the size is more than allowed
|
||||||
@ -97,7 +98,7 @@ func (b *backend) ProcessEvents(events ...*auditinternal.Event) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if len(batch) > 0 && batchSize+size > b.c.MaxBatchSize {
|
if len(batch) > 0 && batchSize+size > b.c.MaxBatchSize {
|
||||||
b.delegateBackend.ProcessEvents(batch...)
|
success = b.delegateBackend.ProcessEvents(batch...) && success
|
||||||
batch = []*auditinternal.Event{}
|
batch = []*auditinternal.Event{}
|
||||||
batchSize = 0
|
batchSize = 0
|
||||||
}
|
}
|
||||||
@ -107,12 +108,13 @@ func (b *backend) ProcessEvents(events ...*auditinternal.Event) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if len(batch) > 0 {
|
if len(batch) > 0 {
|
||||||
b.delegateBackend.ProcessEvents(batch...)
|
success = b.delegateBackend.ProcessEvents(batch...) && success
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(impacted) > 0 {
|
if len(impacted) > 0 {
|
||||||
audit.HandlePluginError(PluginName, utilerrors.NewAggregate(errors), impacted...)
|
audit.HandlePluginError(PluginName, utilerrors.NewAggregate(errors), impacted...)
|
||||||
}
|
}
|
||||||
|
return success
|
||||||
}
|
}
|
||||||
|
|
||||||
// truncate removed request and response objects from the audit events,
|
// truncate removed request and response objects from the audit events,
|
||||||
|
@ -81,10 +81,12 @@ func (b *backend) Shutdown() {
|
|||||||
// nothing to do here
|
// nothing to do here
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *backend) ProcessEvents(ev ...*auditinternal.Event) {
|
func (b *backend) ProcessEvents(ev ...*auditinternal.Event) bool {
|
||||||
if err := b.processEvents(ev...); err != nil {
|
if err := b.processEvents(ev...); err != nil {
|
||||||
audit.HandlePluginError(b.String(), err, ev...)
|
audit.HandlePluginError(b.String(), err, ev...)
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *backend) processEvents(ev ...*auditinternal.Event) error {
|
func (b *backend) processEvents(ev ...*auditinternal.Event) error {
|
||||||
|
@ -108,8 +108,9 @@ func (f auditChecker) LevelAndStages(attrs authorizer.Attributes) (auditinternal
|
|||||||
|
|
||||||
type auditSinkFunc func(events ...*auditinternal.Event)
|
type auditSinkFunc func(events ...*auditinternal.Event)
|
||||||
|
|
||||||
func (f auditSinkFunc) ProcessEvents(events ...*auditinternal.Event) {
|
func (f auditSinkFunc) ProcessEvents(events ...*auditinternal.Event) bool {
|
||||||
f(events...)
|
f(events...)
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (auditSinkFunc) Run(stopCh <-chan struct{}) error {
|
func (auditSinkFunc) Run(stopCh <-chan struct{}) error {
|
||||||
|
Loading…
Reference in New Issue
Block a user