Synchronous & unbatched audit log writes

Tim Allclair 2018-03-15 00:44:46 -07:00
parent e26f5d19d4
commit c9670d0652
No known key found for this signature in database
GPG Key ID: 434D16BCEF479EAB
5 changed files with 201 additions and 96 deletions
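In short: the log backend's defaults change so audit events are written synchronously and one at a time, while the webhook backend keeps its asynchronous batching defaults. The shared NewDefaultBatchConfig() in the buffered plugin is replaced by per-backend defaults in the options package, a new AsyncDelegate knob controls whether the buffered backend invokes its delegate on a separate goroutine, and throttle settings are now only validated when throttling is enabled. The standalone sketch below (values copied from this diff) contrasts the two resulting default configurations:

package main

import (
	"fmt"
	"time"

	pluginbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
)

func main() {
	// Webhook default after this commit: buffered, batched, throttled,
	// delegate invoked asynchronously.
	webhook := pluginbuffered.BatchConfig{
		BufferSize:     10000,
		MaxBatchSize:   400,
		MaxBatchWait:   30 * time.Second,
		ThrottleEnable: true,
		ThrottleQPS:    10,
		ThrottleBurst:  15,
		AsyncDelegate:  true,
	}

	// Batch-mode default for the log backend after this commit: a batch size
	// of 1 means no batching, and the delegate (the log writer) runs inline.
	// (The log backend defaults to blocking mode, which bypasses buffering
	// entirely; this config applies only when batch mode is selected.)
	logFile := pluginbuffered.BatchConfig{
		BufferSize:     10000,
		MaxBatchSize:   1,
		ThrottleEnable: false,
		AsyncDelegate:  false,
	}

	fmt.Printf("webhook: %+v\nlog: %+v\n", webhook, logFile)
}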

View File

@@ -231,6 +231,7 @@ func TestAddFlags(t *testing.T) {
ThrottleEnable: false,
ThrottleQPS: 43.5,
ThrottleBurst: 44,
AsyncDelegate: true,
},
},
TruncateOptions: apiserveroptions.AuditTruncateOptions{

View File

@@ -42,6 +42,16 @@ import (
pluginwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook"
)
const (
// Default configuration values for ModeBatch.
defaultBatchBufferSize = 10000 // Buffer up to 10000 events before starting to discard them.
// These batch parameters are only used by the webhook backend.
defaultBatchMaxSize = 400 // Only send up to 400 events at a time.
defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute.
defaultBatchThrottleQPS = 10 // Limit the send rate to 10 QPS.
defaultBatchThrottleBurst = 15 // Allow bursts of up to 15 requests.
)
func appendBackend(existing, newBackend audit.Backend) audit.Backend {
if existing == nil {
return newBackend
@@ -129,15 +139,12 @@ type AuditWebhookOptions struct {
}
func NewAuditOptions() *AuditOptions {
defaultLogBatchConfig := pluginbuffered.NewDefaultBatchConfig()
defaultLogBatchConfig.ThrottleEnable = false
return &AuditOptions{
WebhookOptions: AuditWebhookOptions{
InitialBackoff: pluginwebhook.DefaultInitialBackoff,
BatchOptions: AuditBatchOptions{
Mode: ModeBatch,
BatchConfig: pluginbuffered.NewDefaultBatchConfig(),
BatchConfig: defaultWebhookBatchConfig(),
},
TruncateOptions: NewAuditTruncateOptions(),
// TODO(audit): use v1 API in release 1.13
@@ -147,7 +154,7 @@ func NewAuditOptions() *AuditOptions {
Format: pluginlog.FormatJson,
BatchOptions: AuditBatchOptions{
Mode: ModeBlocking,
BatchConfig: defaultLogBatchConfig,
BatchConfig: defaultLogBatchConfig(),
},
TruncateOptions: NewAuditTruncateOptions(),
// TODO(audit): use v1 API in release 1.13
@@ -213,11 +220,13 @@ func validateBackendBatchOptions(pluginName string, options AuditBatchOptions) error {
if config.MaxBatchSize <= 0 {
return fmt.Errorf("invalid audit batch %s max batch size %v, must be a positive number", pluginName, config.MaxBatchSize)
}
if config.ThrottleQPS <= 0 {
return fmt.Errorf("invalid audit batch %s throttle QPS %v, must be a positive number", pluginName, config.ThrottleQPS)
}
if config.ThrottleBurst <= 0 {
return fmt.Errorf("invalid audit batch %s throttle burst %v, must be a positive number", pluginName, config.ThrottleBurst)
if config.ThrottleEnable {
if config.ThrottleQPS <= 0 {
return fmt.Errorf("invalid audit batch %s throttle QPS %v, must be a positive number", pluginName, config.ThrottleQPS)
}
if config.ThrottleBurst <= 0 {
return fmt.Errorf("invalid audit batch %s throttle burst %v, must be a positive number", pluginName, config.ThrottleBurst)
}
}
return nil
}
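The practical effect of the validation change: throttle QPS and burst are checked only when throttling is on, which is exactly what the new log defaults rely on. A minimal standalone illustration (field values copied from defaultLogBatchConfig):

package main

import (
	"fmt"

	pluginbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
)

func main() {
	// ThrottleEnable is false, so the zero-valued ThrottleQPS and
	// ThrottleBurst are now skipped by validateBackendBatchOptions;
	// previously they would have been rejected as non-positive.
	cfg := pluginbuffered.BatchConfig{
		BufferSize:   10000,
		MaxBatchSize: 1,
	}
	fmt.Printf("valid batch config without throttling: %+v\n", cfg)
}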
@@ -525,3 +534,31 @@ func (o *AuditWebhookOptions) applyTo(c *server.Config) error {
c.AuditBackend = appendBackend(c.AuditBackend, webhook)
return nil
}
// defaultWebhookBatchConfig returns the default BatchConfig used by the Webhook backend.
func defaultWebhookBatchConfig() pluginbuffered.BatchConfig {
return pluginbuffered.BatchConfig{
BufferSize: defaultBatchBufferSize,
MaxBatchSize: defaultBatchMaxSize,
MaxBatchWait: defaultBatchMaxWait,
ThrottleEnable: true,
ThrottleQPS: defaultBatchThrottleQPS,
ThrottleBurst: defaultBatchThrottleBurst,
AsyncDelegate: true,
}
}
// defaultLogBatchConfig returns the default BatchConfig used by the Log backend.
func defaultLogBatchConfig() pluginbuffered.BatchConfig {
return pluginbuffered.BatchConfig{
BufferSize: defaultBatchBufferSize,
// Batching is not useful for the log-file backend;
// with a MaxBatchSize of 1, MaxBatchWait is ignored.
MaxBatchSize: 1,
ThrottleEnable: false,
// Asynchronous log threads just create lock contention.
AsyncDelegate: false,
}
}
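For context, these per-backend defaults feed into the mode plumbing earlier in this file: in blocking mode the delegate backend is used directly and the BatchConfig is never consulted, while batch mode wraps the delegate with the buffered plugin. Roughly (a simplified sketch mirroring the in-tree wrapBackend helper, which is not part of this diff):

// Sketch of how AuditBatchOptions applies a BatchConfig.
func (o *AuditBatchOptions) wrapBackend(delegate audit.Backend) audit.Backend {
	if o.Mode == ModeBlocking {
		// Blocking mode: events go straight to the delegate.
		return delegate
	}
	// Batch mode: wrap with the buffered plugin using the per-backend
	// defaults (defaultWebhookBatchConfig or defaultLogBatchConfig).
	return pluginbuffered.NewBackend(delegate, o.BatchConfig)
}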

View File

@@ -26,6 +26,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/fake:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
],
)

View File

@@ -31,16 +31,6 @@ import (
// PluginName is the name reported in error metrics.
const PluginName = "buffered"
const (
// Default configuration values for ModeBatch.
defaultBatchBufferSize = 10000 // Buffer up to 10000 events before starting to discard them.
defaultBatchMaxSize = 400 // Only send up to 400 events at a time.
defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute.
defaultBatchThrottleQPS = 10 // Limit the send rate to 10 QPS.
defaultBatchThrottleBurst = 15 // Allow bursts of up to 15 requests.
)
// BatchConfig represents batching delegate audit backend configuration.
type BatchConfig struct {
// BufferSize defines a size of the buffering queue.
@@ -57,19 +47,9 @@ type BatchConfig struct {
// ThrottleBurst defines the maximum number of requests sent to the delegate backend at the same moment in case
// the capacity defined by ThrottleQPS was not utilized.
ThrottleBurst int
}
// NewDefaultBatchConfig returns new Config objects populated by default values.
func NewDefaultBatchConfig() BatchConfig {
return BatchConfig{
BufferSize: defaultBatchBufferSize,
MaxBatchSize: defaultBatchMaxSize,
MaxBatchWait: defaultBatchMaxWait,
ThrottleEnable: true,
ThrottleQPS: defaultBatchThrottleQPS,
ThrottleBurst: defaultBatchThrottleBurst,
}
// Whether the delegate backend should be called asynchronously.
AsyncDelegate bool
}
type bufferedBackend struct {
@@ -85,6 +65,9 @@ type bufferedBackend struct {
// Receiving maxBatchSize events will always trigger sending a batch, regardless of the amount of time passed.
maxBatchWait time.Duration
// Whether the delegate backend should be called asynchronously.
asyncDelegate bool
// Channel to signal that the batching routine has processed all remaining events and exited.
// Once `shutdownCh` is closed no new events will be sent to the delegate backend.
shutdownCh chan struct{}
@@ -113,6 +96,7 @@ func NewBackend(delegate audit.Backend, config BatchConfig) audit.Backend {
buffer: make(chan *auditinternal.Event, config.BufferSize),
maxBatchSize: config.MaxBatchSize,
maxBatchWait: config.MaxBatchWait,
asyncDelegate: config.AsyncDelegate,
shutdownCh: make(chan struct{}),
wg: sync.WaitGroup{},
throttle: throttle,
@@ -169,8 +153,17 @@ func (b *bufferedBackend) Shutdown() {
// b.stopCh is closed, processIncomingEvents stops and closes the buffer.
func (b *bufferedBackend) processIncomingEvents(stopCh <-chan struct{}) {
defer close(b.buffer)
t := time.NewTimer(b.maxBatchWait)
defer t.Stop()
var (
maxWaitChan <-chan time.Time
maxWaitTimer *time.Timer
)
// Only use max wait batching if batching is enabled.
if b.maxBatchSize > 1 {
maxWaitTimer = time.NewTimer(b.maxBatchWait)
maxWaitChan = maxWaitTimer.C
defer maxWaitTimer.Stop()
}
for {
func() {
@@ -178,8 +171,10 @@ func (b *bufferedBackend) processIncomingEvents(stopCh <-chan struct{}) {
// goroutine can't bring down the main routine.
defer runtime.HandleCrash()
t.Reset(b.maxBatchWait)
b.processEvents(b.collectEvents(t.C, stopCh))
if b.maxBatchSize > 1 {
maxWaitTimer.Reset(b.maxBatchWait)
}
b.processEvents(b.collectEvents(maxWaitChan, stopCh))
}()
select {
@@ -235,15 +230,25 @@ func (b *bufferedBackend) processEvents(events []*auditinternal.Event) {
b.throttle.Accept()
}
b.wg.Add(1)
go func() {
defer b.wg.Done()
defer runtime.HandleCrash()
if b.asyncDelegate {
b.wg.Add(1)
go func() {
defer b.wg.Done()
defer runtime.HandleCrash()
// Execute the real processing in a goroutine to keep it from blocking.
// This lets the batching routine continue draining the queue immediately.
b.delegateBackend.ProcessEvents(events...)
}()
// Execute the real processing in a goroutine to keep it from blocking.
// This lets the batching routine continue draining the queue immediately.
b.delegateBackend.ProcessEvents(events...)
}()
} else {
func() {
defer runtime.HandleCrash()
// Execute the real processing synchronously; with AsyncDelegate
// disabled, the batching routine blocks until the delegate finishes.
b.delegateBackend.ProcessEvents(events...)
}()
}
}
func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) {
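Putting the buffered-backend changes together: with a MaxBatchSize of 1 the max-wait timer channel stays nil, and since a receive from a nil channel blocks forever, the select in collectEvents only ever waits on the buffer and the stop channel; with AsyncDelegate false, the delegate runs on the batching goroutine itself. A small standalone demo of the synchronous, unbatched configuration (the toy printBackend delegate is invented for illustration, and the sketch assumes the audit.Backend interface of this era: ProcessEvents/Run/Shutdown):

package main

import (
	"fmt"
	"time"

	auditinternal "k8s.io/apiserver/pkg/apis/audit"
	pluginbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
)

// printBackend is a toy delegate that writes events inline, standing in
// for the log backend.
type printBackend struct{}

func (printBackend) ProcessEvents(events ...*auditinternal.Event) {
	for _, e := range events {
		fmt.Println("audit event:", e.AuditID)
	}
}
func (printBackend) Run(stopCh <-chan struct{}) error { return nil }
func (printBackend) Shutdown()                        {}

func main() {
	// Mirrors defaultLogBatchConfig: unbatched, synchronous delegate.
	cfg := pluginbuffered.BatchConfig{
		BufferSize:    100,
		MaxBatchSize:  1,     // each event is its own "batch"
		AsyncDelegate: false, // delegate runs on the batching goroutine
	}
	b := pluginbuffered.NewBackend(printBackend{}, cfg)

	stopCh := make(chan struct{})
	b.Run(stopCh)
	b.ProcessEvents(&auditinternal.Event{AuditID: "event-1"},
		&auditinternal.Event{AuditID: "event-2"})

	time.Sleep(100 * time.Millisecond) // demo only: let the goroutine drain
	close(stopCh)
	b.Shutdown() // waits for in-flight delegate calls
}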

View File

@@ -17,9 +17,12 @@ limitations under the License.
package buffered
import (
"fmt"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/wait"
@@ -28,17 +31,7 @@ import (
)
var (
closedStopCh = func() <-chan struct{} {
ch := make(chan struct{})
close(ch)
return ch
}()
infiniteTimeCh <-chan time.Time = make(chan time.Time)
closedTimeCh = func() <-chan time.Time {
ch := make(chan time.Time)
close(ch)
return ch
}()
infiniteTimeCh <-chan time.Time
)
func newEvents(number int) []*auditinternal.Event {
@@ -50,72 +43,118 @@ func newEvents(number int) []*auditinternal.Event {
return events
}
func TestBufferedBackendCollectEvents(t *testing.T) {
config := NewDefaultBatchConfig()
testCases := []struct {
desc string
timer <-chan time.Time
stopCh <-chan struct{}
numEvents int
wantBatchSize int
}{
{
desc: "max batch size encountered",
timer: infiniteTimeCh,
stopCh: wait.NeverStop,
numEvents: config.MaxBatchSize + 1,
wantBatchSize: config.MaxBatchSize,
},
{
desc: "timer expired",
timer: closedTimeCh,
stopCh: wait.NeverStop,
},
{
desc: "chanel closed",
timer: infiniteTimeCh,
stopCh: closedStopCh,
},
func testBatchConfig() BatchConfig {
return BatchConfig{
BufferSize: 100,
MaxBatchSize: 10,
MaxBatchWait: wait.ForeverTestTimeout,
ThrottleEnable: false,
AsyncDelegate: true,
}
for _, tc := range testCases {
tc := tc
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()
}
backend := NewBackend(&fake.Backend{}, config).(*bufferedBackend)
func TestBatchedBackendCollectEvents(t *testing.T) {
config := testBatchConfig()
batchSize := config.MaxBatchSize
backend := NewBackend(&fake.Backend{}, config).(*bufferedBackend)
backend.ProcessEvents(newEvents(tc.numEvents)...)
batch := backend.collectEvents(tc.timer, tc.stopCh)
t.Log("Max batch size encountered.")
backend.ProcessEvents(newEvents(batchSize + 1)...)
batch := backend.collectEvents(nil, nil)
assert.Len(t, batch, batchSize, "Expected full batch")
require.Equal(t, tc.wantBatchSize, len(batch), "unexpected batch size")
})
t.Log("Partial batch should hang until timer expires.")
backend.ProcessEvents(newEvents(1)...)
tc := make(chan time.Time)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
batch = backend.collectEvents(tc, nil)
}()
// Wait for the queued events to be collected.
err := wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
return len(backend.buffer) == 0, nil
})
require.NoError(t, err)
tc <- time.Now() // Trigger "timeout"
wg.Wait()
assert.Len(t, batch, 2, "Expected partial batch")
t.Log("Collected events should be delivered when stop channel is closed.")
backend.ProcessEvents(newEvents(3)...)
stopCh := make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
batch = backend.collectEvents(nil, stopCh)
}()
// Wait for the queued events to be collected.
err = wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
return len(backend.buffer) == 0, nil
})
require.NoError(t, err)
close(stopCh)
wg.Wait()
assert.Len(t, batch, 3, "Expected partial batch")
}
func TestUnbatchedBackendCollectEvents(t *testing.T) {
config := testBatchConfig()
config.MaxBatchSize = 1 // No batching.
backend := NewBackend(&fake.Backend{}, config).(*bufferedBackend)
t.Log("Max batch size encountered.")
backend.ProcessEvents(newEvents(3)...)
batch := backend.collectEvents(nil, nil)
assert.Len(t, batch, 1, "Expected single event")
t.Log("Queue should always be drained.")
for len(backend.buffer) > 0 {
batch = backend.collectEvents(nil, nil)
assert.Len(t, batch, 1, "Expected single event")
}
t.Log("Collection should hault when stop channel is closed.")
stopCh := make(chan struct{})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
batch = backend.collectEvents(nil, stopCh)
}()
close(stopCh)
wg.Wait()
assert.Empty(t, batch, "Empty final batch")
}
func TestBufferedBackendProcessEventsAfterStop(t *testing.T) {
t.Parallel()
backend := NewBackend(&fake.Backend{}, NewDefaultBatchConfig()).(*bufferedBackend)
backend := NewBackend(&fake.Backend{}, testBatchConfig()).(*bufferedBackend)
closedStopCh := make(chan struct{})
close(closedStopCh)
backend.Run(closedStopCh)
backend.Shutdown()
backend.ProcessEvents(newEvents(1)...)
batch := backend.collectEvents(infiniteTimeCh, wait.NeverStop)
require.Equal(t, 0, len(batch), "processed events after the backend has been stopped")
require.Empty(t, batch, "processed events after the backend has been stopped")
}
func TestBufferedBackendProcessEventsBufferFull(t *testing.T) {
t.Parallel()
config := NewDefaultBatchConfig()
config := testBatchConfig()
config.BufferSize = 1
backend := NewBackend(&fake.Backend{}, config).(*bufferedBackend)
backend.ProcessEvents(newEvents(2)...)
require.Equal(t, 1, len(backend.buffer), "buffer contains more elements than it should")
require.Len(t, backend.buffer, 1, "buffer contains more elements than it should")
}
func TestBufferedBackendShutdownWaitsForDelegatedCalls(t *testing.T) {
@@ -129,7 +168,7 @@ func TestBufferedBackendShutdownWaitsForDelegatedCalls(t *testing.T) {
<-delegatedCallEndCh
},
}
config := NewDefaultBatchConfig()
config := testBatchConfig()
backend := NewBackend(delegateBackend, config)
// Run backend, process events, wait for them to be batched and for delegated call to start.
@@ -159,3 +198,25 @@ func TestBufferedBackendShutdownWaitsForDelegatedCalls(t *testing.T) {
close(delegatedCallEndCh)
<-shutdownEndCh
}
func TestDelegateProcessEvents(t *testing.T) {
for _, async := range []bool{true, false} {
t.Run(fmt.Sprintf("async:%t", async), func(t *testing.T) {
config := testBatchConfig()
config.AsyncDelegate = async
wg := sync.WaitGroup{}
delegate := &fake.Backend{
OnRequest: func(events []*auditinternal.Event) {
assert.Len(t, events, config.MaxBatchSize, "Unexpected batch")
wg.Done()
},
}
b := NewBackend(delegate, config).(*bufferedBackend)
wg.Add(5)
for i := 0; i < 5; i++ {
b.processEvents(newEvents(config.MaxBatchSize))
}
wg.Wait()
})
}
}
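For reference, the fake.Backend used throughout these tests is essentially a callback shim. A minimal sketch of its shape (the OnRequest field matches the usage in TestDelegateProcessEvents above; the rest is assumed):

package fake

import (
	auditinternal "k8s.io/apiserver/pkg/apis/audit"
)

// Backend is a test double: each ProcessEvents call is forwarded to the
// OnRequest hook so tests can assert on delivered batches.
type Backend struct {
	OnRequest func(events []*auditinternal.Event)
}

func (b *Backend) ProcessEvents(events ...*auditinternal.Event) {
	if b.OnRequest != nil {
		b.OnRequest(events)
	}
}

func (b *Backend) Run(stopCh <-chan struct{}) error { return nil }

func (b *Backend) Shutdown() {}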