From 881e6d4f6f905079b2c27299e7b631b6903b6815 Mon Sep 17 00:00:00 2001 From: Mik Vyatskov Date: Thu, 22 Feb 2018 19:52:33 +0100 Subject: [PATCH] Add buffering to the log audit backend Signed-off-by: Mik Vyatskov --- cluster/gce/gci/configure-helper.sh | 2 +- cmd/kube-apiserver/app/options/BUILD | 2 +- .../app/options/options_test.go | 42 ++- .../Godeps/Godeps.json | 4 + .../k8s.io/apiserver/pkg/server/options/BUILD | 1 + .../apiserver/pkg/server/options/audit.go | 183 ++++++--- .../plugin/pkg/audit/buffered/buffered.go | 8 +- .../apiserver/plugin/pkg/audit/log/backend.go | 9 +- .../apiserver/plugin/pkg/audit/webhook/BUILD | 8 +- .../plugin/pkg/audit/webhook/webhook.go | 354 ++---------------- .../plugin/pkg/audit/webhook/webhook_test.go | 285 +------------- .../audit/webhook/webhook_v1alpha1_test.go | 289 -------------- .../k8s.io/kube-aggregator/Godeps/Godeps.json | 4 + .../sample-apiserver/Godeps/Godeps.json | 4 + 14 files changed, 216 insertions(+), 979 deletions(-) delete mode 100644 staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_v1alpha1_test.go diff --git a/cluster/gce/gci/configure-helper.sh b/cluster/gce/gci/configure-helper.sh index fd7ee4ba76f..fb9cd3cb151 100644 --- a/cluster/gce/gci/configure-helper.sh +++ b/cluster/gce/gci/configure-helper.sh @@ -1635,7 +1635,7 @@ function start-kube-apiserver { params+=" --audit-webhook-batch-throttle-burst=${ADVANCED_AUDIT_WEBHOOK_THROTTLE_BURST}" fi if [[ -n "${ADVANCED_AUDIT_WEBHOOK_INITIAL_BACKOFF:-}" ]]; then - params+=" --audit-webhook-batch-initial-backoff=${ADVANCED_AUDIT_WEBHOOK_INITIAL_BACKOFF}" + params+=" --audit-webhook-initial-backoff=${ADVANCED_AUDIT_WEBHOOK_INITIAL_BACKOFF}" fi create-master-audit-webhook-config "${audit_webhook_config_file}" audit_webhook_config_mount="{\"name\": \"auditwebhookconfigmount\",\"mountPath\": \"${audit_webhook_config_file}\", \"readOnly\": true}," diff --git a/cmd/kube-apiserver/app/options/BUILD b/cmd/kube-apiserver/app/options/BUILD index 189e4fbc2c4..bb6f67d6aa2 100644 --- a/cmd/kube-apiserver/app/options/BUILD +++ b/cmd/kube-apiserver/app/options/BUILD @@ -46,7 +46,7 @@ go_test( "//vendor/k8s.io/apiserver/pkg/server/options:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library", - "//vendor/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library", + "//vendor/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", ], ) diff --git a/cmd/kube-apiserver/app/options/options_test.go b/cmd/kube-apiserver/app/options/options_test.go index 65372401b32..f90c333a0a5 100644 --- a/cmd/kube-apiserver/app/options/options_test.go +++ b/cmd/kube-apiserver/app/options/options_test.go @@ -29,7 +29,7 @@ import ( genericoptions "k8s.io/apiserver/pkg/server/options" "k8s.io/apiserver/pkg/storage/storagebackend" utilflag "k8s.io/apiserver/pkg/util/flag" - auditwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook" + auditbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered" restclient "k8s.io/client-go/rest" "k8s.io/kubernetes/pkg/api/legacyscheme" kapi "k8s.io/kubernetes/pkg/apis/core" @@ -54,15 +54,23 @@ func TestAddFlags(t *testing.T) { "--audit-log-maxbackup=12", "--audit-log-maxsize=13", "--audit-log-path=/var/log", + "--audit-log-mode=blocking", + "--audit-log-batch-buffer-size=46", + "--audit-log-batch-max-size=47", + "--audit-log-batch-max-wait=48s", + "--audit-log-batch-throttle-enable=true", + "--audit-log-batch-throttle-qps=49.5", + "--audit-log-batch-throttle-burst=50", "--audit-policy-file=/policy", "--audit-webhook-config-file=/webhook-config", "--audit-webhook-mode=blocking", "--audit-webhook-batch-buffer-size=42", "--audit-webhook-batch-max-size=43", "--audit-webhook-batch-max-wait=1s", + "--audit-webhook-batch-throttle-enable=false", "--audit-webhook-batch-throttle-qps=43.5", "--audit-webhook-batch-throttle-burst=44", - "--audit-webhook-batch-initial-backoff=2s", + "--audit-webhook-initial-backoff=2s", "--authentication-token-webhook-cache-ttl=3m", "--authentication-token-webhook-config-file=/token-webhook-config", "--authorization-mode=AlwaysDeny", @@ -180,18 +188,32 @@ func TestAddFlags(t *testing.T) { MaxBackups: 12, MaxSize: 13, Format: "json", + BatchOptions: apiserveroptions.AuditBatchOptions{ + Mode: "blocking", + BatchConfig: auditbuffered.BatchConfig{ + BufferSize: 46, + MaxBatchSize: 47, + MaxBatchWait: 48 * time.Second, + ThrottleEnable: true, + ThrottleQPS: 49.5, + ThrottleBurst: 50, + }, + }, }, WebhookOptions: apiserveroptions.AuditWebhookOptions{ - Mode: "blocking", ConfigFile: "/webhook-config", - BatchConfig: auditwebhook.BatchBackendConfig{ - BufferSize: 42, - MaxBatchSize: 43, - MaxBatchWait: 1 * time.Second, - ThrottleQPS: 43.5, - ThrottleBurst: 44, - InitialBackoff: 2 * time.Second, + BatchOptions: apiserveroptions.AuditBatchOptions{ + Mode: "blocking", + BatchConfig: auditbuffered.BatchConfig{ + BufferSize: 42, + MaxBatchSize: 43, + MaxBatchWait: 1 * time.Second, + ThrottleEnable: false, + ThrottleQPS: 43.5, + ThrottleBurst: 44, + }, }, + InitialBackoff: 2 * time.Second, }, PolicyFile: "/policy", }, diff --git a/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json b/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json index 393c78d5f22..99b398cdfed 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json +++ b/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json @@ -1470,6 +1470,10 @@ "ImportPath": "k8s.io/apiserver/pkg/util/wsstream", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }, + { + "ImportPath": "k8s.io/apiserver/plugin/pkg/audit/buffered", + "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + }, { "ImportPath": "k8s.io/apiserver/plugin/pkg/audit/log", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/BUILD b/staging/src/k8s.io/apiserver/pkg/server/options/BUILD index 9eda89b33cf..cf9a18215a3 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/options/BUILD @@ -55,6 +55,7 @@ go_library( "//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library", + "//vendor/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library", "//vendor/k8s.io/apiserver/plugin/pkg/audit/log:go_default_library", "//vendor/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/audit.go b/staging/src/k8s.io/apiserver/pkg/server/options/audit.go index e43ebb12bc9..4e2fa0acd1c 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/audit.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/audit.go @@ -21,6 +21,7 @@ import ( "io" "os" "strings" + "time" "github.com/golang/glog" "github.com/spf13/pflag" @@ -32,6 +33,7 @@ import ( "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/server" utilfeature "k8s.io/apiserver/pkg/util/feature" + pluginbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered" pluginlog "k8s.io/apiserver/plugin/pkg/audit/log" pluginwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook" ) @@ -58,6 +60,33 @@ type AuditOptions struct { WebhookOptions AuditWebhookOptions } +const ( + // ModeBatch indicates that the audit backend should buffer audit events + // internally, sending batch updates either once a certain number of + // events have been received or a certain amount of time has passed. + ModeBatch = "batch" + // ModeBlocking causes the audit backend to block on every attempt to process + // a set of events. This causes requests to the API server to wait for the + // flush before sending a response. + ModeBlocking = "blocking" +) + +// AllowedModes is the modes known for audit backends. +var AllowedModes = []string{ + ModeBatch, + ModeBlocking, +} + +type AuditBatchOptions struct { + // Should the backend asynchronous batch events to the webhook backend or + // should the backend block responses? + // + // Defaults to asynchronous batch events. + Mode string + // Configuration for batching backend. Only used in batch mode. + BatchConfig pluginbuffered.BatchConfig +} + // AuditLogOptions determines the output of the structured audit log by default. // If the AdvancedAuditing feature is set to false, AuditLogOptions holds the legacy // audit log writer. @@ -67,27 +96,37 @@ type AuditLogOptions struct { MaxBackups int MaxSize int Format string + + BatchOptions AuditBatchOptions } // AuditWebhookOptions control the webhook configuration for audit events. type AuditWebhookOptions struct { - ConfigFile string - // Should the webhook asynchronous batch events to the webhook backend or - // should the webhook block responses? - // - // Defaults to asynchronous batch events. - Mode string - // Configuration for batching webhook. Only used in batch mode. - BatchConfig pluginwebhook.BatchBackendConfig + ConfigFile string + InitialBackoff time.Duration + + BatchOptions AuditBatchOptions } func NewAuditOptions() *AuditOptions { + defaultLogBatchConfig := pluginbuffered.NewDefaultBatchConfig() + defaultLogBatchConfig.ThrottleEnable = false + return &AuditOptions{ WebhookOptions: AuditWebhookOptions{ - Mode: pluginwebhook.ModeBatch, - BatchConfig: pluginwebhook.NewDefaultBatchBackendConfig(), + BatchOptions: AuditBatchOptions{ + Mode: ModeBatch, + BatchConfig: defaultLogBatchConfig, + }, + InitialBackoff: pluginwebhook.DefaultInitialBackoff, + }, + LogOptions: AuditLogOptions{ + Format: pluginlog.FormatJson, + BatchOptions: AuditBatchOptions{ + Mode: ModeBatch, + BatchConfig: pluginbuffered.NewDefaultBatchConfig(), + }, }, - LogOptions: AuditLogOptions{Format: pluginlog.FormatJson}, } } @@ -107,30 +146,20 @@ func (o *AuditOptions) Validate() []error { allErrors = append(allErrors, fmt.Errorf("feature '%s' must be enabled to set option --audit-webhook-config-file", features.AdvancedAuditing)) } } else { - // Check webhook mode - validMode := false - for _, m := range pluginwebhook.AllowedModes { - if m == o.WebhookOptions.Mode { - validMode = true - break - } + // check webhook configuration + if err := validateBackendMode(pluginwebhook.PluginName, o.WebhookOptions.BatchOptions.Mode); err != nil { + allErrors = append(allErrors, err) } - if !validMode { - allErrors = append(allErrors, fmt.Errorf("invalid audit webhook mode %s, allowed modes are %q", o.WebhookOptions.Mode, strings.Join(pluginwebhook.AllowedModes, ","))) + if err := validateBackendBatchConfig(pluginwebhook.PluginName, o.LogOptions.BatchOptions.BatchConfig); err != nil { + allErrors = append(allErrors, err) } - // Check webhook batch configuration - if o.WebhookOptions.BatchConfig.BufferSize <= 0 { - allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook buffer size %v, must be a positive number", o.WebhookOptions.BatchConfig.BufferSize)) + // check log configuration + if err := validateBackendMode(pluginlog.PluginName, o.LogOptions.BatchOptions.Mode); err != nil { + allErrors = append(allErrors, err) } - if o.WebhookOptions.BatchConfig.MaxBatchSize <= 0 { - allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook max batch size %v, must be a positive number", o.WebhookOptions.BatchConfig.MaxBatchSize)) - } - if o.WebhookOptions.BatchConfig.ThrottleQPS <= 0 { - allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook throttle QPS %v, must be a positive number", o.WebhookOptions.BatchConfig.ThrottleQPS)) - } - if o.WebhookOptions.BatchConfig.ThrottleBurst <= 0 { - allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook throttle burst %v, must be a positive number", o.WebhookOptions.BatchConfig.ThrottleBurst)) + if err := validateBackendBatchConfig(pluginlog.PluginName, o.LogOptions.BatchOptions.BatchConfig); err != nil { + allErrors = append(allErrors, err) } // Check log format @@ -160,6 +189,31 @@ func (o *AuditOptions) Validate() []error { return allErrors } +func validateBackendMode(pluginName string, mode string) error { + for _, m := range AllowedModes { + if m == mode { + return nil + } + } + return fmt.Errorf("invalid audit %s mode %s, allowed modes are %q", pluginName, mode, strings.Join(AllowedModes, ",")) +} + +func validateBackendBatchConfig(pluginName string, config pluginbuffered.BatchConfig) error { + if config.BufferSize <= 0 { + return fmt.Errorf("invalid audit batch %s buffer size %v, must be a positive number", pluginName, config.BufferSize) + } + if config.MaxBatchSize <= 0 { + return fmt.Errorf("invalid audit batch %s max batch size %v, must be a positive number", pluginName, config.MaxBatchSize) + } + if config.ThrottleQPS <= 0 { + return fmt.Errorf("invalid audit batch %s throttle QPS %v, must be a positive number", pluginName, config.ThrottleQPS) + } + if config.ThrottleBurst <= 0 { + return fmt.Errorf("invalid audit batch %s throttle burst %v, must be a positive number", pluginName, config.ThrottleBurst) + } + return nil +} + func (o *AuditOptions) AddFlags(fs *pflag.FlagSet) { if o == nil { return @@ -170,7 +224,9 @@ func (o *AuditOptions) AddFlags(fs *pflag.FlagSet) { " With AdvancedAuditing, a profile is required to enable auditing.") o.LogOptions.AddFlags(fs) + o.LogOptions.BatchOptions.AddFlags(pluginlog.PluginName, fs) o.WebhookOptions.AddFlags(fs) + o.WebhookOptions.BatchOptions.AddFlags(pluginwebhook.PluginName, fs) } func (o *AuditOptions) ApplyTo(c *server.Config) error { @@ -216,6 +272,36 @@ func (o *AuditOptions) applyTo(c *server.Config) error { return nil } +func (o *AuditBatchOptions) AddFlags(pluginName string, fs *pflag.FlagSet) { + fs.StringVar(&o.Mode, fmt.Sprintf("audit-%s-mode", pluginName), o.Mode, + "Strategy for sending audit events. Blocking indicates sending events should block"+ + " server responses. Batch causes the backend to buffer and write events"+ + " asynchronously. Known modes are "+strings.Join(AllowedModes, ",")+".") + fs.IntVar(&o.BatchConfig.BufferSize, fmt.Sprintf("audit-%s-batch-buffer-size", pluginName), + o.BatchConfig.BufferSize, "The size of the buffer to store events before "+ + "batching and writing. Only used in batch mode.") + fs.IntVar(&o.BatchConfig.MaxBatchSize, fmt.Sprintf("audit-%s-batch-max-size", pluginName), + o.BatchConfig.MaxBatchSize, "The maximum size of a batch. Only used in batch mode.") + fs.DurationVar(&o.BatchConfig.MaxBatchWait, fmt.Sprintf("audit-%s-batch-max-wait", pluginName), + o.BatchConfig.MaxBatchWait, "The amount of time to wait before force writing the "+ + "batch that hadn't reached the max size. Only used in batch mode.") + fs.BoolVar(&o.BatchConfig.ThrottleEnable, fmt.Sprintf("audit-%s-batch-throttle-enable", pluginName), + o.BatchConfig.ThrottleEnable, "Whether batching throttling is enabled. Only used in batch mode.") + fs.Float32Var(&o.BatchConfig.ThrottleQPS, fmt.Sprintf("audit-%s-batch-throttle-qps", pluginName), + o.BatchConfig.ThrottleQPS, "Maximum average number of batches per second. "+ + "Only used in batch mode.") + fs.IntVar(&o.BatchConfig.ThrottleBurst, fmt.Sprintf("audit-%s-batch-throttle-burst", pluginName), + o.BatchConfig.ThrottleBurst, "Maximum number of requests sent at the same "+ + "moment if ThrottleQPS was not utilized before. Only used in batch mode.") +} + +func (o *AuditBatchOptions) wrapBackend(delegate audit.Backend) audit.Backend { + if o.Mode == ModeBlocking { + return delegate + } + return pluginbuffered.NewBackend(delegate, o.BatchConfig) +} + func (o *AuditLogOptions) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.Path, "audit-log-path", o.Path, "If set, all requests coming to the apiserver will be logged to this file. '-' means standard out.") @@ -250,7 +336,8 @@ func (o *AuditLogOptions) getWriter() io.Writer { func (o *AuditLogOptions) advancedApplyTo(c *server.Config) error { if w := o.getWriter(); w != nil { - c.AuditBackend = appendBackend(c.AuditBackend, pluginlog.NewBackend(w, o.Format, auditv1beta1.SchemeGroupVersion)) + log := pluginlog.NewBackend(w, o.Format, auditv1beta1.SchemeGroupVersion) + c.AuditBackend = appendBackend(c.AuditBackend, o.BatchOptions.wrapBackend(log)) } return nil } @@ -264,28 +351,12 @@ func (o *AuditWebhookOptions) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.ConfigFile, "audit-webhook-config-file", o.ConfigFile, "Path to a kubeconfig formatted file that defines the audit webhook configuration."+ " Requires the 'AdvancedAuditing' feature gate.") - fs.StringVar(&o.Mode, "audit-webhook-mode", o.Mode, - "Strategy for sending audit events. Blocking indicates sending events should block"+ - " server responses. Batch causes the webhook to buffer and send events"+ - " asynchronously. Known modes are "+strings.Join(pluginwebhook.AllowedModes, ",")+".") - fs.IntVar(&o.BatchConfig.BufferSize, "audit-webhook-batch-buffer-size", - o.BatchConfig.BufferSize, "The size of the buffer to store events before "+ - "batching and sending to the webhook. Only used in batch mode.") - fs.IntVar(&o.BatchConfig.MaxBatchSize, "audit-webhook-batch-max-size", - o.BatchConfig.MaxBatchSize, "The maximum size of a batch sent to the webhook. "+ - "Only used in batch mode.") - fs.DurationVar(&o.BatchConfig.MaxBatchWait, "audit-webhook-batch-max-wait", - o.BatchConfig.MaxBatchWait, "The amount of time to wait before force sending the "+ - "batch that hadn't reached the max size. Only used in batch mode.") - fs.Float32Var(&o.BatchConfig.ThrottleQPS, "audit-webhook-batch-throttle-qps", - o.BatchConfig.ThrottleQPS, "Maximum average number of requests per second. "+ - "Only used in batch mode.") - fs.IntVar(&o.BatchConfig.ThrottleBurst, "audit-webhook-batch-throttle-burst", - o.BatchConfig.ThrottleBurst, "Maximum number of requests sent at the same "+ - "moment if ThrottleQPS was not utilized before. Only used in batch mode.") - fs.DurationVar(&o.BatchConfig.InitialBackoff, "audit-webhook-batch-initial-backoff", - o.BatchConfig.InitialBackoff, "The amount of time to wait before retrying the "+ - "first failed requests. Only used in batch mode.") + fs.DurationVar(&o.InitialBackoff, "audit-webhook-initial-backoff", + o.InitialBackoff, "The amount of time to wait before retrying the first failed request.") + fs.DurationVar(&o.InitialBackoff, "audit-webhook-batch-initial-backoff", + o.InitialBackoff, "The amount of time to wait before retrying the first failed request.") + fs.MarkDeprecated("audit-webhook-batch-initial-backoff", + "Deprecated, use --audit-webhook-initial-backoff instead.") } func (o *AuditWebhookOptions) applyTo(c *server.Config) error { @@ -293,10 +364,10 @@ func (o *AuditWebhookOptions) applyTo(c *server.Config) error { return nil } - webhook, err := pluginwebhook.NewBackend(o.ConfigFile, o.Mode, auditv1beta1.SchemeGroupVersion, o.BatchConfig) + webhook, err := pluginwebhook.NewBackend(o.ConfigFile, auditv1beta1.SchemeGroupVersion) if err != nil { return fmt.Errorf("initializing audit webhook: %v", err) } - c.AuditBackend = appendBackend(c.AuditBackend, webhook) + c.AuditBackend = appendBackend(c.AuditBackend, o.BatchOptions.wrapBackend(webhook)) return nil } diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered.go index c3991bfef88..296cc654fb6 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered.go @@ -153,6 +153,12 @@ func (b *bufferedBackend) Shutdown() { <-b.shutdownCh // Wait until all sending routines exit. + // + // - When b.shutdownCh is closed, we know that the goroutine in Run has terminated. + // - This means that processIncomingEvents has terminated. + // - Which means that b.buffer is closed and cannot accept any new events anymore. + // - Because processEvents is called synchronously from the Run goroutine, the waitgroup has its final value. + // Hence wg.Wait will not miss any more outgoing batches. b.wg.Wait() b.delegateBackend.Shutdown() @@ -250,7 +256,7 @@ func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) { // recover from. defer func() { if err := recover(); err != nil { - sendErr = fmt.Errorf("panic when processing events: %v", err) + sendErr = fmt.Errorf("audit backend shut down") } if sendErr != nil { audit.HandlePluginError(pluginName, sendErr, ev[evIndex:]...) diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/backend.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/backend.go index 7feb8f13089..8462b2b3196 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/backend.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/backend.go @@ -32,6 +32,9 @@ const ( FormatLegacy = "legacy" // FormatJson saves event in structured json format. FormatJson = "json" + + // PluginName is the name of this plugin, to be used in help and logs. + PluginName = "log" ) // AllowedFormats are the formats known by log backend. @@ -70,17 +73,17 @@ func (b *backend) logEvent(ev *auditinternal.Event) { case FormatJson: bs, err := runtime.Encode(audit.Codecs.LegacyCodec(b.groupVersion), ev) if err != nil { - audit.HandlePluginError("log", err, ev) + audit.HandlePluginError(PluginName, err, ev) return } line = string(bs[:]) default: - audit.HandlePluginError("log", fmt.Errorf("log format %q is not in list of known formats (%s)", + audit.HandlePluginError(PluginName, fmt.Errorf("log format %q is not in list of known formats (%s)", b.format, strings.Join(AllowedFormats, ",")), ev) return } if _, err := fmt.Fprint(b.out, line); err != nil { - audit.HandlePluginError("log", err, ev) + audit.HandlePluginError(PluginName, err, ev) } } diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD index 0c56f284aaf..1fd9312ff91 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD @@ -8,10 +8,7 @@ load( go_test( name = "go_default_test", - srcs = [ - "webhook_test.go", - "webhook_v1alpha1_test.go", - ], + srcs = ["webhook_test.go"], embed = [":go_default_library"], deps = [ "//vendor/github.com/stretchr/testify/assert:go_default_library", @@ -20,7 +17,6 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/serializer/json:go_default_library", "//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library", - "//vendor/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library", "//vendor/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library", "//vendor/k8s.io/apiserver/pkg/audit:go_default_library", "//vendor/k8s.io/client-go/tools/clientcmd/api/v1:go_default_library", @@ -35,7 +31,6 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/apimachinery/announced:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apimachinery/registered:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library", "//vendor/k8s.io/apiserver/pkg/apis/audit/install:go_default_library", "//vendor/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library", @@ -43,7 +38,6 @@ go_library( "//vendor/k8s.io/apiserver/pkg/audit:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/webhook:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", - "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go index 247f9444024..a5ce7285c58 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go @@ -18,16 +18,12 @@ limitations under the License. package webhook import ( - "errors" "fmt" - "strings" - "sync" "time" "k8s.io/apimachinery/pkg/apimachinery/announced" "k8s.io/apimachinery/pkg/apimachinery/registered" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/runtime" auditinternal "k8s.io/apiserver/pkg/apis/audit" "k8s.io/apiserver/pkg/apis/audit/install" auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1" @@ -35,91 +31,17 @@ import ( "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/util/webhook" "k8s.io/client-go/rest" - "k8s.io/client-go/util/flowcontrol" ) const ( - // ModeBatch indicates that the webhook should buffer audit events - // internally, sending batch updates either once a certain number of - // events have been received or a certain amount of time has passed. - ModeBatch = "batch" - // ModeBlocking causes the webhook to block on every attempt to process - // a set of events. This causes requests to the API server to wait for a - // round trip to the external audit service before sending a response. - ModeBlocking = "blocking" + // PluginName is the name of this plugin, to be used in help and logs. + PluginName = "webhook" + + // DefaultInitialBackoff is the default amount of time to wait before + // retrying sending audit events through a webhook. + DefaultInitialBackoff = 10 * time.Second ) -// AllowedModes is the modes known by this webhook. -var AllowedModes = []string{ - ModeBatch, - ModeBlocking, -} - -const ( - // Default configuration values for ModeBatch. - defaultBatchBufferSize = 10000 // Buffer up to 10000 events before starting discarding. - defaultBatchMaxSize = 400 // Only send up to 400 events at a time. - defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute. - defaultInitialBackoff = 10 * time.Second // Wait at least 10 seconds before retrying. - - defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS. - defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst. -) - -// The plugin name reported in error metrics. -const pluginName = "webhook" - -// BatchBackendConfig represents batching webhook audit backend configuration. -type BatchBackendConfig struct { - // BufferSize defines a size of the buffering queue. - BufferSize int - // MaxBatchSize defines maximum size of a batch. - MaxBatchSize int - // MaxBatchWait defines maximum amount of time to wait for MaxBatchSize - // events to be accumulated in the buffer before forcibly sending what's - // being accumulated. - MaxBatchWait time.Duration - - // ThrottleQPS defines the allowed rate of batches per second sent to the webhook. - ThrottleQPS float32 - // ThrottleBurst defines the maximum rate of batches per second sent to the webhook in case - // the capacity defined by ThrottleQPS was not utilized. - ThrottleBurst int - - // InitialBackoff defines the amount of time to wait before retrying the requests - // to the webhook for the first time. - InitialBackoff time.Duration -} - -// NewDefaultBatchBackendConfig returns new BatchBackendConfig objects populated by default values. -func NewDefaultBatchBackendConfig() BatchBackendConfig { - return BatchBackendConfig{ - BufferSize: defaultBatchBufferSize, - MaxBatchSize: defaultBatchMaxSize, - MaxBatchWait: defaultBatchMaxWait, - - ThrottleQPS: defaultBatchThrottleQPS, - ThrottleBurst: defaultBatchThrottleBurst, - - InitialBackoff: defaultInitialBackoff, - } -} - -// NewBackend returns an audit backend that sends events over HTTP to an external service. -// The mode indicates the caching behavior of the webhook. Either blocking (ModeBlocking) -// or buffered with batch POSTs (ModeBatch). -func NewBackend(kubeConfigFile string, mode string, groupVersion schema.GroupVersion, config BatchBackendConfig) (audit.Backend, error) { - switch mode { - case ModeBatch: - return newBatchWebhook(kubeConfigFile, groupVersion, config) - case ModeBlocking: - return newBlockingWebhook(kubeConfigFile, groupVersion) - default: - return nil, fmt.Errorf("webhook mode %q is not in list of known modes (%s)", - mode, strings.Join(AllowedModes, ",")) - } -} - var ( // NOTE: Copied from other webhook implementations // @@ -143,267 +65,39 @@ func loadWebhook(configFile string, groupVersion schema.GroupVersion, initialBac []schema.GroupVersion{groupVersion}, initialBackoff) } -func newBlockingWebhook(configFile string, groupVersion schema.GroupVersion) (*blockingBackend, error) { - w, err := loadWebhook(configFile, groupVersion, defaultInitialBackoff) - if err != nil { - return nil, err - } - return &blockingBackend{w}, nil -} - -type blockingBackend struct { +type backend struct { w *webhook.GenericWebhook } -func (b *blockingBackend) Run(stopCh <-chan struct{}) error { +// NewBackend returns an audit backend that sends events over HTTP to an external service. +func NewBackend(kubeConfigFile string, groupVersion schema.GroupVersion) (audit.Backend, error) { + w, err := loadWebhook(kubeConfigFile, groupVersion, DefaultInitialBackoff) + if err != nil { + return nil, err + } + return &backend{w}, nil +} + +func (b *backend) Run(stopCh <-chan struct{}) error { return nil } -func (b *blockingBackend) Shutdown() { +func (b *backend) Shutdown() { // nothing to do here } -func (b *blockingBackend) ProcessEvents(ev ...*auditinternal.Event) { +func (b *backend) ProcessEvents(ev ...*auditinternal.Event) { if err := b.processEvents(ev...); err != nil { - audit.HandlePluginError(pluginName, err, ev...) + audit.HandlePluginError(PluginName, err, ev...) } } -func (b *blockingBackend) processEvents(ev ...*auditinternal.Event) error { +func (b *backend) processEvents(ev ...*auditinternal.Event) error { var list auditinternal.EventList for _, e := range ev { list.Items = append(list.Items, *e) } - // NOTE: No exponential backoff because this is the blocking webhook - // mode. Any attempts to retry will block API server requests. - return b.w.RestClient.Post().Body(&list).Do().Error() -} - -func newBatchWebhook(configFile string, groupVersion schema.GroupVersion, config BatchBackendConfig) (*batchBackend, error) { - w, err := loadWebhook(configFile, groupVersion, config.InitialBackoff) - if err != nil { - return nil, err - } - - return &batchBackend{ - w: w, - buffer: make(chan *auditinternal.Event, config.BufferSize), - maxBatchSize: config.MaxBatchSize, - maxBatchWait: config.MaxBatchWait, - shutdownCh: make(chan struct{}), - throttle: flowcontrol.NewTokenBucketRateLimiter(config.ThrottleQPS, config.ThrottleBurst), - }, nil -} - -type batchBackend struct { - w *webhook.GenericWebhook - - // Channel to buffer events in memory before sending them on the webhook. - buffer chan *auditinternal.Event - // Maximum number of events that can be sent at once. - maxBatchSize int - // Amount of time to wait after sending events before force sending another set. - // - // Receiving maxBatchSize events will always trigger a send, regardless of - // if this amount of time has been reached. - maxBatchWait time.Duration - - // Channel to signal that the sending routine has stopped and therefore - // it's safe to assume that no new requests will be initiated. - shutdownCh chan struct{} - - // The sending routine locks reqMutex for reading before initiating a new - // goroutine to send a request. This goroutine then unlocks reqMutex for - // reading when completed. The Shutdown method locks reqMutex for writing - // after the sending routine has exited. When reqMutex is locked for writing, - // all requests have been completed and no new will be spawned, since the - // sending routine is not running anymore. - reqMutex sync.RWMutex - - // Limits the number of requests sent to the backend per second. - throttle flowcontrol.RateLimiter -} - -func (b *batchBackend) Run(stopCh <-chan struct{}) error { - go func() { - // Signal that the sending routine has exited. - defer close(b.shutdownCh) - - b.runSendingRoutine(stopCh) - - // Handle the events that were received after the last buffer - // scraping and before this line. Since the buffer is closed, no new - // events will come through. - for { - if last := func() bool { - // Recover from any panic in order to try to send all remaining events. - // Note, that in case of a panic, the return value will be false and - // the loop execution will continue. - defer runtime.HandleCrash() - - events := b.collectLastEvents() - b.sendBatchEvents(events) - return len(events) == 0 - }(); last { - break - } - } - }() - return nil -} - -func (b *batchBackend) Shutdown() { - <-b.shutdownCh - - // Write locking reqMutex will guarantee that all requests will be completed - // by the time the goroutine continues the execution. Since this line is - // executed after shutdownCh was closed, no new requests will follow this - // lock, because read lock is called in the same goroutine that closes - // shutdownCh before exiting. - b.reqMutex.Lock() - b.reqMutex.Unlock() -} - -// runSendingRoutine runs a loop that collects events from the buffer. When -// stopCh is closed, runSendingRoutine stops and closes the buffer. -func (b *batchBackend) runSendingRoutine(stopCh <-chan struct{}) { - defer close(b.buffer) - - for { - func() { - // Recover from any panics caused by this function so a panic in the - // goroutine can't bring down the main routine. - defer runtime.HandleCrash() - - t := time.NewTimer(b.maxBatchWait) - defer t.Stop() // Release ticker resources - - b.sendBatchEvents(b.collectEvents(stopCh, t.C)) - }() - - select { - case <-stopCh: - return - default: - } - } -} - -// collectEvents attempts to collect some number of events in a batch. -// -// The following things can cause collectEvents to stop and return the list -// of events: -// -// * Some maximum number of events are received. -// * Timer has passed, all queued events are sent. -// * StopCh is closed, all queued events are sent. -// -func (b *batchBackend) collectEvents(stopCh <-chan struct{}, timer <-chan time.Time) []auditinternal.Event { - var events []auditinternal.Event - -L: - for i := 0; i < b.maxBatchSize; i++ { - select { - case ev, ok := <-b.buffer: - // Buffer channel was closed and no new events will follow. - if !ok { - break L - } - events = append(events, *ev) - case <-timer: - // Timer has expired. Send whatever events are in the queue. - break L - case <-stopCh: - // Webhook has shut down. Send the last events. - break L - } - } - - return events -} - -// collectLastEvents assumes that the buffer was closed. It collects the first -// maxBatchSize events from the closed buffer into a batch and returns them. -func (b *batchBackend) collectLastEvents() []auditinternal.Event { - var events []auditinternal.Event - - for i := 0; i < b.maxBatchSize; i++ { - ev, ok := <-b.buffer - if !ok { - break - } - events = append(events, *ev) - } - - return events -} - -// sendBatchEvents sends a POST requests with the event list in a goroutine -// and logs any error encountered. -func (b *batchBackend) sendBatchEvents(events []auditinternal.Event) { - if len(events) == 0 { - return - } - - list := auditinternal.EventList{Items: events} - - if b.throttle != nil { - b.throttle.Accept() - } - - // Locking reqMutex for read will guarantee that the shutdown process will - // block until the goroutine started below is finished. At the same time, it - // will not prevent other batches from being proceed further this point. - b.reqMutex.RLock() - go func() { - // Execute the webhook POST in a goroutine to keep it from blocking. - // This lets the webhook continue to drain the queue immediately. - - defer b.reqMutex.RUnlock() - defer runtime.HandleCrash() - - err := b.w.WithExponentialBackoff(func() rest.Result { - return b.w.RestClient.Post().Body(&list).Do() - }).Error() - if err != nil { - impacted := make([]*auditinternal.Event, len(events)) - for i := range events { - impacted[i] = &events[i] - } - audit.HandlePluginError(pluginName, err, impacted...) - } - }() - return -} - -func (b *batchBackend) ProcessEvents(ev ...*auditinternal.Event) { - for i, e := range ev { - // Per the audit.Backend interface these events are reused after being - // sent to the Sink. Deep copy and send the copy to the queue. - event := e.DeepCopy() - - // The following mechanism is in place to support the situation when audit - // events are still coming after the backend was shut down. - var sendErr error - func() { - // If the backend was shut down and the buffer channel was closed, an - // attempt to add an event to it will result in panic that we should - // recover from. - defer func() { - if err := recover(); err != nil { - sendErr = errors.New("audit webhook shut down") - } - }() - - select { - case b.buffer <- event: - default: - sendErr = errors.New("audit webhook queue blocked") - } - }() - if sendErr != nil { - audit.HandlePluginError(pluginName, sendErr, ev[i:]...) - return - } - } + return b.w.WithExponentialBackoff(func() rest.Result { + return b.w.RestClient.Post().Body(&list).Do() + }).Error() } diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_test.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_test.go index fa616ff2ca7..95b0de3082d 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_test.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_test.go @@ -25,10 +25,7 @@ import ( "net/http/httptest" "os" "reflect" - "sync" - "sync/atomic" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -91,15 +88,7 @@ func (t *testWebhookHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusInternalServerError) } -func newTestBlockingWebhook(t *testing.T, endpoint string, groupVersion schema.GroupVersion) *blockingBackend { - return newWebhook(t, endpoint, ModeBlocking, groupVersion).(*blockingBackend) -} - -func newTestBatchWebhook(t *testing.T, endpoint string, groupVersion schema.GroupVersion) *batchBackend { - return newWebhook(t, endpoint, ModeBatch, groupVersion).(*batchBackend) -} - -func newWebhook(t *testing.T, endpoint string, mode string, groupVersion schema.GroupVersion) audit.Backend { +func newWebhook(t *testing.T, endpoint string, groupVersion schema.GroupVersion) *backend { config := v1.Config{ Clusters: []v1.NamedCluster{ {Cluster: v1.Cluster{Server: endpoint, InsecureSkipTLSVerify: true}}, @@ -116,10 +105,10 @@ func newWebhook(t *testing.T, endpoint string, mode string, groupVersion schema. // NOTE(ericchiang): Do we need to use a proper serializer? require.NoError(t, stdjson.NewEncoder(f).Encode(config), "writing kubeconfig") - backend, err := NewBackend(f.Name(), mode, groupVersion, NewDefaultBatchBackendConfig()) + b, err := NewBackend(f.Name(), groupVersion) require.NoError(t, err, "initializing backend") - return backend + return b.(*backend) } func TestWebhook(t *testing.T) { @@ -131,275 +120,9 @@ func TestWebhook(t *testing.T) { })) defer s.Close() - backend := newTestBlockingWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion) + backend := newWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion) // Ensure this doesn't return a serialization error. event := &auditinternal.Event{} require.NoError(t, backend.processEvents(event), "failed to send events") } - -// waitForEmptyBuffer indicates when the sendBatchEvents method has read from the -// existing buffer. This lets test coordinate closing a timer and stop channel -// until the for loop has read from the buffer. -func waitForEmptyBuffer(b *batchBackend) { - for len(b.buffer) != 0 { - time.Sleep(time.Millisecond) - } -} - -func TestBatchWebhookMaxEvents(t *testing.T) { - nRest := 10 - events := make([]*auditinternal.Event, defaultBatchMaxSize+nRest) // greater than max size. - for i := range events { - events[i] = &auditinternal.Event{} - } - - got := make(chan int, 2) - s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) { - got <- len(events.(*auditv1beta1.EventList).Items) - })) - defer s.Close() - - backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion) - - backend.ProcessEvents(events...) - - stopCh := make(chan struct{}) - timer := make(chan time.Time, 1) - - backend.sendBatchEvents(backend.collectEvents(stopCh, timer)) - require.Equal(t, defaultBatchMaxSize, <-got, "did not get batch max size") - - go func() { - waitForEmptyBuffer(backend) // wait for the buffer to empty - timer <- time.Now() // Trigger the wait timeout - }() - - backend.sendBatchEvents(backend.collectEvents(stopCh, timer)) - require.Equal(t, nRest, <-got, "failed to get the rest of the events") -} - -func TestBatchWebhookStopCh(t *testing.T) { - events := make([]*auditinternal.Event, 1) // less than max size. - for i := range events { - events[i] = &auditinternal.Event{} - } - - expected := len(events) - got := make(chan int, 2) - s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) { - got <- len(events.(*auditv1beta1.EventList).Items) - })) - defer s.Close() - - backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion) - backend.ProcessEvents(events...) - - stopCh := make(chan struct{}) - timer := make(chan time.Time) - - go func() { - waitForEmptyBuffer(backend) - close(stopCh) // stop channel has stopped - }() - backend.sendBatchEvents(backend.collectEvents(stopCh, timer)) - require.Equal(t, expected, <-got, "get queued events after timer expires") -} - -func TestBatchWebhookProcessEventsAfterStop(t *testing.T) { - events := make([]*auditinternal.Event, 1) // less than max size. - for i := range events { - events[i] = &auditinternal.Event{} - } - - got := make(chan struct{}) - s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) { - close(got) - })) - defer s.Close() - - backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion) - stopCh := make(chan struct{}) - - backend.Run(stopCh) - close(stopCh) - <-backend.shutdownCh - backend.ProcessEvents(events...) - assert.Equal(t, 0, len(backend.buffer), "processed events after the backed has been stopped") -} - -func TestBatchWebhookShutdown(t *testing.T) { - events := make([]*auditinternal.Event, 1) - for i := range events { - events[i] = &auditinternal.Event{} - } - - got := make(chan struct{}) - contReqCh := make(chan struct{}) - shutdownCh := make(chan struct{}) - s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) { - close(got) - <-contReqCh - })) - defer s.Close() - - backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion) - backend.ProcessEvents(events...) - - go func() { - // Assume stopCh was closed. - close(backend.buffer) - backend.sendBatchEvents(backend.collectLastEvents()) - }() - - <-got - - go func() { - close(backend.shutdownCh) - backend.Shutdown() - close(shutdownCh) - }() - - // Wait for some time in case there's a bug that allows for the Shutdown - // method to exit before all requests has been completed. - time.Sleep(1 * time.Second) - select { - case <-shutdownCh: - t.Fatal("Backend shut down before all requests finished") - default: - // Continue. - } - - close(contReqCh) - <-shutdownCh -} - -func TestBatchWebhookEmptyBuffer(t *testing.T) { - events := make([]*auditinternal.Event, 1) // less than max size. - for i := range events { - events[i] = &auditinternal.Event{} - } - - expected := len(events) - got := make(chan int, 2) - s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) { - got <- len(events.(*auditv1beta1.EventList).Items) - })) - defer s.Close() - - backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion) - - stopCh := make(chan struct{}) - timer := make(chan time.Time, 1) - - timer <- time.Now() // Timer is done. - - // Buffer is empty, no events have been queued. This should exit but send no events. - backend.sendBatchEvents(backend.collectEvents(stopCh, timer)) - - // Send additional events after the sendBatchEvents has been called. - backend.ProcessEvents(events...) - go func() { - waitForEmptyBuffer(backend) - timer <- time.Now() - }() - - backend.sendBatchEvents(backend.collectEvents(stopCh, timer)) - - // Make sure we didn't get a POST with zero events. - require.Equal(t, expected, <-got, "expected one event") -} - -func TestBatchBufferFull(t *testing.T) { - events := make([]*auditinternal.Event, defaultBatchBufferSize+1) // More than buffered size - for i := range events { - events[i] = &auditinternal.Event{} - } - s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) { - // Do nothing. - })) - defer s.Close() - - backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion) - - // Make sure this doesn't block. - backend.ProcessEvents(events...) -} - -func TestBatchRun(t *testing.T) { - - // Divisable by max batch size so we don't have to wait for a minute for - // the test to finish. - events := make([]*auditinternal.Event, defaultBatchMaxSize*3) - for i := range events { - events[i] = &auditinternal.Event{} - } - - got := new(int64) - want := len(events) - - wg := new(sync.WaitGroup) - wg.Add(want) - done := make(chan struct{}) - - go func() { - wg.Wait() - // When the expected number of events have been received, close the channel. - close(done) - }() - - s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(obj runtime.Object) { - events := obj.(*auditv1beta1.EventList) - atomic.AddInt64(got, int64(len(events.Items))) - wg.Add(-len(events.Items)) - })) - defer s.Close() - - stopCh := make(chan struct{}) - defer close(stopCh) - - backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion) - - // Test the Run codepath. E.g. that the spawned goroutines behave correctly. - backend.Run(stopCh) - - backend.ProcessEvents(events...) - - select { - case <-done: - // Received all the events. - case <-time.After(2 * time.Minute): - t.Errorf("expected %d events got %d", want, atomic.LoadInt64(got)) - } -} - -func TestBatchConcurrentRequests(t *testing.T) { - events := make([]*auditinternal.Event, defaultBatchBufferSize) // Don't drop events - for i := range events { - events[i] = &auditinternal.Event{} - } - - wg := new(sync.WaitGroup) - wg.Add(len(events)) - - s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) { - wg.Add(-len(events.(*auditv1beta1.EventList).Items)) - - // Since the webhook makes concurrent requests, blocking on the webhook response - // shouldn't block the webhook from sending more events. - // - // Wait for all responses to be received before sending the response. - wg.Wait() - })) - defer s.Close() - - stopCh := make(chan struct{}) - defer close(stopCh) - - backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion) - backend.Run(stopCh) - - backend.ProcessEvents(events...) - // Wait for the webhook to receive all events. - wg.Wait() -} diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_v1alpha1_test.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_v1alpha1_test.go deleted file mode 100644 index 6b6a1a89534..00000000000 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_v1alpha1_test.go +++ /dev/null @@ -1,289 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package webhook - -import ( - "net/http/httptest" - "sync" - "sync/atomic" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "k8s.io/apimachinery/pkg/runtime" - auditinternal "k8s.io/apiserver/pkg/apis/audit" - auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1" -) - -func TestBatchWebhookMaxEventsV1Alpha1(t *testing.T) { - nRest := 10 - events := make([]*auditinternal.Event, defaultBatchMaxSize+nRest) // greater than max size. - for i := range events { - events[i] = &auditinternal.Event{} - } - - got := make(chan int, 2) - s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) { - got <- len(events.(*auditv1alpha1.EventList).Items) - })) - defer s.Close() - - backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion) - - backend.ProcessEvents(events...) - - stopCh := make(chan struct{}) - timer := make(chan time.Time, 1) - - backend.sendBatchEvents(backend.collectEvents(stopCh, timer)) - require.Equal(t, defaultBatchMaxSize, <-got, "did not get batch max size") - - go func() { - waitForEmptyBuffer(backend) // wait for the buffer to empty - timer <- time.Now() // Trigger the wait timeout - }() - - backend.sendBatchEvents(backend.collectEvents(stopCh, timer)) - require.Equal(t, nRest, <-got, "failed to get the rest of the events") -} - -func TestBatchWebhookStopChV1Alpha1(t *testing.T) { - events := make([]*auditinternal.Event, 1) // less than max size. - for i := range events { - events[i] = &auditinternal.Event{} - } - - expected := len(events) - got := make(chan int, 2) - s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) { - got <- len(events.(*auditv1alpha1.EventList).Items) - })) - defer s.Close() - - backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion) - backend.ProcessEvents(events...) - - stopCh := make(chan struct{}) - timer := make(chan time.Time) - - go func() { - waitForEmptyBuffer(backend) - close(stopCh) // stop channel has stopped - }() - backend.sendBatchEvents(backend.collectEvents(stopCh, timer)) - require.Equal(t, expected, <-got, "get queued events after timer expires") -} - -func TestBatchWebhookProcessEventsAfterStopV1Alpha1(t *testing.T) { - events := make([]*auditinternal.Event, 1) // less than max size. - for i := range events { - events[i] = &auditinternal.Event{} - } - - got := make(chan struct{}) - s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) { - close(got) - })) - defer s.Close() - - backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion) - stopCh := make(chan struct{}) - - backend.Run(stopCh) - close(stopCh) - <-backend.shutdownCh - backend.ProcessEvents(events...) - assert.Equal(t, 0, len(backend.buffer), "processed events after the backed has been stopped") -} - -func TestBatchWebhookShutdownV1Alpha1(t *testing.T) { - events := make([]*auditinternal.Event, 1) - for i := range events { - events[i] = &auditinternal.Event{} - } - - got := make(chan struct{}) - contReqCh := make(chan struct{}) - shutdownCh := make(chan struct{}) - s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) { - close(got) - <-contReqCh - })) - defer s.Close() - - backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion) - backend.ProcessEvents(events...) - - go func() { - // Assume stopCh was closed. - close(backend.buffer) - backend.sendBatchEvents(backend.collectLastEvents()) - }() - - <-got - - go func() { - close(backend.shutdownCh) - backend.Shutdown() - close(shutdownCh) - }() - - // Wait for some time in case there's a bug that allows for the Shutdown - // method to exit before all requests has been completed. - time.Sleep(1 * time.Second) - select { - case <-shutdownCh: - t.Fatal("Backend shut down before all requests finished") - default: - // Continue. - } - - close(contReqCh) - <-shutdownCh -} - -func TestBatchWebhookEmptyBufferV1Alpha1(t *testing.T) { - events := make([]*auditinternal.Event, 1) // less than max size. - for i := range events { - events[i] = &auditinternal.Event{} - } - - expected := len(events) - got := make(chan int, 2) - s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) { - got <- len(events.(*auditv1alpha1.EventList).Items) - })) - defer s.Close() - - backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion) - - stopCh := make(chan struct{}) - timer := make(chan time.Time, 1) - - timer <- time.Now() // Timer is done. - - // Buffer is empty, no events have been queued. This should exit but send no events. - backend.sendBatchEvents(backend.collectEvents(stopCh, timer)) - - // Send additional events after the sendBatchEvents has been called. - backend.ProcessEvents(events...) - go func() { - waitForEmptyBuffer(backend) - timer <- time.Now() - }() - - backend.sendBatchEvents(backend.collectEvents(stopCh, timer)) - - // Make sure we didn't get a POST with zero events. - require.Equal(t, expected, <-got, "expected one event") -} - -func TestBatchBufferFullV1Alpha1(t *testing.T) { - events := make([]*auditinternal.Event, defaultBatchBufferSize+1) // More than buffered size - for i := range events { - events[i] = &auditinternal.Event{} - } - s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) { - // Do nothing. - })) - defer s.Close() - - backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion) - - // Make sure this doesn't block. - backend.ProcessEvents(events...) -} - -func TestBatchRunV1Alpha1(t *testing.T) { - - // Divisable by max batch size so we don't have to wait for a minute for - // the test to finish. - events := make([]*auditinternal.Event, defaultBatchMaxSize*3) - for i := range events { - events[i] = &auditinternal.Event{} - } - - got := new(int64) - want := len(events) - - wg := new(sync.WaitGroup) - wg.Add(want) - done := make(chan struct{}) - - go func() { - wg.Wait() - // When the expected number of events have been received, close the channel. - close(done) - }() - - s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(obj runtime.Object) { - events := obj.(*auditv1alpha1.EventList) - atomic.AddInt64(got, int64(len(events.Items))) - wg.Add(-len(events.Items)) - })) - defer s.Close() - - stopCh := make(chan struct{}) - defer close(stopCh) - - backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion) - - // Test the Run codepath. E.g. that the spawned goroutines behave correctly. - backend.Run(stopCh) - - backend.ProcessEvents(events...) - - select { - case <-done: - // Received all the events. - case <-time.After(2 * time.Minute): - t.Errorf("expected %d events got %d", want, atomic.LoadInt64(got)) - } -} - -func TestBatchConcurrentRequestsV1Alpha1(t *testing.T) { - events := make([]*auditinternal.Event, defaultBatchBufferSize) // Don't drop events - for i := range events { - events[i] = &auditinternal.Event{} - } - - wg := new(sync.WaitGroup) - wg.Add(len(events)) - - s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) { - wg.Add(-len(events.(*auditv1alpha1.EventList).Items)) - - // Since the webhook makes concurrent requests, blocking on the webhook response - // shouldn't block the webhook from sending more events. - // - // Wait for all responses to be received before sending the response. - wg.Wait() - })) - defer s.Close() - - stopCh := make(chan struct{}) - defer close(stopCh) - - backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion) - backend.Run(stopCh) - - backend.ProcessEvents(events...) - // Wait for the webhook to receive all events. - wg.Wait() -} diff --git a/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json b/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json index 4ee479033f5..7d720d5d74b 100644 --- a/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json +++ b/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json @@ -1142,6 +1142,10 @@ "ImportPath": "k8s.io/apiserver/pkg/util/wsstream", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }, + { + "ImportPath": "k8s.io/apiserver/plugin/pkg/audit/buffered", + "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + }, { "ImportPath": "k8s.io/apiserver/plugin/pkg/audit/log", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" diff --git a/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json b/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json index a88b4c56c6d..83357405ade 100644 --- a/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json +++ b/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json @@ -1110,6 +1110,10 @@ "ImportPath": "k8s.io/apiserver/pkg/util/wsstream", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }, + { + "ImportPath": "k8s.io/apiserver/plugin/pkg/audit/buffered", + "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + }, { "ImportPath": "k8s.io/apiserver/plugin/pkg/audit/log", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"