From 7e717ef3a6a57d31251ccee94d9e2dd29a70c27b Mon Sep 17 00:00:00 2001 From: Mik Vyatskov Date: Thu, 30 Nov 2017 18:47:48 +0100 Subject: [PATCH] Make audit batch webhook backend configurable Signed-off-by: Mik Vyatskov --- cmd/kube-apiserver/app/options/BUILD | 1 + .../app/options/options_test.go | 15 +++++ .../apiserver/pkg/server/options/audit.go | 47 ++++++++++++-- .../plugin/pkg/audit/webhook/webhook.go | 61 ++++++++++++++----- .../plugin/pkg/audit/webhook/webhook_test.go | 2 +- 5 files changed, 106 insertions(+), 20 deletions(-) diff --git a/cmd/kube-apiserver/app/options/BUILD b/cmd/kube-apiserver/app/options/BUILD index 7c46be368bf..96087a582eb 100644 --- a/cmd/kube-apiserver/app/options/BUILD +++ b/cmd/kube-apiserver/app/options/BUILD @@ -74,6 +74,7 @@ go_test( "//vendor/k8s.io/apiserver/pkg/server/options:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library", + "//vendor/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", ], ) diff --git a/cmd/kube-apiserver/app/options/options_test.go b/cmd/kube-apiserver/app/options/options_test.go index 716398a4e63..30979d10524 100644 --- a/cmd/kube-apiserver/app/options/options_test.go +++ b/cmd/kube-apiserver/app/options/options_test.go @@ -28,6 +28,7 @@ import ( apiserveroptions "k8s.io/apiserver/pkg/server/options" "k8s.io/apiserver/pkg/storage/storagebackend" utilconfig "k8s.io/apiserver/pkg/util/flag" + auditwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook" restclient "k8s.io/client-go/rest" "k8s.io/kubernetes/pkg/api/legacyscheme" kapi "k8s.io/kubernetes/pkg/apis/core" @@ -55,6 +56,12 @@ func TestAddFlags(t *testing.T) { "--audit-policy-file=/policy", "--audit-webhook-config-file=/webhook-config", "--audit-webhook-mode=blocking", + "--audit-webhook-batch-buffer-size=42", + "--audit-webhook-batch-max-size=43", + "--audit-webhook-batch-max-wait=1s", + "--audit-webhook-batch-throttle-qps=43.5", + "--audit-webhook-batch-throttle-burst=44", + "--audit-webhook-batch-initial-backoff=2s", "--authentication-token-webhook-cache-ttl=3m", "--authentication-token-webhook-config-file=/token-webhook-config", "--authorization-mode=AlwaysDeny", @@ -170,6 +177,14 @@ func TestAddFlags(t *testing.T) { WebhookOptions: apiserveroptions.AuditWebhookOptions{ Mode: "blocking", ConfigFile: "/webhook-config", + BatchConfig: auditwebhook.BatchBackendConfig{ + BufferSize: 42, + MaxBatchSize: 43, + MaxBatchWait: 1 * time.Second, + ThrottleQPS: 43.5, + ThrottleBurst: 44, + InitialBackoff: 2 * time.Second, + }, }, PolicyFile: "/policy", }, diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/audit.go b/staging/src/k8s.io/apiserver/pkg/server/options/audit.go index 833e91e5307..e43ebb12bc9 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/audit.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/audit.go @@ -77,12 +77,17 @@ type AuditWebhookOptions struct { // // Defaults to asynchronous batch events. Mode string + // Configuration for batching webhook. Only used in batch mode. + BatchConfig pluginwebhook.BatchBackendConfig } func NewAuditOptions() *AuditOptions { return &AuditOptions{ - WebhookOptions: AuditWebhookOptions{Mode: pluginwebhook.ModeBatch}, - LogOptions: AuditLogOptions{Format: pluginlog.FormatJson}, + WebhookOptions: AuditWebhookOptions{ + Mode: pluginwebhook.ModeBatch, + BatchConfig: pluginwebhook.NewDefaultBatchBackendConfig(), + }, + LogOptions: AuditLogOptions{Format: pluginlog.FormatJson}, } } @@ -102,7 +107,7 @@ func (o *AuditOptions) Validate() []error { allErrors = append(allErrors, fmt.Errorf("feature '%s' must be enabled to set option --audit-webhook-config-file", features.AdvancedAuditing)) } } else { - // check webhook mode + // Check webhook mode validMode := false for _, m := range pluginwebhook.AllowedModes { if m == o.WebhookOptions.Mode { @@ -114,7 +119,21 @@ func (o *AuditOptions) Validate() []error { allErrors = append(allErrors, fmt.Errorf("invalid audit webhook mode %s, allowed modes are %q", o.WebhookOptions.Mode, strings.Join(pluginwebhook.AllowedModes, ","))) } - // check log format + // Check webhook batch configuration + if o.WebhookOptions.BatchConfig.BufferSize <= 0 { + allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook buffer size %v, must be a positive number", o.WebhookOptions.BatchConfig.BufferSize)) + } + if o.WebhookOptions.BatchConfig.MaxBatchSize <= 0 { + allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook max batch size %v, must be a positive number", o.WebhookOptions.BatchConfig.MaxBatchSize)) + } + if o.WebhookOptions.BatchConfig.ThrottleQPS <= 0 { + allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook throttle QPS %v, must be a positive number", o.WebhookOptions.BatchConfig.ThrottleQPS)) + } + if o.WebhookOptions.BatchConfig.ThrottleBurst <= 0 { + allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook throttle burst %v, must be a positive number", o.WebhookOptions.BatchConfig.ThrottleBurst)) + } + + // Check log format validFormat := false for _, f := range pluginlog.AllowedFormats { if f == o.LogOptions.Format { @@ -249,6 +268,24 @@ func (o *AuditWebhookOptions) AddFlags(fs *pflag.FlagSet) { "Strategy for sending audit events. Blocking indicates sending events should block"+ " server responses. Batch causes the webhook to buffer and send events"+ " asynchronously. Known modes are "+strings.Join(pluginwebhook.AllowedModes, ",")+".") + fs.IntVar(&o.BatchConfig.BufferSize, "audit-webhook-batch-buffer-size", + o.BatchConfig.BufferSize, "The size of the buffer to store events before "+ + "batching and sending to the webhook. Only used in batch mode.") + fs.IntVar(&o.BatchConfig.MaxBatchSize, "audit-webhook-batch-max-size", + o.BatchConfig.MaxBatchSize, "The maximum size of a batch sent to the webhook. "+ + "Only used in batch mode.") + fs.DurationVar(&o.BatchConfig.MaxBatchWait, "audit-webhook-batch-max-wait", + o.BatchConfig.MaxBatchWait, "The amount of time to wait before force sending the "+ + "batch that hadn't reached the max size. Only used in batch mode.") + fs.Float32Var(&o.BatchConfig.ThrottleQPS, "audit-webhook-batch-throttle-qps", + o.BatchConfig.ThrottleQPS, "Maximum average number of requests per second. "+ + "Only used in batch mode.") + fs.IntVar(&o.BatchConfig.ThrottleBurst, "audit-webhook-batch-throttle-burst", + o.BatchConfig.ThrottleBurst, "Maximum number of requests sent at the same "+ + "moment if ThrottleQPS was not utilized before. Only used in batch mode.") + fs.DurationVar(&o.BatchConfig.InitialBackoff, "audit-webhook-batch-initial-backoff", + o.BatchConfig.InitialBackoff, "The amount of time to wait before retrying the "+ + "first failed requests. Only used in batch mode.") } func (o *AuditWebhookOptions) applyTo(c *server.Config) error { @@ -256,7 +293,7 @@ func (o *AuditWebhookOptions) applyTo(c *server.Config) error { return nil } - webhook, err := pluginwebhook.NewBackend(o.ConfigFile, o.Mode, auditv1beta1.SchemeGroupVersion) + webhook, err := pluginwebhook.NewBackend(o.ConfigFile, o.Mode, auditv1beta1.SchemeGroupVersion, o.BatchConfig) if err != nil { return fmt.Errorf("initializing audit webhook: %v", err) } diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go index 41d8c0ed92a..7f390ac57fb 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go @@ -57,9 +57,6 @@ var AllowedModes = []string{ const ( // Default configuration values for ModeBatch. - // - // TODO(ericchiang): Make these value configurable. Maybe through a - // kubeconfig extension? defaultBatchBufferSize = 10000 // Buffer up to 10000 events before starting discarding. defaultBatchMaxSize = 400 // Only send up to 400 events at a time. defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute. @@ -72,13 +69,49 @@ const ( // The plugin name reported in error metrics. const pluginName = "webhook" +// BatchBackendConfig represents batching webhook audit backend configuration. +type BatchBackendConfig struct { + // BufferSize defines a size of the buffering queue. + BufferSize int + // MaxBatchSize defines maximum size of a batch. + MaxBatchSize int + // MaxBatchWait defines maximum amount of time to wait for MaxBatchSize + // events to be accumulated in the buffer before forcibly sending what's + // being accumulated. + MaxBatchWait time.Duration + + // ThrottleQPS defines the allowed rate of batches per second sent to the webhook. + ThrottleQPS float32 + // ThrottleBurst defines the maximum rate of batches per second sent to the webhook in case + // the capacity defined by ThrottleQPS was not utilized. + ThrottleBurst int + + // InitialBackoff defines the amount of time to wait before retrying the requests + // to the webhook for the first time. + InitialBackoff time.Duration +} + +// NewDefaultBatchBackendConfig returns new BatchBackendConfig objects populated by default values. +func NewDefaultBatchBackendConfig() BatchBackendConfig { + return BatchBackendConfig{ + BufferSize: defaultBatchBufferSize, + MaxBatchSize: defaultBatchMaxSize, + MaxBatchWait: defaultBatchMaxWait, + + ThrottleQPS: defaultBatchThrottleQPS, + ThrottleBurst: defaultBatchThrottleBurst, + + InitialBackoff: defaultInitialBackoff, + } +} + // NewBackend returns an audit backend that sends events over HTTP to an external service. // The mode indicates the caching behavior of the webhook. Either blocking (ModeBlocking) // or buffered with batch POSTs (ModeBatch). -func NewBackend(kubeConfigFile string, mode string, groupVersion schema.GroupVersion) (audit.Backend, error) { +func NewBackend(kubeConfigFile string, mode string, groupVersion schema.GroupVersion, config BatchBackendConfig) (audit.Backend, error) { switch mode { case ModeBatch: - return newBatchWebhook(kubeConfigFile, groupVersion) + return newBatchWebhook(kubeConfigFile, groupVersion, config) case ModeBlocking: return newBlockingWebhook(kubeConfigFile, groupVersion) default: @@ -105,13 +138,13 @@ func init() { install.Install(groupFactoryRegistry, registry, audit.Scheme) } -func loadWebhook(configFile string, groupVersion schema.GroupVersion) (*webhook.GenericWebhook, error) { +func loadWebhook(configFile string, groupVersion schema.GroupVersion, initialBackoff time.Duration) (*webhook.GenericWebhook, error) { return webhook.NewGenericWebhook(registry, audit.Codecs, configFile, - []schema.GroupVersion{groupVersion}, defaultInitialBackoff) + []schema.GroupVersion{groupVersion}, initialBackoff) } func newBlockingWebhook(configFile string, groupVersion schema.GroupVersion) (*blockingBackend, error) { - w, err := loadWebhook(configFile, groupVersion) + w, err := loadWebhook(configFile, groupVersion, defaultInitialBackoff) if err != nil { return nil, err } @@ -146,19 +179,19 @@ func (b *blockingBackend) processEvents(ev ...*auditinternal.Event) error { return b.w.RestClient.Post().Body(&list).Do().Error() } -func newBatchWebhook(configFile string, groupVersion schema.GroupVersion) (*batchBackend, error) { - w, err := loadWebhook(configFile, groupVersion) +func newBatchWebhook(configFile string, groupVersion schema.GroupVersion, config BatchBackendConfig) (*batchBackend, error) { + w, err := loadWebhook(configFile, groupVersion, config.InitialBackoff) if err != nil { return nil, err } return &batchBackend{ w: w, - buffer: make(chan *auditinternal.Event, defaultBatchBufferSize), - maxBatchSize: defaultBatchMaxSize, - maxBatchWait: defaultBatchMaxWait, + buffer: make(chan *auditinternal.Event, config.BufferSize), + maxBatchSize: config.MaxBatchSize, + maxBatchWait: config.MaxBatchWait, shutdownCh: make(chan struct{}), - throttle: flowcontrol.NewTokenBucketRateLimiter(defaultBatchThrottleQPS, defaultBatchThrottleBurst), + throttle: flowcontrol.NewTokenBucketRateLimiter(config.ThrottleQPS, config.ThrottleBurst), }, nil } diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_test.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_test.go index 1b7f3df24ed..62c09fb3557 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_test.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_test.go @@ -116,7 +116,7 @@ func newWebhook(t *testing.T, endpoint string, mode string, groupVersion schema. // NOTE(ericchiang): Do we need to use a proper serializer? require.NoError(t, stdjson.NewEncoder(f).Encode(config), "writing kubeconfig") - backend, err := NewBackend(f.Name(), mode, groupVersion) + backend, err := NewBackend(f.Name(), mode, groupVersion, NewDefaultBatchBackendConfig()) require.NoError(t, err, "initializing backend") return backend