Merge pull request #53417 from crassirostris/audit-defaults

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Adjust defaults of audit webhook backends

This PR:

- increases the default buffer size so that it can hold at least an order of magnitude more audit events than there can be simultaneous requests (500 AFAIR)
- increases the default batch size. In our load tests, the 95th-percentile size of a log entry is under 2.5KB, so 400 entries add up to a ~1MB request, which sounds reasonable
- increases the initial backoff. AFAIU, if the initial value is zero, all retries will be used up in under 15 seconds (with 0.2 jitter and a 1.5 factor), while the backend or a proxy can be unavailable for some reason for 30 seconds or more.
- adds throttling to the batching audit webhook

A PR to make these parameters configurable will follow.

@hzxuzhonghu implemented the throttling part of this PR

```release-note
Adjust batching audit webhook default parameters: increase queue size, batch size, and initial backoff.
Add throttling to the batching audit webhook. Default rate limit is 10 QPS.
```

/cc @sttts @tallclair @CaoShuFeng @ericchiang @piosz
This commit is contained in:
Kubernetes Submit Queue 2017-10-06 05:03:32 -07:00 committed by GitHub
commit 5cc95fbf27
3 changed files with 28 additions and 7 deletions

View File

@ -1681,6 +1681,10 @@
{ {
"ImportPath": "k8s.io/client-go/util/cert", "ImportPath": "k8s.io/client-go/util/cert",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/client-go/util/flowcontrol",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
} }
] ]
} }

View File

@ -41,6 +41,8 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library", "//vendor/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library", "//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/webhook:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/webhook:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
], ],
) )

View File

@ -34,6 +34,8 @@ import (
auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1" auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1"
"k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/util/webhook" "k8s.io/apiserver/pkg/util/webhook"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/flowcontrol"
) )
const ( const (
@ -58,9 +60,13 @@ const (
// //
// TODO(ericchiang): Make these value configurable. Maybe through a // TODO(ericchiang): Make these value configurable. Maybe through a
// kubeconfig extension? // kubeconfig extension?
defaultBatchBufferSize = 1000 // Buffer up to 1000 events before blocking. defaultBatchBufferSize = 10000 // Buffer up to 10000 events before starting discarding.
defaultBatchMaxSize = 100 // Only send 100 events at a time. defaultBatchMaxSize = 400 // Only send up to 400 events at a time.
defaultBatchMaxWait = time.Minute // Send events at least once a minute. defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute.
defaultInitialBackoff = 10 * time.Second // Wait at least 10 seconds before retrying.
defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS.
defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst.
) )
// The plugin name reported in error metrics. // The plugin name reported in error metrics.
@ -100,7 +106,8 @@ func init() {
} }
func loadWebhook(configFile string, groupVersion schema.GroupVersion) (*webhook.GenericWebhook, error) { func loadWebhook(configFile string, groupVersion schema.GroupVersion) (*webhook.GenericWebhook, error) {
return webhook.NewGenericWebhook(registry, audit.Codecs, configFile, []schema.GroupVersion{groupVersion}, 0) return webhook.NewGenericWebhook(registry, audit.Codecs, configFile,
[]schema.GroupVersion{groupVersion}, defaultInitialBackoff)
} }
func newBlockingWebhook(configFile string, groupVersion schema.GroupVersion) (*blockingBackend, error) { func newBlockingWebhook(configFile string, groupVersion schema.GroupVersion) (*blockingBackend, error) {
@ -151,6 +158,7 @@ func newBatchWebhook(configFile string, groupVersion schema.GroupVersion) (*batc
maxBatchSize: defaultBatchMaxSize, maxBatchSize: defaultBatchMaxSize,
maxBatchWait: defaultBatchMaxWait, maxBatchWait: defaultBatchMaxWait,
shutdownCh: make(chan struct{}), shutdownCh: make(chan struct{}),
throttle: flowcontrol.NewTokenBucketRateLimiter(defaultBatchThrottleQPS, defaultBatchThrottleBurst),
}, nil }, nil
} }
@ -178,6 +186,9 @@ type batchBackend struct {
// all requests have been completed and no new will be spawned, since the // all requests have been completed and no new will be spawned, since the
// sending routine is not running anymore. // sending routine is not running anymore.
reqMutex sync.RWMutex reqMutex sync.RWMutex
// Limits the number of requests sent to the backend per second.
throttle flowcontrol.RateLimiter
} }
func (b *batchBackend) Run(stopCh <-chan struct{}) error { func (b *batchBackend) Run(stopCh <-chan struct{}) error {
@ -303,6 +314,10 @@ func (b *batchBackend) sendBatchEvents(events []auditinternal.Event) {
list := auditinternal.EventList{Items: events} list := auditinternal.EventList{Items: events}
if b.throttle != nil {
b.throttle.Accept()
}
// Locking reqMutex for read will guarantee that the shutdown process will // Locking reqMutex for read will guarantee that the shutdown process will
// block until the goroutine started below is finished. At the same time, it // block until the goroutine started below is finished. At the same time, it
// will not prevent other batches from being proceed further this point. // will not prevent other batches from being proceed further this point.
@ -314,9 +329,9 @@ func (b *batchBackend) sendBatchEvents(events []auditinternal.Event) {
defer b.reqMutex.RUnlock() defer b.reqMutex.RUnlock()
defer runtime.HandleCrash() defer runtime.HandleCrash()
err := webhook.WithExponentialBackoff(0, func() error { err := b.w.WithExponentialBackoff(func() rest.Result {
return b.w.RestClient.Post().Body(&list).Do().Error() return b.w.RestClient.Post().Body(&list).Do()
}) }).Error()
if err != nil { if err != nil {
impacted := make([]*auditinternal.Event, len(events)) impacted := make([]*auditinternal.Event, len(events))
for i := range events { for i := range events {