From 5f4ff9f28341d58a4a905a0e86742aa6c90e81bf Mon Sep 17 00:00:00 2001 From: Mik Vyatskov Date: Thu, 5 Oct 2017 23:18:55 +0200 Subject: [PATCH 1/2] Adjust defaults of audit webhook backends Signed-off-by: Mik Vyatskov --- .../apiserver/plugin/pkg/audit/webhook/BUILD | 1 + .../plugin/pkg/audit/webhook/webhook.go | 17 ++++++++++------- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD index fd43d807e7a..dfd5e16ce04 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD @@ -41,6 +41,7 @@ go_library( "//vendor/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library", "//vendor/k8s.io/apiserver/pkg/audit:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/webhook:go_default_library", + "//vendor/k8s.io/client-go/rest:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go index bcec271b500..4f8190b84d9 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go @@ -34,6 +34,7 @@ import ( auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1" "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/util/webhook" + "k8s.io/client-go/rest" ) const ( @@ -58,9 +59,10 @@ const ( // // TODO(ericchiang): Make these value configurable. Maybe through a // kubeconfig extension? - defaultBatchBufferSize = 1000 // Buffer up to 1000 events before blocking. - defaultBatchMaxSize = 100 // Only send 100 events at a time. - defaultBatchMaxWait = time.Minute // Send events at least once a minute. + defaultBatchBufferSize = 10000 // Buffer up to 10000 events before starting discarding. + defaultBatchMaxSize = 400 // Only send up to 400 events at a time. + defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute. + defaultInitialBackoff = 10 * time.Second // Wait at least 10 seconds before retrying. ) // The plugin name reported in error metrics. @@ -100,7 +102,8 @@ func init() { } func loadWebhook(configFile string, groupVersion schema.GroupVersion) (*webhook.GenericWebhook, error) { - return webhook.NewGenericWebhook(registry, audit.Codecs, configFile, []schema.GroupVersion{groupVersion}, 0) + return webhook.NewGenericWebhook(registry, audit.Codecs, configFile, + []schema.GroupVersion{groupVersion}, defaultInitialBackoff) } func newBlockingWebhook(configFile string, groupVersion schema.GroupVersion) (*blockingBackend, error) { @@ -314,9 +317,9 @@ func (b *batchBackend) sendBatchEvents(events []auditinternal.Event) { defer b.reqMutex.RUnlock() defer runtime.HandleCrash() - err := webhook.WithExponentialBackoff(0, func() error { - return b.w.RestClient.Post().Body(&list).Do().Error() - }) + err := b.w.WithExponentialBackoff(func() rest.Result { + return b.w.RestClient.Post().Body(&list).Do() + }).Error() if err != nil { impacted := make([]*auditinternal.Event, len(events)) for i := range events { From 6bce120a11782caad7ea477aaaafe3ba31f797d1 Mon Sep 17 00:00:00 2001 From: Mik Vyatskov Date: Thu, 5 Oct 2017 23:19:45 +0200 Subject: [PATCH 2/2] Add throttling to the batching audit webhook Signed-off-by: Mik Vyatskov --- staging/src/k8s.io/apiserver/Godeps/Godeps.json | 4 ++++ .../k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD | 1 + .../apiserver/plugin/pkg/audit/webhook/webhook.go | 12 ++++++++++++ 3 files changed, 17 insertions(+) diff --git a/staging/src/k8s.io/apiserver/Godeps/Godeps.json b/staging/src/k8s.io/apiserver/Godeps/Godeps.json index a299a71a677..22020cde375 100644 --- a/staging/src/k8s.io/apiserver/Godeps/Godeps.json +++ b/staging/src/k8s.io/apiserver/Godeps/Godeps.json @@ -1661,6 +1661,10 @@ { "ImportPath": "k8s.io/client-go/util/cert", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + }, + { + "ImportPath": "k8s.io/client-go/util/flowcontrol", + "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" } ] } diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD index dfd5e16ce04..fcee573ce32 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD @@ -42,6 +42,7 @@ go_library( "//vendor/k8s.io/apiserver/pkg/audit:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/webhook:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", + "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go index 4f8190b84d9..41d8c0ed92a 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go @@ -35,6 +35,7 @@ import ( "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/util/webhook" "k8s.io/client-go/rest" + "k8s.io/client-go/util/flowcontrol" ) const ( @@ -63,6 +64,9 @@ const ( defaultBatchMaxSize = 400 // Only send up to 400 events at a time. defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute. defaultInitialBackoff = 10 * time.Second // Wait at least 10 seconds before retrying. + + defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS. + defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst. ) // The plugin name reported in error metrics. @@ -154,6 +158,7 @@ func newBatchWebhook(configFile string, groupVersion schema.GroupVersion) (*batc maxBatchSize: defaultBatchMaxSize, maxBatchWait: defaultBatchMaxWait, shutdownCh: make(chan struct{}), + throttle: flowcontrol.NewTokenBucketRateLimiter(defaultBatchThrottleQPS, defaultBatchThrottleBurst), }, nil } @@ -181,6 +186,9 @@ type batchBackend struct { // all requests have been completed and no new will be spawned, since the // sending routine is not running anymore. reqMutex sync.RWMutex + + // Limits the number of requests sent to the backend per second. + throttle flowcontrol.RateLimiter } func (b *batchBackend) Run(stopCh <-chan struct{}) error { @@ -306,6 +314,10 @@ func (b *batchBackend) sendBatchEvents(events []auditinternal.Event) { list := auditinternal.EventList{Items: events} + if b.throttle != nil { + b.throttle.Accept() + } + // Locking reqMutex for read will guarantee that the shutdown process will // block until the goroutine started below is finished. At the same time, it // will not prevent other batches from being proceed further this point.