mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Make audit batch webhook backend configurable
Signed-off-by: Mik Vyatskov <vmik@google.com>
This commit is contained in:
parent
2aeace402a
commit
7e717ef3a6
@ -74,6 +74,7 @@ go_test(
|
||||
"//vendor/k8s.io/apiserver/pkg/server/options:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library",
|
||||
"//vendor/k8s.io/client-go/rest:go_default_library",
|
||||
],
|
||||
)
|
||||
|
@ -28,6 +28,7 @@ import (
|
||||
apiserveroptions "k8s.io/apiserver/pkg/server/options"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
utilconfig "k8s.io/apiserver/pkg/util/flag"
|
||||
auditwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
kapi "k8s.io/kubernetes/pkg/apis/core"
|
||||
@ -55,6 +56,12 @@ func TestAddFlags(t *testing.T) {
|
||||
"--audit-policy-file=/policy",
|
||||
"--audit-webhook-config-file=/webhook-config",
|
||||
"--audit-webhook-mode=blocking",
|
||||
"--audit-webhook-batch-buffer-size=42",
|
||||
"--audit-webhook-batch-max-size=43",
|
||||
"--audit-webhook-batch-max-wait=1s",
|
||||
"--audit-webhook-batch-throttle-qps=43.5",
|
||||
"--audit-webhook-batch-throttle-burst=44",
|
||||
"--audit-webhook-batch-initial-backoff=2s",
|
||||
"--authentication-token-webhook-cache-ttl=3m",
|
||||
"--authentication-token-webhook-config-file=/token-webhook-config",
|
||||
"--authorization-mode=AlwaysDeny",
|
||||
@ -170,6 +177,14 @@ func TestAddFlags(t *testing.T) {
|
||||
WebhookOptions: apiserveroptions.AuditWebhookOptions{
|
||||
Mode: "blocking",
|
||||
ConfigFile: "/webhook-config",
|
||||
BatchConfig: auditwebhook.BatchBackendConfig{
|
||||
BufferSize: 42,
|
||||
MaxBatchSize: 43,
|
||||
MaxBatchWait: 1 * time.Second,
|
||||
ThrottleQPS: 43.5,
|
||||
ThrottleBurst: 44,
|
||||
InitialBackoff: 2 * time.Second,
|
||||
},
|
||||
},
|
||||
PolicyFile: "/policy",
|
||||
},
|
||||
|
@ -77,12 +77,17 @@ type AuditWebhookOptions struct {
|
||||
//
|
||||
// Defaults to asynchronous batch events.
|
||||
Mode string
|
||||
// Configuration for batching webhook. Only used in batch mode.
|
||||
BatchConfig pluginwebhook.BatchBackendConfig
|
||||
}
|
||||
|
||||
func NewAuditOptions() *AuditOptions {
|
||||
return &AuditOptions{
|
||||
WebhookOptions: AuditWebhookOptions{Mode: pluginwebhook.ModeBatch},
|
||||
LogOptions: AuditLogOptions{Format: pluginlog.FormatJson},
|
||||
WebhookOptions: AuditWebhookOptions{
|
||||
Mode: pluginwebhook.ModeBatch,
|
||||
BatchConfig: pluginwebhook.NewDefaultBatchBackendConfig(),
|
||||
},
|
||||
LogOptions: AuditLogOptions{Format: pluginlog.FormatJson},
|
||||
}
|
||||
}
|
||||
|
||||
@ -102,7 +107,7 @@ func (o *AuditOptions) Validate() []error {
|
||||
allErrors = append(allErrors, fmt.Errorf("feature '%s' must be enabled to set option --audit-webhook-config-file", features.AdvancedAuditing))
|
||||
}
|
||||
} else {
|
||||
// check webhook mode
|
||||
// Check webhook mode
|
||||
validMode := false
|
||||
for _, m := range pluginwebhook.AllowedModes {
|
||||
if m == o.WebhookOptions.Mode {
|
||||
@ -114,7 +119,21 @@ func (o *AuditOptions) Validate() []error {
|
||||
allErrors = append(allErrors, fmt.Errorf("invalid audit webhook mode %s, allowed modes are %q", o.WebhookOptions.Mode, strings.Join(pluginwebhook.AllowedModes, ",")))
|
||||
}
|
||||
|
||||
// check log format
|
||||
// Check webhook batch configuration
|
||||
if o.WebhookOptions.BatchConfig.BufferSize <= 0 {
|
||||
allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook buffer size %v, must be a positive number", o.WebhookOptions.BatchConfig.BufferSize))
|
||||
}
|
||||
if o.WebhookOptions.BatchConfig.MaxBatchSize <= 0 {
|
||||
allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook max batch size %v, must be a positive number", o.WebhookOptions.BatchConfig.MaxBatchSize))
|
||||
}
|
||||
if o.WebhookOptions.BatchConfig.ThrottleQPS <= 0 {
|
||||
allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook throttle QPS %v, must be a positive number", o.WebhookOptions.BatchConfig.ThrottleQPS))
|
||||
}
|
||||
if o.WebhookOptions.BatchConfig.ThrottleBurst <= 0 {
|
||||
allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook throttle burst %v, must be a positive number", o.WebhookOptions.BatchConfig.ThrottleBurst))
|
||||
}
|
||||
|
||||
// Check log format
|
||||
validFormat := false
|
||||
for _, f := range pluginlog.AllowedFormats {
|
||||
if f == o.LogOptions.Format {
|
||||
@ -249,6 +268,24 @@ func (o *AuditWebhookOptions) AddFlags(fs *pflag.FlagSet) {
|
||||
"Strategy for sending audit events. Blocking indicates sending events should block"+
|
||||
" server responses. Batch causes the webhook to buffer and send events"+
|
||||
" asynchronously. Known modes are "+strings.Join(pluginwebhook.AllowedModes, ",")+".")
|
||||
fs.IntVar(&o.BatchConfig.BufferSize, "audit-webhook-batch-buffer-size",
|
||||
o.BatchConfig.BufferSize, "The size of the buffer to store events before "+
|
||||
"batching and sending to the webhook. Only used in batch mode.")
|
||||
fs.IntVar(&o.BatchConfig.MaxBatchSize, "audit-webhook-batch-max-size",
|
||||
o.BatchConfig.MaxBatchSize, "The maximum size of a batch sent to the webhook. "+
|
||||
"Only used in batch mode.")
|
||||
fs.DurationVar(&o.BatchConfig.MaxBatchWait, "audit-webhook-batch-max-wait",
|
||||
o.BatchConfig.MaxBatchWait, "The amount of time to wait before force sending the "+
|
||||
"batch that hadn't reached the max size. Only used in batch mode.")
|
||||
fs.Float32Var(&o.BatchConfig.ThrottleQPS, "audit-webhook-batch-throttle-qps",
|
||||
o.BatchConfig.ThrottleQPS, "Maximum average number of requests per second. "+
|
||||
"Only used in batch mode.")
|
||||
fs.IntVar(&o.BatchConfig.ThrottleBurst, "audit-webhook-batch-throttle-burst",
|
||||
o.BatchConfig.ThrottleBurst, "Maximum number of requests sent at the same "+
|
||||
"moment if ThrottleQPS was not utilized before. Only used in batch mode.")
|
||||
fs.DurationVar(&o.BatchConfig.InitialBackoff, "audit-webhook-batch-initial-backoff",
|
||||
o.BatchConfig.InitialBackoff, "The amount of time to wait before retrying the "+
|
||||
"first failed requests. Only used in batch mode.")
|
||||
}
|
||||
|
||||
func (o *AuditWebhookOptions) applyTo(c *server.Config) error {
|
||||
@ -256,7 +293,7 @@ func (o *AuditWebhookOptions) applyTo(c *server.Config) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
webhook, err := pluginwebhook.NewBackend(o.ConfigFile, o.Mode, auditv1beta1.SchemeGroupVersion)
|
||||
webhook, err := pluginwebhook.NewBackend(o.ConfigFile, o.Mode, auditv1beta1.SchemeGroupVersion, o.BatchConfig)
|
||||
if err != nil {
|
||||
return fmt.Errorf("initializing audit webhook: %v", err)
|
||||
}
|
||||
|
@ -57,9 +57,6 @@ var AllowedModes = []string{
|
||||
|
||||
const (
|
||||
// Default configuration values for ModeBatch.
|
||||
//
|
||||
// TODO(ericchiang): Make these value configurable. Maybe through a
|
||||
// kubeconfig extension?
|
||||
defaultBatchBufferSize = 10000 // Buffer up to 10000 events before starting discarding.
|
||||
defaultBatchMaxSize = 400 // Only send up to 400 events at a time.
|
||||
defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute.
|
||||
@ -72,13 +69,49 @@ const (
|
||||
// The plugin name reported in error metrics.
|
||||
const pluginName = "webhook"
|
||||
|
||||
// BatchBackendConfig represents batching webhook audit backend configuration.
|
||||
type BatchBackendConfig struct {
|
||||
// BufferSize defines a size of the buffering queue.
|
||||
BufferSize int
|
||||
// MaxBatchSize defines maximum size of a batch.
|
||||
MaxBatchSize int
|
||||
// MaxBatchWait defines maximum amount of time to wait for MaxBatchSize
|
||||
// events to be accumulated in the buffer before forcibly sending what's
|
||||
// being accumulated.
|
||||
MaxBatchWait time.Duration
|
||||
|
||||
// ThrottleQPS defines the allowed rate of batches per second sent to the webhook.
|
||||
ThrottleQPS float32
|
||||
// ThrottleBurst defines the maximum rate of batches per second sent to the webhook in case
|
||||
// the capacity defined by ThrottleQPS was not utilized.
|
||||
ThrottleBurst int
|
||||
|
||||
// InitialBackoff defines the amount of time to wait before retrying the requests
|
||||
// to the webhook for the first time.
|
||||
InitialBackoff time.Duration
|
||||
}
|
||||
|
||||
// NewDefaultBatchBackendConfig returns new BatchBackendConfig objects populated by default values.
|
||||
func NewDefaultBatchBackendConfig() BatchBackendConfig {
|
||||
return BatchBackendConfig{
|
||||
BufferSize: defaultBatchBufferSize,
|
||||
MaxBatchSize: defaultBatchMaxSize,
|
||||
MaxBatchWait: defaultBatchMaxWait,
|
||||
|
||||
ThrottleQPS: defaultBatchThrottleQPS,
|
||||
ThrottleBurst: defaultBatchThrottleBurst,
|
||||
|
||||
InitialBackoff: defaultInitialBackoff,
|
||||
}
|
||||
}
|
||||
|
||||
// NewBackend returns an audit backend that sends events over HTTP to an external service.
|
||||
// The mode indicates the caching behavior of the webhook. Either blocking (ModeBlocking)
|
||||
// or buffered with batch POSTs (ModeBatch).
|
||||
func NewBackend(kubeConfigFile string, mode string, groupVersion schema.GroupVersion) (audit.Backend, error) {
|
||||
func NewBackend(kubeConfigFile string, mode string, groupVersion schema.GroupVersion, config BatchBackendConfig) (audit.Backend, error) {
|
||||
switch mode {
|
||||
case ModeBatch:
|
||||
return newBatchWebhook(kubeConfigFile, groupVersion)
|
||||
return newBatchWebhook(kubeConfigFile, groupVersion, config)
|
||||
case ModeBlocking:
|
||||
return newBlockingWebhook(kubeConfigFile, groupVersion)
|
||||
default:
|
||||
@ -105,13 +138,13 @@ func init() {
|
||||
install.Install(groupFactoryRegistry, registry, audit.Scheme)
|
||||
}
|
||||
|
||||
func loadWebhook(configFile string, groupVersion schema.GroupVersion) (*webhook.GenericWebhook, error) {
|
||||
func loadWebhook(configFile string, groupVersion schema.GroupVersion, initialBackoff time.Duration) (*webhook.GenericWebhook, error) {
|
||||
return webhook.NewGenericWebhook(registry, audit.Codecs, configFile,
|
||||
[]schema.GroupVersion{groupVersion}, defaultInitialBackoff)
|
||||
[]schema.GroupVersion{groupVersion}, initialBackoff)
|
||||
}
|
||||
|
||||
func newBlockingWebhook(configFile string, groupVersion schema.GroupVersion) (*blockingBackend, error) {
|
||||
w, err := loadWebhook(configFile, groupVersion)
|
||||
w, err := loadWebhook(configFile, groupVersion, defaultInitialBackoff)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -146,19 +179,19 @@ func (b *blockingBackend) processEvents(ev ...*auditinternal.Event) error {
|
||||
return b.w.RestClient.Post().Body(&list).Do().Error()
|
||||
}
|
||||
|
||||
func newBatchWebhook(configFile string, groupVersion schema.GroupVersion) (*batchBackend, error) {
|
||||
w, err := loadWebhook(configFile, groupVersion)
|
||||
func newBatchWebhook(configFile string, groupVersion schema.GroupVersion, config BatchBackendConfig) (*batchBackend, error) {
|
||||
w, err := loadWebhook(configFile, groupVersion, config.InitialBackoff)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &batchBackend{
|
||||
w: w,
|
||||
buffer: make(chan *auditinternal.Event, defaultBatchBufferSize),
|
||||
maxBatchSize: defaultBatchMaxSize,
|
||||
maxBatchWait: defaultBatchMaxWait,
|
||||
buffer: make(chan *auditinternal.Event, config.BufferSize),
|
||||
maxBatchSize: config.MaxBatchSize,
|
||||
maxBatchWait: config.MaxBatchWait,
|
||||
shutdownCh: make(chan struct{}),
|
||||
throttle: flowcontrol.NewTokenBucketRateLimiter(defaultBatchThrottleQPS, defaultBatchThrottleBurst),
|
||||
throttle: flowcontrol.NewTokenBucketRateLimiter(config.ThrottleQPS, config.ThrottleBurst),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -116,7 +116,7 @@ func newWebhook(t *testing.T, endpoint string, mode string, groupVersion schema.
|
||||
// NOTE(ericchiang): Do we need to use a proper serializer?
|
||||
require.NoError(t, stdjson.NewEncoder(f).Encode(config), "writing kubeconfig")
|
||||
|
||||
backend, err := NewBackend(f.Name(), mode, groupVersion)
|
||||
backend, err := NewBackend(f.Name(), mode, groupVersion, NewDefaultBatchBackendConfig())
|
||||
require.NoError(t, err, "initializing backend")
|
||||
|
||||
return backend
|
||||
|
Loading…
Reference in New Issue
Block a user