diff --git a/staging/src/k8s.io/apiserver/Godeps/Godeps.json b/staging/src/k8s.io/apiserver/Godeps/Godeps.json index d4d6b7e7c0e..cc9ecc45c52 100644 --- a/staging/src/k8s.io/apiserver/Godeps/Godeps.json +++ b/staging/src/k8s.io/apiserver/Godeps/Godeps.json @@ -1681,6 +1681,10 @@ { "ImportPath": "k8s.io/client-go/util/cert", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + }, + { + "ImportPath": "k8s.io/client-go/util/flowcontrol", + "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" } ] } diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD index fd43d807e7a..fcee573ce32 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD @@ -41,6 +41,8 @@ go_library( "//vendor/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library", "//vendor/k8s.io/apiserver/pkg/audit:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/webhook:go_default_library", + "//vendor/k8s.io/client-go/rest:go_default_library", + "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go index bcec271b500..41d8c0ed92a 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go @@ -34,6 +34,8 @@ import ( auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1" "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/util/webhook" + "k8s.io/client-go/rest" + "k8s.io/client-go/util/flowcontrol" ) const ( @@ -58,9 +60,13 @@ const ( // // TODO(ericchiang): Make these value configurable. Maybe through a // kubeconfig extension? - defaultBatchBufferSize = 1000 // Buffer up to 1000 events before blocking. - defaultBatchMaxSize = 100 // Only send 100 events at a time. - defaultBatchMaxWait = time.Minute // Send events at least once a minute. + defaultBatchBufferSize = 10000 // Buffer up to 10000 events before starting discarding. + defaultBatchMaxSize = 400 // Only send up to 400 events at a time. + defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute. + defaultInitialBackoff = 10 * time.Second // Wait at least 10 seconds before retrying. + + defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS. + defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst. ) // The plugin name reported in error metrics. @@ -100,7 +106,8 @@ func init() { } func loadWebhook(configFile string, groupVersion schema.GroupVersion) (*webhook.GenericWebhook, error) { - return webhook.NewGenericWebhook(registry, audit.Codecs, configFile, []schema.GroupVersion{groupVersion}, 0) + return webhook.NewGenericWebhook(registry, audit.Codecs, configFile, + []schema.GroupVersion{groupVersion}, defaultInitialBackoff) } func newBlockingWebhook(configFile string, groupVersion schema.GroupVersion) (*blockingBackend, error) { @@ -151,6 +158,7 @@ func newBatchWebhook(configFile string, groupVersion schema.GroupVersion) (*batc maxBatchSize: defaultBatchMaxSize, maxBatchWait: defaultBatchMaxWait, shutdownCh: make(chan struct{}), + throttle: flowcontrol.NewTokenBucketRateLimiter(defaultBatchThrottleQPS, defaultBatchThrottleBurst), }, nil } @@ -178,6 +186,9 @@ type batchBackend struct { // all requests have been completed and no new will be spawned, since the // sending routine is not running anymore. reqMutex sync.RWMutex + + // Limits the number of requests sent to the backend per second. + throttle flowcontrol.RateLimiter } func (b *batchBackend) Run(stopCh <-chan struct{}) error { @@ -303,6 +314,10 @@ func (b *batchBackend) sendBatchEvents(events []auditinternal.Event) { list := auditinternal.EventList{Items: events} + if b.throttle != nil { + b.throttle.Accept() + } + // Locking reqMutex for read will guarantee that the shutdown process will // block until the goroutine started below is finished. At the same time, it // will not prevent other batches from being proceed further this point. @@ -314,9 +329,9 @@ func (b *batchBackend) sendBatchEvents(events []auditinternal.Event) { defer b.reqMutex.RUnlock() defer runtime.HandleCrash() - err := webhook.WithExponentialBackoff(0, func() error { - return b.w.RestClient.Post().Body(&list).Do().Error() - }) + err := b.w.WithExponentialBackoff(func() rest.Result { + return b.w.RestClient.Post().Body(&list).Do() + }).Error() if err != nil { impacted := make([]*auditinternal.Event, len(events)) for i := range events {