Implement batching audit webhook graceful shutdown

Mik Vyatskov 2017-08-15 12:30:14 +02:00
parent 4d6db7466c
commit 7798d32fc7
5 changed files with 205 additions and 37 deletions


@@ -36,6 +36,7 @@ type Backend interface {
Run(stopCh <-chan struct{}) error
// Shutdown will synchronously shut down the backend while making sure that all pending
// events are delivered. It can be assumed that this method is called after
// the stopCh channel passed to the Run method has been closed.
Shutdown()
}
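
As an aside, the Run/Shutdown contract described above can be illustrated with a minimal caller sketch. Everything below other than the interface itself (the noopBackend type and the main function) is hypothetical and only demonstrates the expected ordering: close stopCh first, then call Shutdown.

package main

import "fmt"

// Backend mirrors the interface above: Run starts asynchronous work and
// Shutdown is only called after stopCh has been closed; it must block
// until all pending events are delivered.
type Backend interface {
    Run(stopCh <-chan struct{}) error
    Shutdown()
}

// noopBackend is a hypothetical stand-in used only to show the ordering.
type noopBackend struct{}

func (noopBackend) Run(stopCh <-chan struct{}) error { return nil }
func (noopBackend) Shutdown()                        { fmt.Println("all pending events delivered") }

func main() {
    var b Backend = noopBackend{}
    stopCh := make(chan struct{})
    if err := b.Run(stopCh); err != nil {
        panic(err)
    }
    // ... serve traffic ...
    close(stopCh) // signal the backend that no new work is coming
    b.Shutdown()  // only now; blocks until pending events are delivered
}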


@@ -36,8 +36,8 @@ func (f *fakeBackend) Run(stopCh <-chan struct{}) error {
return nil
}
func (f *fakeBackend) Shutdown() {
// Nothing to do here.
}
func TestUnion(t *testing.T) {


@@ -86,6 +86,6 @@ func (b *backend) Run(stopCh <-chan struct{}) error {
return nil
}
func (b *backend) Shutdown() {
// Nothing to do here.
}


@@ -18,8 +18,10 @@ limitations under the License.
package webhook
import (
"errors"
"fmt"
"strings"
"sync"
"time"
"k8s.io/apimachinery/pkg/apimachinery/announced"
@@ -161,6 +163,7 @@ func newBatchWebhook(configFile string) (*batchBackend, error) {
maxBatchSize: defaultBatchMaxSize,
maxBatchWait: defaultBatchMaxWait,
cloner: c,
shutdownCh: make(chan struct{}),
}, nil
}
@@ -179,28 +182,42 @@ type batchBackend struct {
// Receiving maxBatchSize events will always trigger a send, regardless of
// if this amount of time has been reached.
maxBatchWait time.Duration
// Channel to signal that the sending routine has stopped and therefore
// it's safe to assume that no new requests will be initiated.
shutdownCh chan struct{}
// The sending routine locks reqMutex for reading before initiating a new
// goroutine to send a request. This goroutine then unlocks reqMutex for
// reading when completed. The Shutdown method locks reqMutex for writing
// after the sending routine has exited. When reqMutex is locked for writing,
// all requests have been completed and no new ones will be spawned, since the
// sending routine is not running anymore.
reqMutex sync.RWMutex
}
func (b *batchBackend) Run(stopCh <-chan struct{}) error {
go func() {
// Signal that the sending routine has exited.
defer close(b.shutdownCh)
b.runSendingRoutine(stopCh)
// Handle the events that were received after the last buffer
// scraping and before this line. Since the buffer is closed, no new
// events will come through.
for {
if last := func() bool {
// Recover from any panic in order to try to send all remaining events.
// Note that in case of a panic, the return value will be false and
// the loop execution will continue.
defer runtime.HandleCrash()
events := b.collectLastEvents()
b.sendBatchEvents(events)
return len(events) == 0
}(); last {
break
}
}
}()
@@ -208,25 +225,62 @@ func (b *batchBackend) Run(stopCh <-chan struct{}) error {
}
func (b *batchBackend) Shutdown() {
<-b.shutdownCh
// Write-locking reqMutex guarantees that all requests have completed
// by the time this goroutine continues execution. Since this line is
// executed after shutdownCh was closed, no new requests can follow this
// lock, because the read lock is taken in the same goroutine that closes
// shutdownCh before exiting.
b.reqMutex.Lock()
b.reqMutex.Unlock()
}
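
The reqMutex scheme used by Shutdown and described on the struct above is easier to follow in isolation. The sketch below is a hedged, self-contained illustration of that pattern only; the dispatcher type, its methods, and the Sleep standing in for the webhook POST are all hypothetical. Each in-flight request holds a read lock, and the write lock acquired during shutdown acts as a barrier that only succeeds once every request has completed.

package main

import (
    "fmt"
    "sync"
    "time"
)

// dispatcher mimics the locking scheme: every in-flight request holds a
// read lock, and shutdown takes the write lock only after no new requests
// can start, which blocks until every read lock has been released.
type dispatcher struct {
    reqMutex   sync.RWMutex
    shutdownCh chan struct{}
}

func (d *dispatcher) send(i int) {
    d.reqMutex.RLock() // taken by the sending routine before the goroutine starts
    go func() {
        defer d.reqMutex.RUnlock()
        time.Sleep(100 * time.Millisecond) // stand-in for the webhook POST
        fmt.Println("request", i, "done")
    }()
}

func (d *dispatcher) shutdown() {
    <-d.shutdownCh // wait until the sending routine has exited
    // The write lock acts as a barrier: it is only acquired after every
    // outstanding read lock has been released, i.e. all requests completed.
    d.reqMutex.Lock()
    d.reqMutex.Unlock()
    fmt.Println("all requests finished")
}

func main() {
    d := &dispatcher{shutdownCh: make(chan struct{})}
    for i := 0; i < 3; i++ {
        d.send(i)
    }
    close(d.shutdownCh) // the sending routine would do this when it exits
    d.shutdown()
}
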
// runSendingRoutine runs a loop that collects events from the buffer. When
// stopCh is closed, runSendingRoutine stops and closes the buffer.
func (b *batchBackend) runSendingRoutine(stopCh <-chan struct{}) {
defer close(b.buffer)
for {
func() {
// Recover from any panics caused by this function so a panic in the
// goroutine can't bring down the main routine.
defer runtime.HandleCrash()
t := time.NewTimer(b.maxBatchWait)
defer t.Stop() // Release timer resources.
b.sendBatchEvents(b.collectEvents(stopCh, t.C))
}()
select {
case <-stopCh:
return
default:
}
}
}
// collectEvents attempts to collect some number of events in a batch.
//
// The following things can cause collectEvents to stop and return the list
// of events:
//
// * maxBatchSize events are received.
// * The timer fires; all queued events are returned.
// * stopCh is closed; all queued events are returned.
//
func (b *batchBackend) collectEvents(stopCh <-chan struct{}, timer <-chan time.Time) []auditinternal.Event {
var events []auditinternal.Event
L:
for i := 0; i < b.maxBatchSize; i++ {
select {
case ev, ok := <-b.buffer:
// Buffer channel was closed and no new events will follow.
if !ok {
break L
}
events = append(events, *ev)
case <-timer:
// Timer has expired. Send whatever events are in the queue.
@@ -237,15 +291,43 @@ L:
}
}
return events
}
// collectLastEvents assumes that the buffer was closed. It collects up to
// maxBatchSize events from the closed buffer into a batch and returns them.
func (b *batchBackend) collectLastEvents() []auditinternal.Event {
var events []auditinternal.Event
for i := 0; i < b.maxBatchSize; i++ {
ev, ok := <-b.buffer
if !ok {
break
}
events = append(events, *ev)
}
return events
}
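
Both collectEvents and collectLastEvents lean on Go's semantics for receiving from a closed channel: values already sitting in the buffer are still delivered, and once the buffer is drained the second return value reports false. A tiny, hedged illustration of just that behaviour, independent of the audit types:

package main

import "fmt"

func main() {
    buffer := make(chan int, 4)
    buffer <- 1
    buffer <- 2
    close(buffer) // no new events can be queued from this point on

    // Receives keep returning the buffered values; once the buffer is
    // drained, ok becomes false and the loop stops.
    for {
        ev, ok := <-buffer
        if !ok {
            break
        }
        fmt.Println("collected", ev)
    }
}
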
// sendBatchEvents sends a POST request with the event list in a goroutine
// and logs any error encountered.
func (b *batchBackend) sendBatchEvents(events []auditinternal.Event) {
if len(events) == 0 {
return
}
list := auditinternal.EventList{Items: events}
// Locking reqMutex for read guarantees that the shutdown process will
// block until the goroutine started below has finished. At the same time, it
// does not prevent other batches from proceeding past this point.
b.reqMutex.RLock()
go func() {
// Execute the webhook POST in a goroutine to keep it from blocking.
// This lets the webhook continue to drain the queue immediately.
defer b.reqMutex.RUnlock()
defer runtime.HandleCrash()
err := webhook.WithExponentialBackoff(0, func() error {
@@ -268,10 +350,27 @@ func (b *batchBackend) ProcessEvents(ev ...*auditinternal.Event) {
// sent to the Sink. Deep copy and send the copy to the queue.
event := e.DeepCopy()
// The following mechanism is in place to support the situation when audit
// events are still coming after the backend was shut down.
var sendErr error
func() {
// If the backend was shut down and the buffer channel was closed, an
// attempt to add an event to it will result in a panic that we should
// recover from.
defer func() {
if err := recover(); err != nil {
sendErr = errors.New("audit webhook shut down")
}
}()
select {
case b.buffer <- event:
default:
sendErr = errors.New("audit webhook queue blocked")
}
}()
if sendErr != nil {
audit.HandlePluginError(pluginName, sendErr, ev[i:]...)
return
}
}
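
The recover block above exists because sending on a closed channel panics, which is exactly what happens when ProcessEvents races with shutdown after runSendingRoutine has closed the buffer. The standalone sketch below demonstrates only that pattern; trySend and its error messages are hypothetical names, not part of the webhook code.

package main

import (
    "errors"
    "fmt"
)

// trySend attempts a non-blocking send and converts the panic caused by a
// closed channel into an error, mirroring the recover block in ProcessEvents.
func trySend(ch chan int, v int) (err error) {
    defer func() {
        if r := recover(); r != nil {
            err = errors.New("channel already closed")
        }
    }()
    select {
    case ch <- v:
        return nil
    default:
        return errors.New("channel full")
    }
}

func main() {
    ch := make(chan int, 1)
    fmt.Println(trySend(ch, 1)) // <nil>
    fmt.Println(trySend(ch, 2)) // channel full
    <-ch // drain the buffered value
    close(ch)
    fmt.Println(trySend(ch, 3)) // channel already closed
}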


@@ -163,7 +163,7 @@ func TestBatchWebhookMaxEvents(t *testing.T) {
stopCh := make(chan struct{})
timer := make(chan time.Time, 1)
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
require.Equal(t, defaultBatchMaxSize, <-got, "did not get batch max size")
go func() {
@@ -171,7 +171,7 @@ func TestBatchWebhookMaxEvents(t *testing.T) {
timer <- time.Now() // Trigger the wait timeout
}()
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
require.Equal(t, nRest, <-got, "failed to get the rest of the events")
}
@@ -198,10 +198,78 @@ func TestBatchWebhookStopCh(t *testing.T) {
waitForEmptyBuffer(backend)
close(stopCh) // stop channel has been closed
}()
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
require.Equal(t, expected, <-got, "get queued events after timer expires")
}
func TestBatchWebhookProcessEventsAfterStop(t *testing.T) {
events := make([]*auditinternal.Event, 1) // less than max size.
for i := range events {
events[i] = &auditinternal.Event{}
}
got := make(chan struct{})
s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1alpha1.EventList) {
close(got)
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL)
stopCh := make(chan struct{})
backend.Run(stopCh)
close(stopCh)
<-backend.shutdownCh
backend.ProcessEvents(events...)
assert.Equal(t, 0, len(backend.buffer), "processed events after the backend has been stopped")
}
func TestBatchWebhookShutdown(t *testing.T) {
events := make([]*auditinternal.Event, 1)
for i := range events {
events[i] = &auditinternal.Event{}
}
got := make(chan struct{})
contReqCh := make(chan struct{})
shutdownCh := make(chan struct{})
s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1alpha1.EventList) {
close(got)
<-contReqCh
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL)
backend.ProcessEvents(events...)
go func() {
// Assume stopCh was closed.
close(backend.buffer)
backend.sendBatchEvents(backend.collectLastEvents())
}()
<-got
go func() {
close(backend.shutdownCh)
backend.Shutdown()
close(shutdownCh)
}()
// Wait for some time in case there's a bug that allows the Shutdown
// method to exit before all requests have been completed.
time.Sleep(1 * time.Second)
select {
case <-shutdownCh:
t.Fatal("Backend shut down before all requests finished")
default:
// Continue.
}
close(contReqCh)
<-shutdownCh
}
func TestBatchWebhookEmptyBuffer(t *testing.T) {
events := make([]*auditinternal.Event, 1) // less than max size.
for i := range events {
@@ -223,7 +291,7 @@ func TestBatchWebhookEmptyBuffer(t *testing.T) {
timer <- time.Now() // Timer is done.
// Buffer is empty, no events have been queued. This should exit but send no events.
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
// Send additional events after the sendBatchEvents has been called.
backend.ProcessEvents(events...)
@@ -232,7 +300,7 @@ func TestBatchWebhookEmptyBuffer(t *testing.T) {
timer <- time.Now()
}()
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
// Make sure we didn't get a POST with zero events.
require.Equal(t, expected, <-got, "expected one event")