Merge pull request #67223 from tallclair/audit-log

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Synchronous & unbatched audit log writes

**What this PR does / why we need it**:
When enabling buffered audit log file writes to reduce latency under high load, we shouldn't be batching the writes, as the large data write can have an inverse (though unpredictable) impact. Additionally, batched audit log writes should not be done asynchronously, as this just creates lock contention on the log writer.

This is a clean-ed up version of https://github.com/kubernetes/kubernetes/pull/61217

**Which issue(s) this PR fixes**
Fixes #61932 

**Release note**:

```release-note
NONE
```

/sig auth
/priority important-soon
/kind bug
/milestone v1.12
This commit is contained in:
Kubernetes Submit Queue 2018-08-16 06:29:18 -07:00 committed by GitHub
commit 87e7b9fc4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 201 additions and 96 deletions

View File

@ -230,6 +230,7 @@ func TestAddFlags(t *testing.T) {
ThrottleEnable: false,
ThrottleQPS: 43.5,
ThrottleBurst: 44,
AsyncDelegate: true,
},
},
TruncateOptions: apiserveroptions.AuditTruncateOptions{

View File

@ -42,6 +42,16 @@ import (
pluginwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook"
)
const (
// Default configuration values for ModeBatch.
defaultBatchBufferSize = 10000 // Buffer up to 10000 events before starting discarding.
// These batch parameters are only used by the webhook backend.
defaultBatchMaxSize = 400 // Only send up to 400 events at a time.
defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute.
defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS.
defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst.
)
func appendBackend(existing, newBackend audit.Backend) audit.Backend {
if existing == nil {
return newBackend
@ -129,15 +139,12 @@ type AuditWebhookOptions struct {
}
func NewAuditOptions() *AuditOptions {
defaultLogBatchConfig := pluginbuffered.NewDefaultBatchConfig()
defaultLogBatchConfig.ThrottleEnable = false
return &AuditOptions{
WebhookOptions: AuditWebhookOptions{
InitialBackoff: pluginwebhook.DefaultInitialBackoff,
BatchOptions: AuditBatchOptions{
Mode: ModeBatch,
BatchConfig: pluginbuffered.NewDefaultBatchConfig(),
BatchConfig: defaultWebhookBatchConfig(),
},
TruncateOptions: NewAuditTruncateOptions(),
// TODO(audit): use v1 API in release 1.13
@ -147,7 +154,7 @@ func NewAuditOptions() *AuditOptions {
Format: pluginlog.FormatJson,
BatchOptions: AuditBatchOptions{
Mode: ModeBlocking,
BatchConfig: defaultLogBatchConfig,
BatchConfig: defaultLogBatchConfig(),
},
TruncateOptions: NewAuditTruncateOptions(),
// TODO(audit): use v1 API in release 1.13
@ -213,11 +220,13 @@ func validateBackendBatchOptions(pluginName string, options AuditBatchOptions) e
if config.MaxBatchSize <= 0 {
return fmt.Errorf("invalid audit batch %s max batch size %v, must be a positive number", pluginName, config.MaxBatchSize)
}
if config.ThrottleQPS <= 0 {
return fmt.Errorf("invalid audit batch %s throttle QPS %v, must be a positive number", pluginName, config.ThrottleQPS)
}
if config.ThrottleBurst <= 0 {
return fmt.Errorf("invalid audit batch %s throttle burst %v, must be a positive number", pluginName, config.ThrottleBurst)
if config.ThrottleEnable {
if config.ThrottleQPS <= 0 {
return fmt.Errorf("invalid audit batch %s throttle QPS %v, must be a positive number", pluginName, config.ThrottleQPS)
}
if config.ThrottleBurst <= 0 {
return fmt.Errorf("invalid audit batch %s throttle burst %v, must be a positive number", pluginName, config.ThrottleBurst)
}
}
return nil
}
@ -525,3 +534,31 @@ func (o *AuditWebhookOptions) applyTo(c *server.Config) error {
c.AuditBackend = appendBackend(c.AuditBackend, webhook)
return nil
}
// defaultWebhookBatchConfig returns the default BatchConfig used by the Webhook backend.
func defaultWebhookBatchConfig() pluginbuffered.BatchConfig {
return pluginbuffered.BatchConfig{
BufferSize: defaultBatchBufferSize,
MaxBatchSize: defaultBatchMaxSize,
MaxBatchWait: defaultBatchMaxWait,
ThrottleEnable: true,
ThrottleQPS: defaultBatchThrottleQPS,
ThrottleBurst: defaultBatchThrottleBurst,
AsyncDelegate: true,
}
}
// defaultLogBatchConfig returns the default BatchConfig used by the Log backend.
func defaultLogBatchConfig() pluginbuffered.BatchConfig {
return pluginbuffered.BatchConfig{
BufferSize: defaultBatchBufferSize,
// Batching is not useful for the log-file backend.
// MaxBatchWait ignored.
MaxBatchSize: 1,
ThrottleEnable: false,
// Asynchronous log threads just create lock contention.
AsyncDelegate: false,
}
}

View File

@ -26,6 +26,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/fake:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
],
)

View File

@ -31,16 +31,6 @@ import (
// PluginName is the name reported in error metrics.
const PluginName = "buffered"
const (
// Default configuration values for ModeBatch.
defaultBatchBufferSize = 10000 // Buffer up to 10000 events before starting discarding.
defaultBatchMaxSize = 400 // Only send up to 400 events at a time.
defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute.
defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS.
defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst.
)
// BatchConfig represents batching delegate audit backend configuration.
type BatchConfig struct {
// BufferSize defines a size of the buffering queue.
@ -57,19 +47,9 @@ type BatchConfig struct {
// ThrottleBurst defines the maximum number of requests sent to the delegate backend at the same moment in case
// the capacity defined by ThrottleQPS was not utilized.
ThrottleBurst int
}
// NewDefaultBatchConfig returns new Config objects populated by default values.
func NewDefaultBatchConfig() BatchConfig {
return BatchConfig{
BufferSize: defaultBatchBufferSize,
MaxBatchSize: defaultBatchMaxSize,
MaxBatchWait: defaultBatchMaxWait,
ThrottleEnable: true,
ThrottleQPS: defaultBatchThrottleQPS,
ThrottleBurst: defaultBatchThrottleBurst,
}
// Whether the delegate backend should be called asynchronously.
AsyncDelegate bool
}
type bufferedBackend struct {
@ -85,6 +65,9 @@ type bufferedBackend struct {
// Receiving maxBatchSize events will always trigger sending a batch, regardless of the amount of time passed.
maxBatchWait time.Duration
// Whether the delegate backend should be called asynchronously.
asyncDelegate bool
// Channel to signal that the batching routine has processed all remaining events and exited.
// Once `shutdownCh` is closed no new events will be sent to the delegate backend.
shutdownCh chan struct{}
@ -113,6 +96,7 @@ func NewBackend(delegate audit.Backend, config BatchConfig) audit.Backend {
buffer: make(chan *auditinternal.Event, config.BufferSize),
maxBatchSize: config.MaxBatchSize,
maxBatchWait: config.MaxBatchWait,
asyncDelegate: config.AsyncDelegate,
shutdownCh: make(chan struct{}),
wg: sync.WaitGroup{},
throttle: throttle,
@ -169,8 +153,17 @@ func (b *bufferedBackend) Shutdown() {
// b.stopCh is closed, processIncomingEvents stops and closes the buffer.
func (b *bufferedBackend) processIncomingEvents(stopCh <-chan struct{}) {
defer close(b.buffer)
t := time.NewTimer(b.maxBatchWait)
defer t.Stop()
var (
maxWaitChan <-chan time.Time
maxWaitTimer *time.Timer
)
// Only use max wait batching if batching is enabled.
if b.maxBatchSize > 1 {
maxWaitTimer = time.NewTimer(b.maxBatchWait)
maxWaitChan = maxWaitTimer.C
defer maxWaitTimer.Stop()
}
for {
func() {
@ -178,8 +171,10 @@ func (b *bufferedBackend) processIncomingEvents(stopCh <-chan struct{}) {
// goroutine can't bring down the main routine.
defer runtime.HandleCrash()
t.Reset(b.maxBatchWait)
b.processEvents(b.collectEvents(t.C, stopCh))
if b.maxBatchSize > 1 {
maxWaitTimer.Reset(b.maxBatchWait)
}
b.processEvents(b.collectEvents(maxWaitChan, stopCh))
}()
select {
@ -235,15 +230,25 @@ func (b *bufferedBackend) processEvents(events []*auditinternal.Event) {
b.throttle.Accept()
}
b.wg.Add(1)
go func() {
defer b.wg.Done()
defer runtime.HandleCrash()
if b.asyncDelegate {
b.wg.Add(1)
go func() {
defer b.wg.Done()
defer runtime.HandleCrash()
// Execute the real processing in a goroutine to keep it from blocking.
// This lets the batching routine continue draining the queue immediately.
b.delegateBackend.ProcessEvents(events...)
}()
// Execute the real processing in a goroutine to keep it from blocking.
// This lets the batching routine continue draining the queue immediately.
b.delegateBackend.ProcessEvents(events...)
}()
} else {
func() {
defer runtime.HandleCrash()
// Execute the real processing in a goroutine to keep it from blocking.
// This lets the batching routine continue draining the queue immediately.
b.delegateBackend.ProcessEvents(events...)
}()
}
}
func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) {

View File

@ -17,9 +17,12 @@ limitations under the License.
package buffered
import (
"fmt"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/wait"
@ -28,17 +31,7 @@ import (
)
var (
closedStopCh = func() <-chan struct{} {
ch := make(chan struct{})
close(ch)
return ch
}()
infiniteTimeCh <-chan time.Time = make(chan time.Time)
closedTimeCh = func() <-chan time.Time {
ch := make(chan time.Time)
close(ch)
return ch
}()
infiniteTimeCh <-chan time.Time
)
func newEvents(number int) []*auditinternal.Event {
@ -50,72 +43,118 @@ func newEvents(number int) []*auditinternal.Event {
return events
}
func TestBufferedBackendCollectEvents(t *testing.T) {
config := NewDefaultBatchConfig()
testCases := []struct {
desc string
timer <-chan time.Time
stopCh <-chan struct{}
numEvents int
wantBatchSize int
}{
{
desc: "max batch size encountered",
timer: infiniteTimeCh,
stopCh: wait.NeverStop,
numEvents: config.MaxBatchSize + 1,
wantBatchSize: config.MaxBatchSize,
},
{
desc: "timer expired",
timer: closedTimeCh,
stopCh: wait.NeverStop,
},
{
desc: "chanel closed",
timer: infiniteTimeCh,
stopCh: closedStopCh,
},
func testBatchConfig() BatchConfig {
return BatchConfig{
BufferSize: 100,
MaxBatchSize: 10,
MaxBatchWait: wait.ForeverTestTimeout,
ThrottleEnable: false,
AsyncDelegate: true,
}
for _, tc := range testCases {
tc := tc
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()
}
backend := NewBackend(&fake.Backend{}, config).(*bufferedBackend)
func TestBatchedBackendCollectEvents(t *testing.T) {
config := testBatchConfig()
batchSize := config.MaxBatchSize
backend := NewBackend(&fake.Backend{}, config).(*bufferedBackend)
backend.ProcessEvents(newEvents(tc.numEvents)...)
batch := backend.collectEvents(tc.timer, tc.stopCh)
t.Log("Max batch size encountered.")
backend.ProcessEvents(newEvents(batchSize + 1)...)
batch := backend.collectEvents(nil, nil)
assert.Len(t, batch, batchSize, "Expected full batch")
require.Equal(t, tc.wantBatchSize, len(batch), "unexpected batch size")
})
t.Log("Partial batch should hang until timer expires.")
backend.ProcessEvents(newEvents(1)...)
tc := make(chan time.Time)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
batch = backend.collectEvents(tc, nil)
}()
// Wait for the queued events to be collected.
err := wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
return len(backend.buffer) == 0, nil
})
require.NoError(t, err)
tc <- time.Now() // Trigger "timeout"
wg.Wait()
assert.Len(t, batch, 2, "Expected partial batch")
t.Log("Collected events should be delivered when stop channel is closed.")
backend.ProcessEvents(newEvents(3)...)
stopCh := make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
batch = backend.collectEvents(nil, stopCh)
}()
// Wait for the queued events to be collected.
err = wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
return len(backend.buffer) == 0, nil
})
require.NoError(t, err)
close(stopCh)
wg.Wait()
assert.Len(t, batch, 3, "Expected partial batch")
}
func TestUnbatchedBackendCollectEvents(t *testing.T) {
config := testBatchConfig()
config.MaxBatchSize = 1 // No batching.
backend := NewBackend(&fake.Backend{}, config).(*bufferedBackend)
t.Log("Max batch size encountered.")
backend.ProcessEvents(newEvents(3)...)
batch := backend.collectEvents(nil, nil)
assert.Len(t, batch, 1, "Expected single event")
t.Log("Queue should always be drained.")
for len(backend.buffer) > 0 {
batch = backend.collectEvents(nil, nil)
assert.Len(t, batch, 1, "Expected single event")
}
t.Log("Collection should hault when stop channel is closed.")
stopCh := make(chan struct{})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
batch = backend.collectEvents(nil, stopCh)
}()
close(stopCh)
wg.Wait()
assert.Empty(t, batch, "Empty final batch")
}
func TestBufferedBackendProcessEventsAfterStop(t *testing.T) {
t.Parallel()
backend := NewBackend(&fake.Backend{}, NewDefaultBatchConfig()).(*bufferedBackend)
backend := NewBackend(&fake.Backend{}, testBatchConfig()).(*bufferedBackend)
closedStopCh := make(chan struct{})
close(closedStopCh)
backend.Run(closedStopCh)
backend.Shutdown()
backend.ProcessEvents(newEvents(1)...)
batch := backend.collectEvents(infiniteTimeCh, wait.NeverStop)
require.Equal(t, 0, len(batch), "processed events after the backed has been stopped")
require.Empty(t, batch, "processed events after the backed has been stopped")
}
func TestBufferedBackendProcessEventsBufferFull(t *testing.T) {
t.Parallel()
config := NewDefaultBatchConfig()
config := testBatchConfig()
config.BufferSize = 1
backend := NewBackend(&fake.Backend{}, config).(*bufferedBackend)
backend.ProcessEvents(newEvents(2)...)
require.Equal(t, 1, len(backend.buffer), "buffed contains more elements than it should")
require.Len(t, backend.buffer, 1, "buffed contains more elements than it should")
}
func TestBufferedBackendShutdownWaitsForDelegatedCalls(t *testing.T) {
@ -129,7 +168,7 @@ func TestBufferedBackendShutdownWaitsForDelegatedCalls(t *testing.T) {
<-delegatedCallEndCh
},
}
config := NewDefaultBatchConfig()
config := testBatchConfig()
backend := NewBackend(delegateBackend, config)
// Run backend, process events, wait for them to be batched and for delegated call to start.
@ -159,3 +198,25 @@ func TestBufferedBackendShutdownWaitsForDelegatedCalls(t *testing.T) {
close(delegatedCallEndCh)
<-shutdownEndCh
}
func TestDelegateProcessEvents(t *testing.T) {
for _, async := range []bool{true, false} {
t.Run(fmt.Sprintf("async:%t", async), func(t *testing.T) {
config := testBatchConfig()
config.AsyncDelegate = async
wg := sync.WaitGroup{}
delegate := &fake.Backend{
OnRequest: func(events []*auditinternal.Event) {
assert.Len(t, events, config.MaxBatchSize, "Unexpected batch")
wg.Done()
},
}
b := NewBackend(delegate, config).(*bufferedBackend)
wg.Add(5)
for i := 0; i < 5; i++ {
b.processEvents(newEvents(config.MaxBatchSize))
}
wg.Wait()
})
}
}