Add throttling to the batching audit webhook

Signed-off-by: Mik Vyatskov <vmik@google.com>
This commit is contained in:
Mik Vyatskov 2017-10-05 23:19:45 +02:00
parent 5f4ff9f283
commit 6bce120a11
3 changed files with 17 additions and 0 deletions

View File

@ -1661,6 +1661,10 @@
{ {
"ImportPath": "k8s.io/client-go/util/cert", "ImportPath": "k8s.io/client-go/util/cert",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/client-go/util/flowcontrol",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
} }
] ]
} }

View File

@ -42,6 +42,7 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library", "//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/webhook:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/webhook:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
], ],
) )

View File

@ -35,6 +35,7 @@ import (
"k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/util/webhook" "k8s.io/apiserver/pkg/util/webhook"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/util/flowcontrol"
) )
const ( const (
@ -63,6 +64,9 @@ const (
defaultBatchMaxSize = 400 // Only send up to 400 events at a time. defaultBatchMaxSize = 400 // Only send up to 400 events at a time.
defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute. defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute.
defaultInitialBackoff = 10 * time.Second // Wait at least 10 seconds before retrying. defaultInitialBackoff = 10 * time.Second // Wait at least 10 seconds before retrying.
defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS.
defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst.
) )
// The plugin name reported in error metrics. // The plugin name reported in error metrics.
@ -154,6 +158,7 @@ func newBatchWebhook(configFile string, groupVersion schema.GroupVersion) (*batc
maxBatchSize: defaultBatchMaxSize, maxBatchSize: defaultBatchMaxSize,
maxBatchWait: defaultBatchMaxWait, maxBatchWait: defaultBatchMaxWait,
shutdownCh: make(chan struct{}), shutdownCh: make(chan struct{}),
throttle: flowcontrol.NewTokenBucketRateLimiter(defaultBatchThrottleQPS, defaultBatchThrottleBurst),
}, nil }, nil
} }
@ -181,6 +186,9 @@ type batchBackend struct {
// all requests have been completed and no new will be spawned, since the // all requests have been completed and no new will be spawned, since the
// sending routine is not running anymore. // sending routine is not running anymore.
reqMutex sync.RWMutex reqMutex sync.RWMutex
// Limits the number of requests sent to the backend per second.
throttle flowcontrol.RateLimiter
} }
func (b *batchBackend) Run(stopCh <-chan struct{}) error { func (b *batchBackend) Run(stopCh <-chan struct{}) error {
@ -306,6 +314,10 @@ func (b *batchBackend) sendBatchEvents(events []auditinternal.Event) {
list := auditinternal.EventList{Items: events} list := auditinternal.EventList{Items: events}
if b.throttle != nil {
b.throttle.Accept()
}
// Locking reqMutex for read will guarantee that the shutdown process will // Locking reqMutex for read will guarantee that the shutdown process will
// block until the goroutine started below is finished. At the same time, it // block until the goroutine started below is finished. At the same time, it
// will not prevent other batches from being proceed further this point. // will not prevent other batches from being proceed further this point.