diff --git a/staging/src/k8s.io/apiserver/pkg/audit/types.go b/staging/src/k8s.io/apiserver/pkg/audit/types.go
index 0b27b0536bf..f1b7cef5457 100644
--- a/staging/src/k8s.io/apiserver/pkg/audit/types.go
+++ b/staging/src/k8s.io/apiserver/pkg/audit/types.go
@@ -36,6 +36,7 @@ type Backend interface {
 	Run(stopCh <-chan struct{}) error
 
 	// Shutdown will synchronously shut down the backend while making sure that all pending
-	// events are delivered.
+	// events are delivered. It can be assumed that this method is called after
+	// the stopCh channel passed to the Run method has been closed.
 	Shutdown()
 }
diff --git a/staging/src/k8s.io/apiserver/pkg/audit/union_test.go b/staging/src/k8s.io/apiserver/pkg/audit/union_test.go
index 70d33b03f13..45fd6710035 100644
--- a/staging/src/k8s.io/apiserver/pkg/audit/union_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/audit/union_test.go
@@ -36,8 +36,8 @@ func (f *fakeBackend) Run(stopCh <-chan struct{}) error {
 	return nil
 }
 
-func (u *fakeBackend) Shutdown() {
-	// nothing to do here
+func (f *fakeBackend) Shutdown() {
+	// Nothing to do here.
 }
 
 func TestUnion(t *testing.T) {
diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/backend.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/backend.go
index ff1c128812f..66660ff2b8c 100644
--- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/backend.go
+++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/backend.go
@@ -86,6 +86,6 @@ func (b *backend) Run(stopCh <-chan struct{}) error {
 	return nil
 }
 
-func (u *backend) Shutdown() {
-	// nothing to do here
+func (b *backend) Shutdown() {
+	// Nothing to do here.
 }
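The `Backend` interface change above is the contract the rest of this patch builds on: `Shutdown` may now assume that the `stopCh` passed to `Run` has already been closed. A minimal sketch of a backend that relies on that ordering is shown below; it is illustrative only, and the `noopBackend` type, `newNoopBackend` constructor, and `doneCh` field are invented names, not part of the patch.

```go
package example

import (
	auditinternal "k8s.io/apiserver/pkg/apis/audit"
)

// noopBackend discards events; it exists only to illustrate the
// Run/Shutdown lifecycle documented in the Backend interface.
type noopBackend struct {
	// doneCh is closed when the goroutine started by Run exits.
	doneCh chan struct{}
}

func newNoopBackend() *noopBackend {
	return &noopBackend{doneCh: make(chan struct{})}
}

func (b *noopBackend) ProcessEvents(events ...*auditinternal.Event) {}

func (b *noopBackend) Run(stopCh <-chan struct{}) error {
	go func() {
		defer close(b.doneCh)
		<-stopCh // Work until the server signals a stop.
	}()
	return nil
}

// Shutdown blocks until the background goroutine exits. This is safe
// only because the new contract guarantees stopCh is already closed
// when Shutdown is called; otherwise this receive could block forever.
func (b *noopBackend) Shutdown() {
	<-b.doneCh
}
```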
diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go
index 52e21855ec1..b3844a39984 100644
--- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go
+++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go
@@ -18,8 +18,10 @@ limitations under the License.
 package webhook
 
 import (
+	"errors"
 	"fmt"
 	"strings"
+	"sync"
 	"time"
 
 	"k8s.io/apimachinery/pkg/apimachinery/announced"
@@ -161,6 +163,7 @@ func newBatchWebhook(configFile string) (*batchBackend, error) {
 		maxBatchSize: defaultBatchMaxSize,
 		maxBatchWait: defaultBatchMaxWait,
 		cloner:       c,
+		shutdownCh:   make(chan struct{}),
 	}, nil
 }
 
@@ -179,28 +182,42 @@ type batchBackend struct {
 	// Receiving maxBatchSize events will always trigger a send, regardless of
 	// if this amount of time has been reached.
 	maxBatchWait time.Duration
+
+	// Channel to signal that the sending routine has stopped and therefore
+	// it's safe to assume that no new requests will be initiated.
+	shutdownCh chan struct{}
+
+	// The sending routine locks reqMutex for reading before initiating a new
+	// goroutine to send a request. This goroutine then unlocks reqMutex for
+	// reading when completed. The Shutdown method locks reqMutex for writing
+	// after the sending routine has exited. When reqMutex is locked for writing,
+	// all requests have been completed and no new ones will be spawned, since
+	// the sending routine is not running anymore.
+	reqMutex sync.RWMutex
 }
 
 func (b *batchBackend) Run(stopCh <-chan struct{}) error {
-	f := func() {
-		// Recover from any panics caused by this method so a panic in the
-		// goroutine can't bring down the main routine.
-		defer runtime.HandleCrash()
-
-		t := time.NewTimer(b.maxBatchWait)
-		defer t.Stop() // Release ticker resources
-
-		b.sendBatchEvents(stopCh, t.C)
-	}
 	go func() {
-		for {
-			f()
+		// Signal that the sending routine has exited.
+		defer close(b.shutdownCh)
 
-			select {
-			case <-stopCh:
-				return
-			default:
+		b.runSendingRoutine(stopCh)
+
+		// Handle the events that were received after the last buffer
+		// scraping and before this line. Since the buffer is closed, no new
+		// events will come through.
+		for {
+			if last := func() bool {
+				// Recover from any panic in order to try to send all remaining events.
+				// Note that in case of a panic, the return value will be false and
+				// the loop execution will continue.
+				defer runtime.HandleCrash()
+
+				events := b.collectLastEvents()
+				b.sendBatchEvents(events)
+				return len(events) == 0
+			}(); last {
+				break
 			}
 		}
 	}()
@@ -208,25 +225,62 @@ func (b *batchBackend) Run(stopCh <-chan struct{}) error {
 	return nil
 }
 
 func (b *batchBackend) Shutdown() {
-	// TODO: send out batched events
+	<-b.shutdownCh
+
+	// Write locking reqMutex will guarantee that all requests will be completed
+	// by the time the goroutine continues execution. Since this line is
+	// executed after shutdownCh was closed, no new requests will follow this
+	// lock, because the read lock is taken in the same goroutine that closes
+	// shutdownCh before exiting.
+	b.reqMutex.Lock()
+	b.reqMutex.Unlock()
 }
 
-// sendBatchEvents attempts to batch some number of events to the backend. It POSTs events
-// in a goroutine and logging any error encountered during the POST.
+// runSendingRoutine runs a loop that collects events from the buffer. When
+// stopCh is closed, runSendingRoutine stops and closes the buffer.
+func (b *batchBackend) runSendingRoutine(stopCh <-chan struct{}) {
+	defer close(b.buffer)
+
+	for {
+		func() {
+			// Recover from any panics caused by this function so a panic in the
+			// goroutine can't bring down the main routine.
+			defer runtime.HandleCrash()
+
+			t := time.NewTimer(b.maxBatchWait)
+			defer t.Stop() // Release ticker resources
+
+			b.sendBatchEvents(b.collectEvents(stopCh, t.C))
+		}()
+
+		select {
+		case <-stopCh:
+			return
+		default:
+		}
+	}
+}
+
+// collectEvents attempts to collect some number of events in a batch.
 //
-// The following things can cause sendBatchEvents to exit:
+// The following things can cause collectEvents to stop and return the list
+// of events:
 //
 // * Some maximum number of events are received.
 // * Timer has passed, all queued events are sent.
 // * StopCh is closed, all queued events are sent.
 //
-func (b *batchBackend) sendBatchEvents(stopCh <-chan struct{}, timer <-chan time.Time) {
+func (b *batchBackend) collectEvents(stopCh <-chan struct{}, timer <-chan time.Time) []auditinternal.Event {
 	var events []auditinternal.Event
 
 L:
	for i := 0; i < b.maxBatchSize; i++ {
		select {
-		case ev := <-b.buffer:
+		case ev, ok := <-b.buffer:
+			// Buffer channel was closed and no new events will follow.
+			if !ok {
+				break L
+			}
 			events = append(events, *ev)
 		case <-timer:
 			// Timer has expired. Send whatever events are in the queue.
@@ -237,15 +291,43 @@ L:
 		}
 	}
 
+	return events
+}
+
+// collectLastEvents assumes that the buffer was closed. It collects the first
+// maxBatchSize events from the closed buffer into a batch and returns them.
+func (b *batchBackend) collectLastEvents() []auditinternal.Event {
+	var events []auditinternal.Event
+
+	for i := 0; i < b.maxBatchSize; i++ {
+		ev, ok := <-b.buffer
+		if !ok {
+			break
+		}
+		events = append(events, *ev)
+	}
+
+	return events
+}
+
+// sendBatchEvents sends a POST request with the event list in a goroutine
+// and logs any error encountered.
+func (b *batchBackend) sendBatchEvents(events []auditinternal.Event) {
 	if len(events) == 0 {
 		return
 	}
 
 	list := auditinternal.EventList{Items: events}
+
+	// Locking reqMutex for read will guarantee that the shutdown process will
+	// block until the goroutine started below is finished. At the same time, it
+	// will not prevent other batches from proceeding past this point.
+	b.reqMutex.RLock()
 	go func() {
 		// Execute the webhook POST in a goroutine to keep it from blocking.
 		// This lets the webhook continue to drain the queue immediatly.
+		defer b.reqMutex.RUnlock()
 		defer runtime.HandleCrash()
 
 		err := webhook.WithExponentialBackoff(0, func() error {
@@ -268,10 +350,27 @@ func (b *batchBackend) ProcessEvents(ev ...*auditinternal.Event) {
 		// sent to the Sink. Deep copy and send the copy to the queue.
 		event := e.DeepCopy()
 
-		select {
-		case b.buffer <- event:
-		default:
-			audit.HandlePluginError(pluginName, fmt.Errorf("audit webhook queue blocked"), ev[i:]...)
+		// The following mechanism is in place to support the situation when audit
+		// events are still coming after the backend was shut down.
+		var sendErr error
+		func() {
+			// If the backend was shut down and the buffer channel was closed, an
+			// attempt to add an event to it will result in a panic that we should
+			// recover from.
+			defer func() {
+				if err := recover(); err != nil {
+					sendErr = errors.New("audit webhook shut down")
+				}
+			}()
+
+			select {
+			case b.buffer <- event:
+			default:
+				sendErr = errors.New("audit webhook queue blocked")
+			}
+		}()
+		if sendErr != nil {
+			audit.HandlePluginError(pluginName, sendErr, ev[i:]...)
 			return
 		}
 	}
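The core of the webhook change is the `reqMutex` discipline described in the struct comment: every in-flight POST holds a read lock, and `Shutdown` drains them by taking and immediately releasing the write lock. The sketch below shows the same pattern in isolation; `tracker`, `start`, and `drain` are invented names, and this is a hedged illustration of the technique rather than the patch's code.

```go
package example

import "sync"

// tracker counts in-flight work using reader/writer lock semantics.
type tracker struct {
	reqMutex sync.RWMutex
}

// start takes the read lock before spawning the goroutine, so drain
// cannot acquire the write lock while the work is still running. Go
// permits unlocking a mutex from a different goroutine than the one
// that locked it, which this pattern depends on.
func (t *tracker) start(work func()) {
	t.reqMutex.RLock()
	go func() {
		defer t.reqMutex.RUnlock()
		work()
	}()
}

// drain blocks until every read lock taken by start has been released,
// i.e. until all in-flight work is done. It must only be called once no
// further start calls can happen; in the patch that is guaranteed by
// waiting on shutdownCh first.
func (t *tracker) drain() {
	t.reqMutex.Lock()
	t.reqMutex.Unlock()
}
```

A `sync.WaitGroup` (`Add` before spawning, `Done` in the goroutine, `Wait` in `Shutdown`) would drain just as well; the RWMutex form is sound here because the read lock is only ever taken by the single sending routine, which is guaranteed to have exited before the write lock is attempted.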
diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_test.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_test.go
index 728e209c4a4..80dec8ef9c0 100644
--- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_test.go
+++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_test.go
@@ -163,7 +163,7 @@ func TestBatchWebhookMaxEvents(t *testing.T) {
 	stopCh := make(chan struct{})
 	timer := make(chan time.Time, 1)
 
-	backend.sendBatchEvents(stopCh, timer)
+	backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
 	require.Equal(t, defaultBatchMaxSize, <-got, "did not get batch max size")
 
 	go func() {
@@ -171,7 +171,7 @@ func TestBatchWebhookMaxEvents(t *testing.T) {
 		timer <- time.Now() // Trigger the wait timeout
 	}()
 
-	backend.sendBatchEvents(stopCh, timer)
+	backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
 	require.Equal(t, nRest, <-got, "failed to get the rest of the events")
 }
 
@@ -198,10 +198,78 @@ func TestBatchWebhookStopCh(t *testing.T) {
 		waitForEmptyBuffer(backend)
 		close(stopCh) // stop channel has stopped
 	}()
-	backend.sendBatchEvents(stopCh, timer)
+	backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
 	require.Equal(t, expected, <-got, "get queued events after timer expires")
 }
 
+func TestBatchWebhookProcessEventsAfterStop(t *testing.T) {
+	events := make([]*auditinternal.Event, 1) // less than max size.
+	for i := range events {
+		events[i] = &auditinternal.Event{}
+	}
+
+	got := make(chan struct{})
+	s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1alpha1.EventList) {
+		close(got)
+	}))
+	defer s.Close()
+
+	backend := newTestBatchWebhook(t, s.URL)
+	stopCh := make(chan struct{})
+
+	backend.Run(stopCh)
+	close(stopCh)
+	<-backend.shutdownCh
+	backend.ProcessEvents(events...)
+	assert.Equal(t, 0, len(backend.buffer), "processed events after the backend has been stopped")
+}
+
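`TestBatchWebhookProcessEventsAfterStop` above exercises the case the new `ProcessEvents` code has to survive: events arriving after the buffer channel has been closed. In Go, a send on a closed channel panics, which is why `ProcessEvents` wraps its non-blocking send in a deferred `recover`. A self-contained sketch of that guard follows; the `trySend` helper is an invented name, not part of the patch.

```go
package example

import "errors"

// trySend attempts a non-blocking send on ch. A full channel is
// reported as an error via the default case, and the panic raised by
// sending on a closed channel is recovered and reported as a shutdown
// error through the named return value.
func trySend(ch chan<- int, v int) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = errors.New("channel closed: backend shut down")
		}
	}()

	select {
	case ch <- v:
		return nil
	default:
		return errors.New("queue blocked")
	}
}
```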
+func TestBatchWebhookShutdown(t *testing.T) {
+	events := make([]*auditinternal.Event, 1)
+	for i := range events {
+		events[i] = &auditinternal.Event{}
+	}
+
+	got := make(chan struct{})
+	contReqCh := make(chan struct{})
+	shutdownCh := make(chan struct{})
+	s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1alpha1.EventList) {
+		close(got)
+		<-contReqCh
+	}))
+	defer s.Close()
+
+	backend := newTestBatchWebhook(t, s.URL)
+	backend.ProcessEvents(events...)
+
+	go func() {
+		// Assume stopCh was closed.
+		close(backend.buffer)
+		backend.sendBatchEvents(backend.collectLastEvents())
+	}()
+
+	<-got
+
+	go func() {
+		close(backend.shutdownCh)
+		backend.Shutdown()
+		close(shutdownCh)
+	}()
+
+	// Wait for some time in case there's a bug that allows the Shutdown
+	// method to exit before all requests have been completed.
+	time.Sleep(1 * time.Second)
+	select {
+	case <-shutdownCh:
+		t.Fatal("Backend shut down before all requests finished")
+	default:
+		// Continue.
+	}
+
+	close(contReqCh)
+	<-shutdownCh
+}
+
 func TestBatchWebhookEmptyBuffer(t *testing.T) {
 	events := make([]*auditinternal.Event, 1) // less than max size.
 	for i := range events {
@@ -223,7 +291,7 @@ func TestBatchWebhookEmptyBuffer(t *testing.T) {
 	timer <- time.Now() // Timer is done.
 
 	// Buffer is empty, no events have been queued. This should exit but send no events.
-	backend.sendBatchEvents(stopCh, timer)
+	backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
 
 	// Send additional events after the sendBatchEvents has been called.
 	backend.ProcessEvents(events...)
@@ -232,7 +300,7 @@ func TestBatchWebhookEmptyBuffer(t *testing.T) {
 		timer <- time.Now()
 	}()
 
-	backend.sendBatchEvents(stopCh, timer)
+	backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
 
 	// Make sure we didn't get a POST with zero events.
 	require.Equal(t, expected, <-got, "expected one event")
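`TestBatchWebhookShutdown` holds a request open in the webhook handler and then checks, with a sleep followed by a non-blocking `select`, that `Shutdown` has not returned early. A sketch of that probe as a reusable helper is below; the names are invented. The fixed sleep keeps the test simple but makes it timing-dependent; a condition-polling helper such as `wait.Poll` from k8s.io/apimachinery would be more robust at the cost of more machinery.

```go
package example

import (
	"testing"
	"time"
)

// assertStillBlocked fails the test if ch was closed within the grace
// period, i.e. if the operation under test finished when it should
// still be blocked.
func assertStillBlocked(t *testing.T, ch <-chan struct{}, msg string) {
	t.Helper()

	// Give a buggy implementation a chance to finish early.
	time.Sleep(1 * time.Second)

	select {
	case <-ch:
		t.Fatal(msg)
	default:
		// Still blocked, as expected.
	}
}
```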