diff --git a/staging/src/k8s.io/apiserver/Godeps/Godeps.json b/staging/src/k8s.io/apiserver/Godeps/Godeps.json index a299a71a677..22020cde375 100644 --- a/staging/src/k8s.io/apiserver/Godeps/Godeps.json +++ b/staging/src/k8s.io/apiserver/Godeps/Godeps.json @@ -1661,6 +1661,10 @@ { "ImportPath": "k8s.io/client-go/util/cert", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + }, + { + "ImportPath": "k8s.io/client-go/util/flowcontrol", + "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" } ] } diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD index dfd5e16ce04..fcee573ce32 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD @@ -42,6 +42,7 @@ go_library( "//vendor/k8s.io/apiserver/pkg/audit:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/webhook:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", + "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go index 4f8190b84d9..41d8c0ed92a 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go @@ -35,6 +35,7 @@ import ( "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/util/webhook" "k8s.io/client-go/rest" + "k8s.io/client-go/util/flowcontrol" ) const ( @@ -63,6 +64,9 @@ const ( defaultBatchMaxSize = 400 // Only send up to 400 events at a time. defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute. defaultInitialBackoff = 10 * time.Second // Wait at least 10 seconds before retrying. + + defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS. + defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst. ) // The plugin name reported in error metrics. @@ -154,6 +158,7 @@ func newBatchWebhook(configFile string, groupVersion schema.GroupVersion) (*batc maxBatchSize: defaultBatchMaxSize, maxBatchWait: defaultBatchMaxWait, shutdownCh: make(chan struct{}), + throttle: flowcontrol.NewTokenBucketRateLimiter(defaultBatchThrottleQPS, defaultBatchThrottleBurst), }, nil } @@ -181,6 +186,9 @@ type batchBackend struct { // all requests have been completed and no new will be spawned, since the // sending routine is not running anymore. reqMutex sync.RWMutex + + // Limits the number of requests sent to the backend per second. + throttle flowcontrol.RateLimiter } func (b *batchBackend) Run(stopCh <-chan struct{}) error { @@ -306,6 +314,10 @@ func (b *batchBackend) sendBatchEvents(events []auditinternal.Event) { list := auditinternal.EventList{Items: events} + if b.throttle != nil { + b.throttle.Accept() + } + // Locking reqMutex for read will guarantee that the shutdown process will // block until the goroutine started below is finished. At the same time, it // will not prevent other batches from being proceed further this point.