mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Merge pull request #60237 from crassirostris/audit-use-buffered-backend
Automatic merge from submit-queue (batch tested with PRs 60542, 60237). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Audit use buffered backend This is the next step after https://github.com/kubernetes/kubernetes/pull/60076 This PR fixes https://github.com/kubernetes/kubernetes/issues/53020, to address https://github.com/kubernetes/kubernetes/issues/53006 later In this PR buffered backend, introduced in https://github.com/kubernetes/kubernetes/pull/60076, is used to replace ad-hoc solution for webhook and add an ability to enable buffering for the log audit backend. ```release-note Log audit backend can now be configured to perform batching before writing events to disk. ``` /cc @sttts @tallclair @ericchiang @CaoShuFeng
This commit is contained in:
commit
209cdd9048
@ -1538,7 +1538,7 @@ function start-kube-apiserver {
|
|||||||
params+=" --audit-webhook-batch-throttle-burst=${ADVANCED_AUDIT_WEBHOOK_THROTTLE_BURST}"
|
params+=" --audit-webhook-batch-throttle-burst=${ADVANCED_AUDIT_WEBHOOK_THROTTLE_BURST}"
|
||||||
fi
|
fi
|
||||||
if [[ -n "${ADVANCED_AUDIT_WEBHOOK_INITIAL_BACKOFF:-}" ]]; then
|
if [[ -n "${ADVANCED_AUDIT_WEBHOOK_INITIAL_BACKOFF:-}" ]]; then
|
||||||
params+=" --audit-webhook-batch-initial-backoff=${ADVANCED_AUDIT_WEBHOOK_INITIAL_BACKOFF}"
|
params+=" --audit-webhook-initial-backoff=${ADVANCED_AUDIT_WEBHOOK_INITIAL_BACKOFF}"
|
||||||
fi
|
fi
|
||||||
create-master-audit-webhook-config "${audit_webhook_config_file}"
|
create-master-audit-webhook-config "${audit_webhook_config_file}"
|
||||||
audit_webhook_config_mount="{\"name\": \"auditwebhookconfigmount\",\"mountPath\": \"${audit_webhook_config_file}\", \"readOnly\": true},"
|
audit_webhook_config_mount="{\"name\": \"auditwebhookconfigmount\",\"mountPath\": \"${audit_webhook_config_file}\", \"readOnly\": true},"
|
||||||
|
@ -46,7 +46,7 @@ go_test(
|
|||||||
"//vendor/k8s.io/apiserver/pkg/server/options:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/server/options:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library",
|
"//vendor/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/rest:go_default_library",
|
"//vendor/k8s.io/client-go/rest:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -29,7 +29,7 @@ import (
|
|||||||
genericoptions "k8s.io/apiserver/pkg/server/options"
|
genericoptions "k8s.io/apiserver/pkg/server/options"
|
||||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||||
utilflag "k8s.io/apiserver/pkg/util/flag"
|
utilflag "k8s.io/apiserver/pkg/util/flag"
|
||||||
auditwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook"
|
auditbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
|
||||||
restclient "k8s.io/client-go/rest"
|
restclient "k8s.io/client-go/rest"
|
||||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||||
kapi "k8s.io/kubernetes/pkg/apis/core"
|
kapi "k8s.io/kubernetes/pkg/apis/core"
|
||||||
@ -54,15 +54,23 @@ func TestAddFlags(t *testing.T) {
|
|||||||
"--audit-log-maxbackup=12",
|
"--audit-log-maxbackup=12",
|
||||||
"--audit-log-maxsize=13",
|
"--audit-log-maxsize=13",
|
||||||
"--audit-log-path=/var/log",
|
"--audit-log-path=/var/log",
|
||||||
|
"--audit-log-mode=blocking",
|
||||||
|
"--audit-log-batch-buffer-size=46",
|
||||||
|
"--audit-log-batch-max-size=47",
|
||||||
|
"--audit-log-batch-max-wait=48s",
|
||||||
|
"--audit-log-batch-throttle-enable=true",
|
||||||
|
"--audit-log-batch-throttle-qps=49.5",
|
||||||
|
"--audit-log-batch-throttle-burst=50",
|
||||||
"--audit-policy-file=/policy",
|
"--audit-policy-file=/policy",
|
||||||
"--audit-webhook-config-file=/webhook-config",
|
"--audit-webhook-config-file=/webhook-config",
|
||||||
"--audit-webhook-mode=blocking",
|
"--audit-webhook-mode=blocking",
|
||||||
"--audit-webhook-batch-buffer-size=42",
|
"--audit-webhook-batch-buffer-size=42",
|
||||||
"--audit-webhook-batch-max-size=43",
|
"--audit-webhook-batch-max-size=43",
|
||||||
"--audit-webhook-batch-max-wait=1s",
|
"--audit-webhook-batch-max-wait=1s",
|
||||||
|
"--audit-webhook-batch-throttle-enable=false",
|
||||||
"--audit-webhook-batch-throttle-qps=43.5",
|
"--audit-webhook-batch-throttle-qps=43.5",
|
||||||
"--audit-webhook-batch-throttle-burst=44",
|
"--audit-webhook-batch-throttle-burst=44",
|
||||||
"--audit-webhook-batch-initial-backoff=2s",
|
"--audit-webhook-initial-backoff=2s",
|
||||||
"--authentication-token-webhook-cache-ttl=3m",
|
"--authentication-token-webhook-cache-ttl=3m",
|
||||||
"--authentication-token-webhook-config-file=/token-webhook-config",
|
"--authentication-token-webhook-config-file=/token-webhook-config",
|
||||||
"--authorization-mode=AlwaysDeny",
|
"--authorization-mode=AlwaysDeny",
|
||||||
@ -180,19 +188,33 @@ func TestAddFlags(t *testing.T) {
|
|||||||
MaxBackups: 12,
|
MaxBackups: 12,
|
||||||
MaxSize: 13,
|
MaxSize: 13,
|
||||||
Format: "json",
|
Format: "json",
|
||||||
|
BatchOptions: apiserveroptions.AuditBatchOptions{
|
||||||
|
Mode: "blocking",
|
||||||
|
BatchConfig: auditbuffered.BatchConfig{
|
||||||
|
BufferSize: 46,
|
||||||
|
MaxBatchSize: 47,
|
||||||
|
MaxBatchWait: 48 * time.Second,
|
||||||
|
ThrottleEnable: true,
|
||||||
|
ThrottleQPS: 49.5,
|
||||||
|
ThrottleBurst: 50,
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
WebhookOptions: apiserveroptions.AuditWebhookOptions{
|
WebhookOptions: apiserveroptions.AuditWebhookOptions{
|
||||||
Mode: "blocking",
|
|
||||||
ConfigFile: "/webhook-config",
|
ConfigFile: "/webhook-config",
|
||||||
BatchConfig: auditwebhook.BatchBackendConfig{
|
BatchOptions: apiserveroptions.AuditBatchOptions{
|
||||||
|
Mode: "blocking",
|
||||||
|
BatchConfig: auditbuffered.BatchConfig{
|
||||||
BufferSize: 42,
|
BufferSize: 42,
|
||||||
MaxBatchSize: 43,
|
MaxBatchSize: 43,
|
||||||
MaxBatchWait: 1 * time.Second,
|
MaxBatchWait: 1 * time.Second,
|
||||||
|
ThrottleEnable: false,
|
||||||
ThrottleQPS: 43.5,
|
ThrottleQPS: 43.5,
|
||||||
ThrottleBurst: 44,
|
ThrottleBurst: 44,
|
||||||
InitialBackoff: 2 * time.Second,
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
InitialBackoff: 2 * time.Second,
|
||||||
|
},
|
||||||
PolicyFile: "/policy",
|
PolicyFile: "/policy",
|
||||||
},
|
},
|
||||||
Features: &apiserveroptions.FeatureOptions{
|
Features: &apiserveroptions.FeatureOptions{
|
||||||
|
@ -1470,6 +1470,10 @@
|
|||||||
"ImportPath": "k8s.io/apiserver/pkg/util/wsstream",
|
"ImportPath": "k8s.io/apiserver/pkg/util/wsstream",
|
||||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/buffered",
|
||||||
|
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/log",
|
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/log",
|
||||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||||
|
@ -55,6 +55,7 @@ go_library(
|
|||||||
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
|
||||||
|
"//vendor/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/plugin/pkg/audit/log:go_default_library",
|
"//vendor/k8s.io/apiserver/plugin/pkg/audit/log:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library",
|
"//vendor/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/informers:go_default_library",
|
"//vendor/k8s.io/client-go/informers:go_default_library",
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
"github.com/spf13/pflag"
|
"github.com/spf13/pflag"
|
||||||
@ -32,6 +33,7 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/features"
|
"k8s.io/apiserver/pkg/features"
|
||||||
"k8s.io/apiserver/pkg/server"
|
"k8s.io/apiserver/pkg/server"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
|
pluginbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
|
||||||
pluginlog "k8s.io/apiserver/plugin/pkg/audit/log"
|
pluginlog "k8s.io/apiserver/plugin/pkg/audit/log"
|
||||||
pluginwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook"
|
pluginwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook"
|
||||||
)
|
)
|
||||||
@ -58,6 +60,33 @@ type AuditOptions struct {
|
|||||||
WebhookOptions AuditWebhookOptions
|
WebhookOptions AuditWebhookOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
// ModeBatch indicates that the audit backend should buffer audit events
|
||||||
|
// internally, sending batch updates either once a certain number of
|
||||||
|
// events have been received or a certain amount of time has passed.
|
||||||
|
ModeBatch = "batch"
|
||||||
|
// ModeBlocking causes the audit backend to block on every attempt to process
|
||||||
|
// a set of events. This causes requests to the API server to wait for the
|
||||||
|
// flush before sending a response.
|
||||||
|
ModeBlocking = "blocking"
|
||||||
|
)
|
||||||
|
|
||||||
|
// AllowedModes is the modes known for audit backends.
|
||||||
|
var AllowedModes = []string{
|
||||||
|
ModeBatch,
|
||||||
|
ModeBlocking,
|
||||||
|
}
|
||||||
|
|
||||||
|
type AuditBatchOptions struct {
|
||||||
|
// Should the backend asynchronous batch events to the webhook backend or
|
||||||
|
// should the backend block responses?
|
||||||
|
//
|
||||||
|
// Defaults to asynchronous batch events.
|
||||||
|
Mode string
|
||||||
|
// Configuration for batching backend. Only used in batch mode.
|
||||||
|
BatchConfig pluginbuffered.BatchConfig
|
||||||
|
}
|
||||||
|
|
||||||
// AuditLogOptions determines the output of the structured audit log by default.
|
// AuditLogOptions determines the output of the structured audit log by default.
|
||||||
// If the AdvancedAuditing feature is set to false, AuditLogOptions holds the legacy
|
// If the AdvancedAuditing feature is set to false, AuditLogOptions holds the legacy
|
||||||
// audit log writer.
|
// audit log writer.
|
||||||
@ -67,27 +96,37 @@ type AuditLogOptions struct {
|
|||||||
MaxBackups int
|
MaxBackups int
|
||||||
MaxSize int
|
MaxSize int
|
||||||
Format string
|
Format string
|
||||||
|
|
||||||
|
BatchOptions AuditBatchOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
// AuditWebhookOptions control the webhook configuration for audit events.
|
// AuditWebhookOptions control the webhook configuration for audit events.
|
||||||
type AuditWebhookOptions struct {
|
type AuditWebhookOptions struct {
|
||||||
ConfigFile string
|
ConfigFile string
|
||||||
// Should the webhook asynchronous batch events to the webhook backend or
|
InitialBackoff time.Duration
|
||||||
// should the webhook block responses?
|
|
||||||
//
|
BatchOptions AuditBatchOptions
|
||||||
// Defaults to asynchronous batch events.
|
|
||||||
Mode string
|
|
||||||
// Configuration for batching webhook. Only used in batch mode.
|
|
||||||
BatchConfig pluginwebhook.BatchBackendConfig
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewAuditOptions() *AuditOptions {
|
func NewAuditOptions() *AuditOptions {
|
||||||
|
defaultLogBatchConfig := pluginbuffered.NewDefaultBatchConfig()
|
||||||
|
defaultLogBatchConfig.ThrottleEnable = false
|
||||||
|
|
||||||
return &AuditOptions{
|
return &AuditOptions{
|
||||||
WebhookOptions: AuditWebhookOptions{
|
WebhookOptions: AuditWebhookOptions{
|
||||||
Mode: pluginwebhook.ModeBatch,
|
BatchOptions: AuditBatchOptions{
|
||||||
BatchConfig: pluginwebhook.NewDefaultBatchBackendConfig(),
|
Mode: ModeBatch,
|
||||||
|
BatchConfig: defaultLogBatchConfig,
|
||||||
|
},
|
||||||
|
InitialBackoff: pluginwebhook.DefaultInitialBackoff,
|
||||||
|
},
|
||||||
|
LogOptions: AuditLogOptions{
|
||||||
|
Format: pluginlog.FormatJson,
|
||||||
|
BatchOptions: AuditBatchOptions{
|
||||||
|
Mode: ModeBatch,
|
||||||
|
BatchConfig: pluginbuffered.NewDefaultBatchConfig(),
|
||||||
|
},
|
||||||
},
|
},
|
||||||
LogOptions: AuditLogOptions{Format: pluginlog.FormatJson},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -107,30 +146,20 @@ func (o *AuditOptions) Validate() []error {
|
|||||||
allErrors = append(allErrors, fmt.Errorf("feature '%s' must be enabled to set option --audit-webhook-config-file", features.AdvancedAuditing))
|
allErrors = append(allErrors, fmt.Errorf("feature '%s' must be enabled to set option --audit-webhook-config-file", features.AdvancedAuditing))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Check webhook mode
|
// check webhook configuration
|
||||||
validMode := false
|
if err := validateBackendMode(pluginwebhook.PluginName, o.WebhookOptions.BatchOptions.Mode); err != nil {
|
||||||
for _, m := range pluginwebhook.AllowedModes {
|
allErrors = append(allErrors, err)
|
||||||
if m == o.WebhookOptions.Mode {
|
|
||||||
validMode = true
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
if err := validateBackendBatchConfig(pluginwebhook.PluginName, o.LogOptions.BatchOptions.BatchConfig); err != nil {
|
||||||
if !validMode {
|
allErrors = append(allErrors, err)
|
||||||
allErrors = append(allErrors, fmt.Errorf("invalid audit webhook mode %s, allowed modes are %q", o.WebhookOptions.Mode, strings.Join(pluginwebhook.AllowedModes, ",")))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check webhook batch configuration
|
// check log configuration
|
||||||
if o.WebhookOptions.BatchConfig.BufferSize <= 0 {
|
if err := validateBackendMode(pluginlog.PluginName, o.LogOptions.BatchOptions.Mode); err != nil {
|
||||||
allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook buffer size %v, must be a positive number", o.WebhookOptions.BatchConfig.BufferSize))
|
allErrors = append(allErrors, err)
|
||||||
}
|
}
|
||||||
if o.WebhookOptions.BatchConfig.MaxBatchSize <= 0 {
|
if err := validateBackendBatchConfig(pluginlog.PluginName, o.LogOptions.BatchOptions.BatchConfig); err != nil {
|
||||||
allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook max batch size %v, must be a positive number", o.WebhookOptions.BatchConfig.MaxBatchSize))
|
allErrors = append(allErrors, err)
|
||||||
}
|
|
||||||
if o.WebhookOptions.BatchConfig.ThrottleQPS <= 0 {
|
|
||||||
allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook throttle QPS %v, must be a positive number", o.WebhookOptions.BatchConfig.ThrottleQPS))
|
|
||||||
}
|
|
||||||
if o.WebhookOptions.BatchConfig.ThrottleBurst <= 0 {
|
|
||||||
allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook throttle burst %v, must be a positive number", o.WebhookOptions.BatchConfig.ThrottleBurst))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check log format
|
// Check log format
|
||||||
@ -160,6 +189,31 @@ func (o *AuditOptions) Validate() []error {
|
|||||||
return allErrors
|
return allErrors
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func validateBackendMode(pluginName string, mode string) error {
|
||||||
|
for _, m := range AllowedModes {
|
||||||
|
if m == mode {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fmt.Errorf("invalid audit %s mode %s, allowed modes are %q", pluginName, mode, strings.Join(AllowedModes, ","))
|
||||||
|
}
|
||||||
|
|
||||||
|
func validateBackendBatchConfig(pluginName string, config pluginbuffered.BatchConfig) error {
|
||||||
|
if config.BufferSize <= 0 {
|
||||||
|
return fmt.Errorf("invalid audit batch %s buffer size %v, must be a positive number", pluginName, config.BufferSize)
|
||||||
|
}
|
||||||
|
if config.MaxBatchSize <= 0 {
|
||||||
|
return fmt.Errorf("invalid audit batch %s max batch size %v, must be a positive number", pluginName, config.MaxBatchSize)
|
||||||
|
}
|
||||||
|
if config.ThrottleQPS <= 0 {
|
||||||
|
return fmt.Errorf("invalid audit batch %s throttle QPS %v, must be a positive number", pluginName, config.ThrottleQPS)
|
||||||
|
}
|
||||||
|
if config.ThrottleBurst <= 0 {
|
||||||
|
return fmt.Errorf("invalid audit batch %s throttle burst %v, must be a positive number", pluginName, config.ThrottleBurst)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (o *AuditOptions) AddFlags(fs *pflag.FlagSet) {
|
func (o *AuditOptions) AddFlags(fs *pflag.FlagSet) {
|
||||||
if o == nil {
|
if o == nil {
|
||||||
return
|
return
|
||||||
@ -170,7 +224,9 @@ func (o *AuditOptions) AddFlags(fs *pflag.FlagSet) {
|
|||||||
" With AdvancedAuditing, a profile is required to enable auditing.")
|
" With AdvancedAuditing, a profile is required to enable auditing.")
|
||||||
|
|
||||||
o.LogOptions.AddFlags(fs)
|
o.LogOptions.AddFlags(fs)
|
||||||
|
o.LogOptions.BatchOptions.AddFlags(pluginlog.PluginName, fs)
|
||||||
o.WebhookOptions.AddFlags(fs)
|
o.WebhookOptions.AddFlags(fs)
|
||||||
|
o.WebhookOptions.BatchOptions.AddFlags(pluginwebhook.PluginName, fs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *AuditOptions) ApplyTo(c *server.Config) error {
|
func (o *AuditOptions) ApplyTo(c *server.Config) error {
|
||||||
@ -216,6 +272,36 @@ func (o *AuditOptions) applyTo(c *server.Config) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *AuditBatchOptions) AddFlags(pluginName string, fs *pflag.FlagSet) {
|
||||||
|
fs.StringVar(&o.Mode, fmt.Sprintf("audit-%s-mode", pluginName), o.Mode,
|
||||||
|
"Strategy for sending audit events. Blocking indicates sending events should block"+
|
||||||
|
" server responses. Batch causes the backend to buffer and write events"+
|
||||||
|
" asynchronously. Known modes are "+strings.Join(AllowedModes, ",")+".")
|
||||||
|
fs.IntVar(&o.BatchConfig.BufferSize, fmt.Sprintf("audit-%s-batch-buffer-size", pluginName),
|
||||||
|
o.BatchConfig.BufferSize, "The size of the buffer to store events before "+
|
||||||
|
"batching and writing. Only used in batch mode.")
|
||||||
|
fs.IntVar(&o.BatchConfig.MaxBatchSize, fmt.Sprintf("audit-%s-batch-max-size", pluginName),
|
||||||
|
o.BatchConfig.MaxBatchSize, "The maximum size of a batch. Only used in batch mode.")
|
||||||
|
fs.DurationVar(&o.BatchConfig.MaxBatchWait, fmt.Sprintf("audit-%s-batch-max-wait", pluginName),
|
||||||
|
o.BatchConfig.MaxBatchWait, "The amount of time to wait before force writing the "+
|
||||||
|
"batch that hadn't reached the max size. Only used in batch mode.")
|
||||||
|
fs.BoolVar(&o.BatchConfig.ThrottleEnable, fmt.Sprintf("audit-%s-batch-throttle-enable", pluginName),
|
||||||
|
o.BatchConfig.ThrottleEnable, "Whether batching throttling is enabled. Only used in batch mode.")
|
||||||
|
fs.Float32Var(&o.BatchConfig.ThrottleQPS, fmt.Sprintf("audit-%s-batch-throttle-qps", pluginName),
|
||||||
|
o.BatchConfig.ThrottleQPS, "Maximum average number of batches per second. "+
|
||||||
|
"Only used in batch mode.")
|
||||||
|
fs.IntVar(&o.BatchConfig.ThrottleBurst, fmt.Sprintf("audit-%s-batch-throttle-burst", pluginName),
|
||||||
|
o.BatchConfig.ThrottleBurst, "Maximum number of requests sent at the same "+
|
||||||
|
"moment if ThrottleQPS was not utilized before. Only used in batch mode.")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *AuditBatchOptions) wrapBackend(delegate audit.Backend) audit.Backend {
|
||||||
|
if o.Mode == ModeBlocking {
|
||||||
|
return delegate
|
||||||
|
}
|
||||||
|
return pluginbuffered.NewBackend(delegate, o.BatchConfig)
|
||||||
|
}
|
||||||
|
|
||||||
func (o *AuditLogOptions) AddFlags(fs *pflag.FlagSet) {
|
func (o *AuditLogOptions) AddFlags(fs *pflag.FlagSet) {
|
||||||
fs.StringVar(&o.Path, "audit-log-path", o.Path,
|
fs.StringVar(&o.Path, "audit-log-path", o.Path,
|
||||||
"If set, all requests coming to the apiserver will be logged to this file. '-' means standard out.")
|
"If set, all requests coming to the apiserver will be logged to this file. '-' means standard out.")
|
||||||
@ -250,7 +336,8 @@ func (o *AuditLogOptions) getWriter() io.Writer {
|
|||||||
|
|
||||||
func (o *AuditLogOptions) advancedApplyTo(c *server.Config) error {
|
func (o *AuditLogOptions) advancedApplyTo(c *server.Config) error {
|
||||||
if w := o.getWriter(); w != nil {
|
if w := o.getWriter(); w != nil {
|
||||||
c.AuditBackend = appendBackend(c.AuditBackend, pluginlog.NewBackend(w, o.Format, auditv1beta1.SchemeGroupVersion))
|
log := pluginlog.NewBackend(w, o.Format, auditv1beta1.SchemeGroupVersion)
|
||||||
|
c.AuditBackend = appendBackend(c.AuditBackend, o.BatchOptions.wrapBackend(log))
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -264,28 +351,12 @@ func (o *AuditWebhookOptions) AddFlags(fs *pflag.FlagSet) {
|
|||||||
fs.StringVar(&o.ConfigFile, "audit-webhook-config-file", o.ConfigFile,
|
fs.StringVar(&o.ConfigFile, "audit-webhook-config-file", o.ConfigFile,
|
||||||
"Path to a kubeconfig formatted file that defines the audit webhook configuration."+
|
"Path to a kubeconfig formatted file that defines the audit webhook configuration."+
|
||||||
" Requires the 'AdvancedAuditing' feature gate.")
|
" Requires the 'AdvancedAuditing' feature gate.")
|
||||||
fs.StringVar(&o.Mode, "audit-webhook-mode", o.Mode,
|
fs.DurationVar(&o.InitialBackoff, "audit-webhook-initial-backoff",
|
||||||
"Strategy for sending audit events. Blocking indicates sending events should block"+
|
o.InitialBackoff, "The amount of time to wait before retrying the first failed request.")
|
||||||
" server responses. Batch causes the webhook to buffer and send events"+
|
fs.DurationVar(&o.InitialBackoff, "audit-webhook-batch-initial-backoff",
|
||||||
" asynchronously. Known modes are "+strings.Join(pluginwebhook.AllowedModes, ",")+".")
|
o.InitialBackoff, "The amount of time to wait before retrying the first failed request.")
|
||||||
fs.IntVar(&o.BatchConfig.BufferSize, "audit-webhook-batch-buffer-size",
|
fs.MarkDeprecated("audit-webhook-batch-initial-backoff",
|
||||||
o.BatchConfig.BufferSize, "The size of the buffer to store events before "+
|
"Deprecated, use --audit-webhook-initial-backoff instead.")
|
||||||
"batching and sending to the webhook. Only used in batch mode.")
|
|
||||||
fs.IntVar(&o.BatchConfig.MaxBatchSize, "audit-webhook-batch-max-size",
|
|
||||||
o.BatchConfig.MaxBatchSize, "The maximum size of a batch sent to the webhook. "+
|
|
||||||
"Only used in batch mode.")
|
|
||||||
fs.DurationVar(&o.BatchConfig.MaxBatchWait, "audit-webhook-batch-max-wait",
|
|
||||||
o.BatchConfig.MaxBatchWait, "The amount of time to wait before force sending the "+
|
|
||||||
"batch that hadn't reached the max size. Only used in batch mode.")
|
|
||||||
fs.Float32Var(&o.BatchConfig.ThrottleQPS, "audit-webhook-batch-throttle-qps",
|
|
||||||
o.BatchConfig.ThrottleQPS, "Maximum average number of requests per second. "+
|
|
||||||
"Only used in batch mode.")
|
|
||||||
fs.IntVar(&o.BatchConfig.ThrottleBurst, "audit-webhook-batch-throttle-burst",
|
|
||||||
o.BatchConfig.ThrottleBurst, "Maximum number of requests sent at the same "+
|
|
||||||
"moment if ThrottleQPS was not utilized before. Only used in batch mode.")
|
|
||||||
fs.DurationVar(&o.BatchConfig.InitialBackoff, "audit-webhook-batch-initial-backoff",
|
|
||||||
o.BatchConfig.InitialBackoff, "The amount of time to wait before retrying the "+
|
|
||||||
"first failed requests. Only used in batch mode.")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *AuditWebhookOptions) applyTo(c *server.Config) error {
|
func (o *AuditWebhookOptions) applyTo(c *server.Config) error {
|
||||||
@ -293,10 +364,10 @@ func (o *AuditWebhookOptions) applyTo(c *server.Config) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
webhook, err := pluginwebhook.NewBackend(o.ConfigFile, o.Mode, auditv1beta1.SchemeGroupVersion, o.BatchConfig)
|
webhook, err := pluginwebhook.NewBackend(o.ConfigFile, auditv1beta1.SchemeGroupVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("initializing audit webhook: %v", err)
|
return fmt.Errorf("initializing audit webhook: %v", err)
|
||||||
}
|
}
|
||||||
c.AuditBackend = appendBackend(c.AuditBackend, webhook)
|
c.AuditBackend = appendBackend(c.AuditBackend, o.BatchOptions.wrapBackend(webhook))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -153,6 +153,12 @@ func (b *bufferedBackend) Shutdown() {
|
|||||||
<-b.shutdownCh
|
<-b.shutdownCh
|
||||||
|
|
||||||
// Wait until all sending routines exit.
|
// Wait until all sending routines exit.
|
||||||
|
//
|
||||||
|
// - When b.shutdownCh is closed, we know that the goroutine in Run has terminated.
|
||||||
|
// - This means that processIncomingEvents has terminated.
|
||||||
|
// - Which means that b.buffer is closed and cannot accept any new events anymore.
|
||||||
|
// - Because processEvents is called synchronously from the Run goroutine, the waitgroup has its final value.
|
||||||
|
// Hence wg.Wait will not miss any more outgoing batches.
|
||||||
b.wg.Wait()
|
b.wg.Wait()
|
||||||
|
|
||||||
b.delegateBackend.Shutdown()
|
b.delegateBackend.Shutdown()
|
||||||
@ -250,7 +256,7 @@ func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) {
|
|||||||
// recover from.
|
// recover from.
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := recover(); err != nil {
|
if err := recover(); err != nil {
|
||||||
sendErr = fmt.Errorf("panic when processing events: %v", err)
|
sendErr = fmt.Errorf("audit backend shut down")
|
||||||
}
|
}
|
||||||
if sendErr != nil {
|
if sendErr != nil {
|
||||||
audit.HandlePluginError(pluginName, sendErr, ev[evIndex:]...)
|
audit.HandlePluginError(pluginName, sendErr, ev[evIndex:]...)
|
||||||
|
@ -32,6 +32,9 @@ const (
|
|||||||
FormatLegacy = "legacy"
|
FormatLegacy = "legacy"
|
||||||
// FormatJson saves event in structured json format.
|
// FormatJson saves event in structured json format.
|
||||||
FormatJson = "json"
|
FormatJson = "json"
|
||||||
|
|
||||||
|
// PluginName is the name of this plugin, to be used in help and logs.
|
||||||
|
PluginName = "log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AllowedFormats are the formats known by log backend.
|
// AllowedFormats are the formats known by log backend.
|
||||||
@ -70,17 +73,17 @@ func (b *backend) logEvent(ev *auditinternal.Event) {
|
|||||||
case FormatJson:
|
case FormatJson:
|
||||||
bs, err := runtime.Encode(audit.Codecs.LegacyCodec(b.groupVersion), ev)
|
bs, err := runtime.Encode(audit.Codecs.LegacyCodec(b.groupVersion), ev)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
audit.HandlePluginError("log", err, ev)
|
audit.HandlePluginError(PluginName, err, ev)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
line = string(bs[:])
|
line = string(bs[:])
|
||||||
default:
|
default:
|
||||||
audit.HandlePluginError("log", fmt.Errorf("log format %q is not in list of known formats (%s)",
|
audit.HandlePluginError(PluginName, fmt.Errorf("log format %q is not in list of known formats (%s)",
|
||||||
b.format, strings.Join(AllowedFormats, ",")), ev)
|
b.format, strings.Join(AllowedFormats, ",")), ev)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if _, err := fmt.Fprint(b.out, line); err != nil {
|
if _, err := fmt.Fprint(b.out, line); err != nil {
|
||||||
audit.HandlePluginError("log", err, ev)
|
audit.HandlePluginError(PluginName, err, ev)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -8,10 +8,7 @@ load(
|
|||||||
|
|
||||||
go_test(
|
go_test(
|
||||||
name = "go_default_test",
|
name = "go_default_test",
|
||||||
srcs = [
|
srcs = ["webhook_test.go"],
|
||||||
"webhook_test.go",
|
|
||||||
"webhook_v1alpha1_test.go",
|
|
||||||
],
|
|
||||||
embed = [":go_default_library"],
|
embed = [":go_default_library"],
|
||||||
deps = [
|
deps = [
|
||||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||||
@ -20,7 +17,6 @@ go_test(
|
|||||||
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer/json:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer/json:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library",
|
|
||||||
"//vendor/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/tools/clientcmd/api/v1:go_default_library",
|
"//vendor/k8s.io/client-go/tools/clientcmd/api/v1:go_default_library",
|
||||||
@ -35,7 +31,6 @@ go_library(
|
|||||||
"//vendor/k8s.io/apimachinery/pkg/apimachinery/announced:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/apimachinery/announced:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/apimachinery/registered:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/apimachinery/registered:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
|
||||||
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/apis/audit/install:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/apis/audit/install:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library",
|
||||||
@ -43,7 +38,6 @@ go_library(
|
|||||||
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/util/webhook:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/util/webhook:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/rest:go_default_library",
|
"//vendor/k8s.io/client-go/rest:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
|
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -18,16 +18,12 @@ limitations under the License.
|
|||||||
package webhook
|
package webhook
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/apimachinery/announced"
|
"k8s.io/apimachinery/pkg/apimachinery/announced"
|
||||||
"k8s.io/apimachinery/pkg/apimachinery/registered"
|
"k8s.io/apimachinery/pkg/apimachinery/registered"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/util/runtime"
|
|
||||||
auditinternal "k8s.io/apiserver/pkg/apis/audit"
|
auditinternal "k8s.io/apiserver/pkg/apis/audit"
|
||||||
"k8s.io/apiserver/pkg/apis/audit/install"
|
"k8s.io/apiserver/pkg/apis/audit/install"
|
||||||
auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1"
|
auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1"
|
||||||
@ -35,91 +31,17 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/audit"
|
"k8s.io/apiserver/pkg/audit"
|
||||||
"k8s.io/apiserver/pkg/util/webhook"
|
"k8s.io/apiserver/pkg/util/webhook"
|
||||||
"k8s.io/client-go/rest"
|
"k8s.io/client-go/rest"
|
||||||
"k8s.io/client-go/util/flowcontrol"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// ModeBatch indicates that the webhook should buffer audit events
|
// PluginName is the name of this plugin, to be used in help and logs.
|
||||||
// internally, sending batch updates either once a certain number of
|
PluginName = "webhook"
|
||||||
// events have been received or a certain amount of time has passed.
|
|
||||||
ModeBatch = "batch"
|
// DefaultInitialBackoff is the default amount of time to wait before
|
||||||
// ModeBlocking causes the webhook to block on every attempt to process
|
// retrying sending audit events through a webhook.
|
||||||
// a set of events. This causes requests to the API server to wait for a
|
DefaultInitialBackoff = 10 * time.Second
|
||||||
// round trip to the external audit service before sending a response.
|
|
||||||
ModeBlocking = "blocking"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// AllowedModes is the modes known by this webhook.
|
|
||||||
var AllowedModes = []string{
|
|
||||||
ModeBatch,
|
|
||||||
ModeBlocking,
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
|
||||||
// Default configuration values for ModeBatch.
|
|
||||||
defaultBatchBufferSize = 10000 // Buffer up to 10000 events before starting discarding.
|
|
||||||
defaultBatchMaxSize = 400 // Only send up to 400 events at a time.
|
|
||||||
defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute.
|
|
||||||
defaultInitialBackoff = 10 * time.Second // Wait at least 10 seconds before retrying.
|
|
||||||
|
|
||||||
defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS.
|
|
||||||
defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst.
|
|
||||||
)
|
|
||||||
|
|
||||||
// The plugin name reported in error metrics.
|
|
||||||
const pluginName = "webhook"
|
|
||||||
|
|
||||||
// BatchBackendConfig represents batching webhook audit backend configuration.
|
|
||||||
type BatchBackendConfig struct {
|
|
||||||
// BufferSize defines a size of the buffering queue.
|
|
||||||
BufferSize int
|
|
||||||
// MaxBatchSize defines maximum size of a batch.
|
|
||||||
MaxBatchSize int
|
|
||||||
// MaxBatchWait defines maximum amount of time to wait for MaxBatchSize
|
|
||||||
// events to be accumulated in the buffer before forcibly sending what's
|
|
||||||
// being accumulated.
|
|
||||||
MaxBatchWait time.Duration
|
|
||||||
|
|
||||||
// ThrottleQPS defines the allowed rate of batches per second sent to the webhook.
|
|
||||||
ThrottleQPS float32
|
|
||||||
// ThrottleBurst defines the maximum rate of batches per second sent to the webhook in case
|
|
||||||
// the capacity defined by ThrottleQPS was not utilized.
|
|
||||||
ThrottleBurst int
|
|
||||||
|
|
||||||
// InitialBackoff defines the amount of time to wait before retrying the requests
|
|
||||||
// to the webhook for the first time.
|
|
||||||
InitialBackoff time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewDefaultBatchBackendConfig returns new BatchBackendConfig objects populated by default values.
|
|
||||||
func NewDefaultBatchBackendConfig() BatchBackendConfig {
|
|
||||||
return BatchBackendConfig{
|
|
||||||
BufferSize: defaultBatchBufferSize,
|
|
||||||
MaxBatchSize: defaultBatchMaxSize,
|
|
||||||
MaxBatchWait: defaultBatchMaxWait,
|
|
||||||
|
|
||||||
ThrottleQPS: defaultBatchThrottleQPS,
|
|
||||||
ThrottleBurst: defaultBatchThrottleBurst,
|
|
||||||
|
|
||||||
InitialBackoff: defaultInitialBackoff,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewBackend returns an audit backend that sends events over HTTP to an external service.
|
|
||||||
// The mode indicates the caching behavior of the webhook. Either blocking (ModeBlocking)
|
|
||||||
// or buffered with batch POSTs (ModeBatch).
|
|
||||||
func NewBackend(kubeConfigFile string, mode string, groupVersion schema.GroupVersion, config BatchBackendConfig) (audit.Backend, error) {
|
|
||||||
switch mode {
|
|
||||||
case ModeBatch:
|
|
||||||
return newBatchWebhook(kubeConfigFile, groupVersion, config)
|
|
||||||
case ModeBlocking:
|
|
||||||
return newBlockingWebhook(kubeConfigFile, groupVersion)
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("webhook mode %q is not in list of known modes (%s)",
|
|
||||||
mode, strings.Join(AllowedModes, ","))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// NOTE: Copied from other webhook implementations
|
// NOTE: Copied from other webhook implementations
|
||||||
//
|
//
|
||||||
@ -143,267 +65,39 @@ func loadWebhook(configFile string, groupVersion schema.GroupVersion, initialBac
|
|||||||
[]schema.GroupVersion{groupVersion}, initialBackoff)
|
[]schema.GroupVersion{groupVersion}, initialBackoff)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newBlockingWebhook(configFile string, groupVersion schema.GroupVersion) (*blockingBackend, error) {
|
type backend struct {
|
||||||
w, err := loadWebhook(configFile, groupVersion, defaultInitialBackoff)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &blockingBackend{w}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type blockingBackend struct {
|
|
||||||
w *webhook.GenericWebhook
|
w *webhook.GenericWebhook
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *blockingBackend) Run(stopCh <-chan struct{}) error {
|
// NewBackend returns an audit backend that sends events over HTTP to an external service.
|
||||||
|
func NewBackend(kubeConfigFile string, groupVersion schema.GroupVersion) (audit.Backend, error) {
|
||||||
|
w, err := loadWebhook(kubeConfigFile, groupVersion, DefaultInitialBackoff)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &backend{w}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *backend) Run(stopCh <-chan struct{}) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *blockingBackend) Shutdown() {
|
func (b *backend) Shutdown() {
|
||||||
// nothing to do here
|
// nothing to do here
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *blockingBackend) ProcessEvents(ev ...*auditinternal.Event) {
|
func (b *backend) ProcessEvents(ev ...*auditinternal.Event) {
|
||||||
if err := b.processEvents(ev...); err != nil {
|
if err := b.processEvents(ev...); err != nil {
|
||||||
audit.HandlePluginError(pluginName, err, ev...)
|
audit.HandlePluginError(PluginName, err, ev...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *blockingBackend) processEvents(ev ...*auditinternal.Event) error {
|
func (b *backend) processEvents(ev ...*auditinternal.Event) error {
|
||||||
var list auditinternal.EventList
|
var list auditinternal.EventList
|
||||||
for _, e := range ev {
|
for _, e := range ev {
|
||||||
list.Items = append(list.Items, *e)
|
list.Items = append(list.Items, *e)
|
||||||
}
|
}
|
||||||
// NOTE: No exponential backoff because this is the blocking webhook
|
return b.w.WithExponentialBackoff(func() rest.Result {
|
||||||
// mode. Any attempts to retry will block API server requests.
|
|
||||||
return b.w.RestClient.Post().Body(&list).Do().Error()
|
|
||||||
}
|
|
||||||
|
|
||||||
func newBatchWebhook(configFile string, groupVersion schema.GroupVersion, config BatchBackendConfig) (*batchBackend, error) {
|
|
||||||
w, err := loadWebhook(configFile, groupVersion, config.InitialBackoff)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &batchBackend{
|
|
||||||
w: w,
|
|
||||||
buffer: make(chan *auditinternal.Event, config.BufferSize),
|
|
||||||
maxBatchSize: config.MaxBatchSize,
|
|
||||||
maxBatchWait: config.MaxBatchWait,
|
|
||||||
shutdownCh: make(chan struct{}),
|
|
||||||
throttle: flowcontrol.NewTokenBucketRateLimiter(config.ThrottleQPS, config.ThrottleBurst),
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type batchBackend struct {
|
|
||||||
w *webhook.GenericWebhook
|
|
||||||
|
|
||||||
// Channel to buffer events in memory before sending them on the webhook.
|
|
||||||
buffer chan *auditinternal.Event
|
|
||||||
// Maximum number of events that can be sent at once.
|
|
||||||
maxBatchSize int
|
|
||||||
// Amount of time to wait after sending events before force sending another set.
|
|
||||||
//
|
|
||||||
// Receiving maxBatchSize events will always trigger a send, regardless of
|
|
||||||
// if this amount of time has been reached.
|
|
||||||
maxBatchWait time.Duration
|
|
||||||
|
|
||||||
// Channel to signal that the sending routine has stopped and therefore
|
|
||||||
// it's safe to assume that no new requests will be initiated.
|
|
||||||
shutdownCh chan struct{}
|
|
||||||
|
|
||||||
// The sending routine locks reqMutex for reading before initiating a new
|
|
||||||
// goroutine to send a request. This goroutine then unlocks reqMutex for
|
|
||||||
// reading when completed. The Shutdown method locks reqMutex for writing
|
|
||||||
// after the sending routine has exited. When reqMutex is locked for writing,
|
|
||||||
// all requests have been completed and no new will be spawned, since the
|
|
||||||
// sending routine is not running anymore.
|
|
||||||
reqMutex sync.RWMutex
|
|
||||||
|
|
||||||
// Limits the number of requests sent to the backend per second.
|
|
||||||
throttle flowcontrol.RateLimiter
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *batchBackend) Run(stopCh <-chan struct{}) error {
|
|
||||||
go func() {
|
|
||||||
// Signal that the sending routine has exited.
|
|
||||||
defer close(b.shutdownCh)
|
|
||||||
|
|
||||||
b.runSendingRoutine(stopCh)
|
|
||||||
|
|
||||||
// Handle the events that were received after the last buffer
|
|
||||||
// scraping and before this line. Since the buffer is closed, no new
|
|
||||||
// events will come through.
|
|
||||||
for {
|
|
||||||
if last := func() bool {
|
|
||||||
// Recover from any panic in order to try to send all remaining events.
|
|
||||||
// Note, that in case of a panic, the return value will be false and
|
|
||||||
// the loop execution will continue.
|
|
||||||
defer runtime.HandleCrash()
|
|
||||||
|
|
||||||
events := b.collectLastEvents()
|
|
||||||
b.sendBatchEvents(events)
|
|
||||||
return len(events) == 0
|
|
||||||
}(); last {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *batchBackend) Shutdown() {
|
|
||||||
<-b.shutdownCh
|
|
||||||
|
|
||||||
// Write locking reqMutex will guarantee that all requests will be completed
|
|
||||||
// by the time the goroutine continues the execution. Since this line is
|
|
||||||
// executed after shutdownCh was closed, no new requests will follow this
|
|
||||||
// lock, because read lock is called in the same goroutine that closes
|
|
||||||
// shutdownCh before exiting.
|
|
||||||
b.reqMutex.Lock()
|
|
||||||
b.reqMutex.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// runSendingRoutine runs a loop that collects events from the buffer. When
|
|
||||||
// stopCh is closed, runSendingRoutine stops and closes the buffer.
|
|
||||||
func (b *batchBackend) runSendingRoutine(stopCh <-chan struct{}) {
|
|
||||||
defer close(b.buffer)
|
|
||||||
|
|
||||||
for {
|
|
||||||
func() {
|
|
||||||
// Recover from any panics caused by this function so a panic in the
|
|
||||||
// goroutine can't bring down the main routine.
|
|
||||||
defer runtime.HandleCrash()
|
|
||||||
|
|
||||||
t := time.NewTimer(b.maxBatchWait)
|
|
||||||
defer t.Stop() // Release ticker resources
|
|
||||||
|
|
||||||
b.sendBatchEvents(b.collectEvents(stopCh, t.C))
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-stopCh:
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// collectEvents attempts to collect some number of events in a batch.
|
|
||||||
//
|
|
||||||
// The following things can cause collectEvents to stop and return the list
|
|
||||||
// of events:
|
|
||||||
//
|
|
||||||
// * Some maximum number of events are received.
|
|
||||||
// * Timer has passed, all queued events are sent.
|
|
||||||
// * StopCh is closed, all queued events are sent.
|
|
||||||
//
|
|
||||||
func (b *batchBackend) collectEvents(stopCh <-chan struct{}, timer <-chan time.Time) []auditinternal.Event {
|
|
||||||
var events []auditinternal.Event
|
|
||||||
|
|
||||||
L:
|
|
||||||
for i := 0; i < b.maxBatchSize; i++ {
|
|
||||||
select {
|
|
||||||
case ev, ok := <-b.buffer:
|
|
||||||
// Buffer channel was closed and no new events will follow.
|
|
||||||
if !ok {
|
|
||||||
break L
|
|
||||||
}
|
|
||||||
events = append(events, *ev)
|
|
||||||
case <-timer:
|
|
||||||
// Timer has expired. Send whatever events are in the queue.
|
|
||||||
break L
|
|
||||||
case <-stopCh:
|
|
||||||
// Webhook has shut down. Send the last events.
|
|
||||||
break L
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return events
|
|
||||||
}
|
|
||||||
|
|
||||||
// collectLastEvents assumes that the buffer was closed. It collects the first
|
|
||||||
// maxBatchSize events from the closed buffer into a batch and returns them.
|
|
||||||
func (b *batchBackend) collectLastEvents() []auditinternal.Event {
|
|
||||||
var events []auditinternal.Event
|
|
||||||
|
|
||||||
for i := 0; i < b.maxBatchSize; i++ {
|
|
||||||
ev, ok := <-b.buffer
|
|
||||||
if !ok {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
events = append(events, *ev)
|
|
||||||
}
|
|
||||||
|
|
||||||
return events
|
|
||||||
}
|
|
||||||
|
|
||||||
// sendBatchEvents sends a POST requests with the event list in a goroutine
|
|
||||||
// and logs any error encountered.
|
|
||||||
func (b *batchBackend) sendBatchEvents(events []auditinternal.Event) {
|
|
||||||
if len(events) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
list := auditinternal.EventList{Items: events}
|
|
||||||
|
|
||||||
if b.throttle != nil {
|
|
||||||
b.throttle.Accept()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Locking reqMutex for read will guarantee that the shutdown process will
|
|
||||||
// block until the goroutine started below is finished. At the same time, it
|
|
||||||
// will not prevent other batches from being proceed further this point.
|
|
||||||
b.reqMutex.RLock()
|
|
||||||
go func() {
|
|
||||||
// Execute the webhook POST in a goroutine to keep it from blocking.
|
|
||||||
// This lets the webhook continue to drain the queue immediately.
|
|
||||||
|
|
||||||
defer b.reqMutex.RUnlock()
|
|
||||||
defer runtime.HandleCrash()
|
|
||||||
|
|
||||||
err := b.w.WithExponentialBackoff(func() rest.Result {
|
|
||||||
return b.w.RestClient.Post().Body(&list).Do()
|
return b.w.RestClient.Post().Body(&list).Do()
|
||||||
}).Error()
|
}).Error()
|
||||||
if err != nil {
|
|
||||||
impacted := make([]*auditinternal.Event, len(events))
|
|
||||||
for i := range events {
|
|
||||||
impacted[i] = &events[i]
|
|
||||||
}
|
|
||||||
audit.HandlePluginError(pluginName, err, impacted...)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *batchBackend) ProcessEvents(ev ...*auditinternal.Event) {
|
|
||||||
for i, e := range ev {
|
|
||||||
// Per the audit.Backend interface these events are reused after being
|
|
||||||
// sent to the Sink. Deep copy and send the copy to the queue.
|
|
||||||
event := e.DeepCopy()
|
|
||||||
|
|
||||||
// The following mechanism is in place to support the situation when audit
|
|
||||||
// events are still coming after the backend was shut down.
|
|
||||||
var sendErr error
|
|
||||||
func() {
|
|
||||||
// If the backend was shut down and the buffer channel was closed, an
|
|
||||||
// attempt to add an event to it will result in panic that we should
|
|
||||||
// recover from.
|
|
||||||
defer func() {
|
|
||||||
if err := recover(); err != nil {
|
|
||||||
sendErr = errors.New("audit webhook shut down")
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case b.buffer <- event:
|
|
||||||
default:
|
|
||||||
sendErr = errors.New("audit webhook queue blocked")
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
if sendErr != nil {
|
|
||||||
audit.HandlePluginError(pluginName, sendErr, ev[i:]...)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -25,10 +25,7 @@ import (
|
|||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
@ -91,15 +88,7 @@ func (t *testWebhookHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTestBlockingWebhook(t *testing.T, endpoint string, groupVersion schema.GroupVersion) *blockingBackend {
|
func newWebhook(t *testing.T, endpoint string, groupVersion schema.GroupVersion) *backend {
|
||||||
return newWebhook(t, endpoint, ModeBlocking, groupVersion).(*blockingBackend)
|
|
||||||
}
|
|
||||||
|
|
||||||
func newTestBatchWebhook(t *testing.T, endpoint string, groupVersion schema.GroupVersion) *batchBackend {
|
|
||||||
return newWebhook(t, endpoint, ModeBatch, groupVersion).(*batchBackend)
|
|
||||||
}
|
|
||||||
|
|
||||||
func newWebhook(t *testing.T, endpoint string, mode string, groupVersion schema.GroupVersion) audit.Backend {
|
|
||||||
config := v1.Config{
|
config := v1.Config{
|
||||||
Clusters: []v1.NamedCluster{
|
Clusters: []v1.NamedCluster{
|
||||||
{Cluster: v1.Cluster{Server: endpoint, InsecureSkipTLSVerify: true}},
|
{Cluster: v1.Cluster{Server: endpoint, InsecureSkipTLSVerify: true}},
|
||||||
@ -116,10 +105,10 @@ func newWebhook(t *testing.T, endpoint string, mode string, groupVersion schema.
|
|||||||
// NOTE(ericchiang): Do we need to use a proper serializer?
|
// NOTE(ericchiang): Do we need to use a proper serializer?
|
||||||
require.NoError(t, stdjson.NewEncoder(f).Encode(config), "writing kubeconfig")
|
require.NoError(t, stdjson.NewEncoder(f).Encode(config), "writing kubeconfig")
|
||||||
|
|
||||||
backend, err := NewBackend(f.Name(), mode, groupVersion, NewDefaultBatchBackendConfig())
|
b, err := NewBackend(f.Name(), groupVersion)
|
||||||
require.NoError(t, err, "initializing backend")
|
require.NoError(t, err, "initializing backend")
|
||||||
|
|
||||||
return backend
|
return b.(*backend)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWebhook(t *testing.T) {
|
func TestWebhook(t *testing.T) {
|
||||||
@ -131,275 +120,9 @@ func TestWebhook(t *testing.T) {
|
|||||||
}))
|
}))
|
||||||
defer s.Close()
|
defer s.Close()
|
||||||
|
|
||||||
backend := newTestBlockingWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion)
|
backend := newWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion)
|
||||||
|
|
||||||
// Ensure this doesn't return a serialization error.
|
// Ensure this doesn't return a serialization error.
|
||||||
event := &auditinternal.Event{}
|
event := &auditinternal.Event{}
|
||||||
require.NoError(t, backend.processEvents(event), "failed to send events")
|
require.NoError(t, backend.processEvents(event), "failed to send events")
|
||||||
}
|
}
|
||||||
|
|
||||||
// waitForEmptyBuffer indicates when the sendBatchEvents method has read from the
|
|
||||||
// existing buffer. This lets test coordinate closing a timer and stop channel
|
|
||||||
// until the for loop has read from the buffer.
|
|
||||||
func waitForEmptyBuffer(b *batchBackend) {
|
|
||||||
for len(b.buffer) != 0 {
|
|
||||||
time.Sleep(time.Millisecond)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBatchWebhookMaxEvents(t *testing.T) {
|
|
||||||
nRest := 10
|
|
||||||
events := make([]*auditinternal.Event, defaultBatchMaxSize+nRest) // greater than max size.
|
|
||||||
for i := range events {
|
|
||||||
events[i] = &auditinternal.Event{}
|
|
||||||
}
|
|
||||||
|
|
||||||
got := make(chan int, 2)
|
|
||||||
s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) {
|
|
||||||
got <- len(events.(*auditv1beta1.EventList).Items)
|
|
||||||
}))
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion)
|
|
||||||
|
|
||||||
backend.ProcessEvents(events...)
|
|
||||||
|
|
||||||
stopCh := make(chan struct{})
|
|
||||||
timer := make(chan time.Time, 1)
|
|
||||||
|
|
||||||
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
|
|
||||||
require.Equal(t, defaultBatchMaxSize, <-got, "did not get batch max size")
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
waitForEmptyBuffer(backend) // wait for the buffer to empty
|
|
||||||
timer <- time.Now() // Trigger the wait timeout
|
|
||||||
}()
|
|
||||||
|
|
||||||
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
|
|
||||||
require.Equal(t, nRest, <-got, "failed to get the rest of the events")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBatchWebhookStopCh(t *testing.T) {
|
|
||||||
events := make([]*auditinternal.Event, 1) // less than max size.
|
|
||||||
for i := range events {
|
|
||||||
events[i] = &auditinternal.Event{}
|
|
||||||
}
|
|
||||||
|
|
||||||
expected := len(events)
|
|
||||||
got := make(chan int, 2)
|
|
||||||
s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) {
|
|
||||||
got <- len(events.(*auditv1beta1.EventList).Items)
|
|
||||||
}))
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion)
|
|
||||||
backend.ProcessEvents(events...)
|
|
||||||
|
|
||||||
stopCh := make(chan struct{})
|
|
||||||
timer := make(chan time.Time)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
waitForEmptyBuffer(backend)
|
|
||||||
close(stopCh) // stop channel has stopped
|
|
||||||
}()
|
|
||||||
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
|
|
||||||
require.Equal(t, expected, <-got, "get queued events after timer expires")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBatchWebhookProcessEventsAfterStop(t *testing.T) {
|
|
||||||
events := make([]*auditinternal.Event, 1) // less than max size.
|
|
||||||
for i := range events {
|
|
||||||
events[i] = &auditinternal.Event{}
|
|
||||||
}
|
|
||||||
|
|
||||||
got := make(chan struct{})
|
|
||||||
s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) {
|
|
||||||
close(got)
|
|
||||||
}))
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion)
|
|
||||||
stopCh := make(chan struct{})
|
|
||||||
|
|
||||||
backend.Run(stopCh)
|
|
||||||
close(stopCh)
|
|
||||||
<-backend.shutdownCh
|
|
||||||
backend.ProcessEvents(events...)
|
|
||||||
assert.Equal(t, 0, len(backend.buffer), "processed events after the backed has been stopped")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBatchWebhookShutdown(t *testing.T) {
|
|
||||||
events := make([]*auditinternal.Event, 1)
|
|
||||||
for i := range events {
|
|
||||||
events[i] = &auditinternal.Event{}
|
|
||||||
}
|
|
||||||
|
|
||||||
got := make(chan struct{})
|
|
||||||
contReqCh := make(chan struct{})
|
|
||||||
shutdownCh := make(chan struct{})
|
|
||||||
s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) {
|
|
||||||
close(got)
|
|
||||||
<-contReqCh
|
|
||||||
}))
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion)
|
|
||||||
backend.ProcessEvents(events...)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
// Assume stopCh was closed.
|
|
||||||
close(backend.buffer)
|
|
||||||
backend.sendBatchEvents(backend.collectLastEvents())
|
|
||||||
}()
|
|
||||||
|
|
||||||
<-got
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
close(backend.shutdownCh)
|
|
||||||
backend.Shutdown()
|
|
||||||
close(shutdownCh)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Wait for some time in case there's a bug that allows for the Shutdown
|
|
||||||
// method to exit before all requests has been completed.
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
select {
|
|
||||||
case <-shutdownCh:
|
|
||||||
t.Fatal("Backend shut down before all requests finished")
|
|
||||||
default:
|
|
||||||
// Continue.
|
|
||||||
}
|
|
||||||
|
|
||||||
close(contReqCh)
|
|
||||||
<-shutdownCh
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBatchWebhookEmptyBuffer(t *testing.T) {
|
|
||||||
events := make([]*auditinternal.Event, 1) // less than max size.
|
|
||||||
for i := range events {
|
|
||||||
events[i] = &auditinternal.Event{}
|
|
||||||
}
|
|
||||||
|
|
||||||
expected := len(events)
|
|
||||||
got := make(chan int, 2)
|
|
||||||
s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) {
|
|
||||||
got <- len(events.(*auditv1beta1.EventList).Items)
|
|
||||||
}))
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion)
|
|
||||||
|
|
||||||
stopCh := make(chan struct{})
|
|
||||||
timer := make(chan time.Time, 1)
|
|
||||||
|
|
||||||
timer <- time.Now() // Timer is done.
|
|
||||||
|
|
||||||
// Buffer is empty, no events have been queued. This should exit but send no events.
|
|
||||||
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
|
|
||||||
|
|
||||||
// Send additional events after the sendBatchEvents has been called.
|
|
||||||
backend.ProcessEvents(events...)
|
|
||||||
go func() {
|
|
||||||
waitForEmptyBuffer(backend)
|
|
||||||
timer <- time.Now()
|
|
||||||
}()
|
|
||||||
|
|
||||||
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
|
|
||||||
|
|
||||||
// Make sure we didn't get a POST with zero events.
|
|
||||||
require.Equal(t, expected, <-got, "expected one event")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBatchBufferFull(t *testing.T) {
|
|
||||||
events := make([]*auditinternal.Event, defaultBatchBufferSize+1) // More than buffered size
|
|
||||||
for i := range events {
|
|
||||||
events[i] = &auditinternal.Event{}
|
|
||||||
}
|
|
||||||
s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) {
|
|
||||||
// Do nothing.
|
|
||||||
}))
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion)
|
|
||||||
|
|
||||||
// Make sure this doesn't block.
|
|
||||||
backend.ProcessEvents(events...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBatchRun(t *testing.T) {
|
|
||||||
|
|
||||||
// Divisable by max batch size so we don't have to wait for a minute for
|
|
||||||
// the test to finish.
|
|
||||||
events := make([]*auditinternal.Event, defaultBatchMaxSize*3)
|
|
||||||
for i := range events {
|
|
||||||
events[i] = &auditinternal.Event{}
|
|
||||||
}
|
|
||||||
|
|
||||||
got := new(int64)
|
|
||||||
want := len(events)
|
|
||||||
|
|
||||||
wg := new(sync.WaitGroup)
|
|
||||||
wg.Add(want)
|
|
||||||
done := make(chan struct{})
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
wg.Wait()
|
|
||||||
// When the expected number of events have been received, close the channel.
|
|
||||||
close(done)
|
|
||||||
}()
|
|
||||||
|
|
||||||
s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(obj runtime.Object) {
|
|
||||||
events := obj.(*auditv1beta1.EventList)
|
|
||||||
atomic.AddInt64(got, int64(len(events.Items)))
|
|
||||||
wg.Add(-len(events.Items))
|
|
||||||
}))
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
stopCh := make(chan struct{})
|
|
||||||
defer close(stopCh)
|
|
||||||
|
|
||||||
backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion)
|
|
||||||
|
|
||||||
// Test the Run codepath. E.g. that the spawned goroutines behave correctly.
|
|
||||||
backend.Run(stopCh)
|
|
||||||
|
|
||||||
backend.ProcessEvents(events...)
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
// Received all the events.
|
|
||||||
case <-time.After(2 * time.Minute):
|
|
||||||
t.Errorf("expected %d events got %d", want, atomic.LoadInt64(got))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBatchConcurrentRequests(t *testing.T) {
|
|
||||||
events := make([]*auditinternal.Event, defaultBatchBufferSize) // Don't drop events
|
|
||||||
for i := range events {
|
|
||||||
events[i] = &auditinternal.Event{}
|
|
||||||
}
|
|
||||||
|
|
||||||
wg := new(sync.WaitGroup)
|
|
||||||
wg.Add(len(events))
|
|
||||||
|
|
||||||
s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) {
|
|
||||||
wg.Add(-len(events.(*auditv1beta1.EventList).Items))
|
|
||||||
|
|
||||||
// Since the webhook makes concurrent requests, blocking on the webhook response
|
|
||||||
// shouldn't block the webhook from sending more events.
|
|
||||||
//
|
|
||||||
// Wait for all responses to be received before sending the response.
|
|
||||||
wg.Wait()
|
|
||||||
}))
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
stopCh := make(chan struct{})
|
|
||||||
defer close(stopCh)
|
|
||||||
|
|
||||||
backend := newTestBatchWebhook(t, s.URL, auditv1beta1.SchemeGroupVersion)
|
|
||||||
backend.Run(stopCh)
|
|
||||||
|
|
||||||
backend.ProcessEvents(events...)
|
|
||||||
// Wait for the webhook to receive all events.
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
@ -1,289 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2017 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package webhook
|
|
||||||
|
|
||||||
import (
|
|
||||||
"net/http/httptest"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
|
||||||
auditinternal "k8s.io/apiserver/pkg/apis/audit"
|
|
||||||
auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestBatchWebhookMaxEventsV1Alpha1(t *testing.T) {
|
|
||||||
nRest := 10
|
|
||||||
events := make([]*auditinternal.Event, defaultBatchMaxSize+nRest) // greater than max size.
|
|
||||||
for i := range events {
|
|
||||||
events[i] = &auditinternal.Event{}
|
|
||||||
}
|
|
||||||
|
|
||||||
got := make(chan int, 2)
|
|
||||||
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) {
|
|
||||||
got <- len(events.(*auditv1alpha1.EventList).Items)
|
|
||||||
}))
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion)
|
|
||||||
|
|
||||||
backend.ProcessEvents(events...)
|
|
||||||
|
|
||||||
stopCh := make(chan struct{})
|
|
||||||
timer := make(chan time.Time, 1)
|
|
||||||
|
|
||||||
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
|
|
||||||
require.Equal(t, defaultBatchMaxSize, <-got, "did not get batch max size")
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
waitForEmptyBuffer(backend) // wait for the buffer to empty
|
|
||||||
timer <- time.Now() // Trigger the wait timeout
|
|
||||||
}()
|
|
||||||
|
|
||||||
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
|
|
||||||
require.Equal(t, nRest, <-got, "failed to get the rest of the events")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBatchWebhookStopChV1Alpha1(t *testing.T) {
|
|
||||||
events := make([]*auditinternal.Event, 1) // less than max size.
|
|
||||||
for i := range events {
|
|
||||||
events[i] = &auditinternal.Event{}
|
|
||||||
}
|
|
||||||
|
|
||||||
expected := len(events)
|
|
||||||
got := make(chan int, 2)
|
|
||||||
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) {
|
|
||||||
got <- len(events.(*auditv1alpha1.EventList).Items)
|
|
||||||
}))
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion)
|
|
||||||
backend.ProcessEvents(events...)
|
|
||||||
|
|
||||||
stopCh := make(chan struct{})
|
|
||||||
timer := make(chan time.Time)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
waitForEmptyBuffer(backend)
|
|
||||||
close(stopCh) // stop channel has stopped
|
|
||||||
}()
|
|
||||||
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
|
|
||||||
require.Equal(t, expected, <-got, "get queued events after timer expires")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBatchWebhookProcessEventsAfterStopV1Alpha1(t *testing.T) {
|
|
||||||
events := make([]*auditinternal.Event, 1) // less than max size.
|
|
||||||
for i := range events {
|
|
||||||
events[i] = &auditinternal.Event{}
|
|
||||||
}
|
|
||||||
|
|
||||||
got := make(chan struct{})
|
|
||||||
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) {
|
|
||||||
close(got)
|
|
||||||
}))
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion)
|
|
||||||
stopCh := make(chan struct{})
|
|
||||||
|
|
||||||
backend.Run(stopCh)
|
|
||||||
close(stopCh)
|
|
||||||
<-backend.shutdownCh
|
|
||||||
backend.ProcessEvents(events...)
|
|
||||||
assert.Equal(t, 0, len(backend.buffer), "processed events after the backed has been stopped")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBatchWebhookShutdownV1Alpha1(t *testing.T) {
|
|
||||||
events := make([]*auditinternal.Event, 1)
|
|
||||||
for i := range events {
|
|
||||||
events[i] = &auditinternal.Event{}
|
|
||||||
}
|
|
||||||
|
|
||||||
got := make(chan struct{})
|
|
||||||
contReqCh := make(chan struct{})
|
|
||||||
shutdownCh := make(chan struct{})
|
|
||||||
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) {
|
|
||||||
close(got)
|
|
||||||
<-contReqCh
|
|
||||||
}))
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion)
|
|
||||||
backend.ProcessEvents(events...)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
// Assume stopCh was closed.
|
|
||||||
close(backend.buffer)
|
|
||||||
backend.sendBatchEvents(backend.collectLastEvents())
|
|
||||||
}()
|
|
||||||
|
|
||||||
<-got
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
close(backend.shutdownCh)
|
|
||||||
backend.Shutdown()
|
|
||||||
close(shutdownCh)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Wait for some time in case there's a bug that allows for the Shutdown
|
|
||||||
// method to exit before all requests has been completed.
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
select {
|
|
||||||
case <-shutdownCh:
|
|
||||||
t.Fatal("Backend shut down before all requests finished")
|
|
||||||
default:
|
|
||||||
// Continue.
|
|
||||||
}
|
|
||||||
|
|
||||||
close(contReqCh)
|
|
||||||
<-shutdownCh
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBatchWebhookEmptyBufferV1Alpha1(t *testing.T) {
|
|
||||||
events := make([]*auditinternal.Event, 1) // less than max size.
|
|
||||||
for i := range events {
|
|
||||||
events[i] = &auditinternal.Event{}
|
|
||||||
}
|
|
||||||
|
|
||||||
expected := len(events)
|
|
||||||
got := make(chan int, 2)
|
|
||||||
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) {
|
|
||||||
got <- len(events.(*auditv1alpha1.EventList).Items)
|
|
||||||
}))
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion)
|
|
||||||
|
|
||||||
stopCh := make(chan struct{})
|
|
||||||
timer := make(chan time.Time, 1)
|
|
||||||
|
|
||||||
timer <- time.Now() // Timer is done.
|
|
||||||
|
|
||||||
// Buffer is empty, no events have been queued. This should exit but send no events.
|
|
||||||
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
|
|
||||||
|
|
||||||
// Send additional events after the sendBatchEvents has been called.
|
|
||||||
backend.ProcessEvents(events...)
|
|
||||||
go func() {
|
|
||||||
waitForEmptyBuffer(backend)
|
|
||||||
timer <- time.Now()
|
|
||||||
}()
|
|
||||||
|
|
||||||
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
|
|
||||||
|
|
||||||
// Make sure we didn't get a POST with zero events.
|
|
||||||
require.Equal(t, expected, <-got, "expected one event")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBatchBufferFullV1Alpha1(t *testing.T) {
|
|
||||||
events := make([]*auditinternal.Event, defaultBatchBufferSize+1) // More than buffered size
|
|
||||||
for i := range events {
|
|
||||||
events[i] = &auditinternal.Event{}
|
|
||||||
}
|
|
||||||
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) {
|
|
||||||
// Do nothing.
|
|
||||||
}))
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion)
|
|
||||||
|
|
||||||
// Make sure this doesn't block.
|
|
||||||
backend.ProcessEvents(events...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBatchRunV1Alpha1(t *testing.T) {
|
|
||||||
|
|
||||||
// Divisable by max batch size so we don't have to wait for a minute for
|
|
||||||
// the test to finish.
|
|
||||||
events := make([]*auditinternal.Event, defaultBatchMaxSize*3)
|
|
||||||
for i := range events {
|
|
||||||
events[i] = &auditinternal.Event{}
|
|
||||||
}
|
|
||||||
|
|
||||||
got := new(int64)
|
|
||||||
want := len(events)
|
|
||||||
|
|
||||||
wg := new(sync.WaitGroup)
|
|
||||||
wg.Add(want)
|
|
||||||
done := make(chan struct{})
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
wg.Wait()
|
|
||||||
// When the expected number of events have been received, close the channel.
|
|
||||||
close(done)
|
|
||||||
}()
|
|
||||||
|
|
||||||
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(obj runtime.Object) {
|
|
||||||
events := obj.(*auditv1alpha1.EventList)
|
|
||||||
atomic.AddInt64(got, int64(len(events.Items)))
|
|
||||||
wg.Add(-len(events.Items))
|
|
||||||
}))
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
stopCh := make(chan struct{})
|
|
||||||
defer close(stopCh)
|
|
||||||
|
|
||||||
backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion)
|
|
||||||
|
|
||||||
// Test the Run codepath. E.g. that the spawned goroutines behave correctly.
|
|
||||||
backend.Run(stopCh)
|
|
||||||
|
|
||||||
backend.ProcessEvents(events...)
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
// Received all the events.
|
|
||||||
case <-time.After(2 * time.Minute):
|
|
||||||
t.Errorf("expected %d events got %d", want, atomic.LoadInt64(got))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBatchConcurrentRequestsV1Alpha1(t *testing.T) {
|
|
||||||
events := make([]*auditinternal.Event, defaultBatchBufferSize) // Don't drop events
|
|
||||||
for i := range events {
|
|
||||||
events[i] = &auditinternal.Event{}
|
|
||||||
}
|
|
||||||
|
|
||||||
wg := new(sync.WaitGroup)
|
|
||||||
wg.Add(len(events))
|
|
||||||
|
|
||||||
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) {
|
|
||||||
wg.Add(-len(events.(*auditv1alpha1.EventList).Items))
|
|
||||||
|
|
||||||
// Since the webhook makes concurrent requests, blocking on the webhook response
|
|
||||||
// shouldn't block the webhook from sending more events.
|
|
||||||
//
|
|
||||||
// Wait for all responses to be received before sending the response.
|
|
||||||
wg.Wait()
|
|
||||||
}))
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
stopCh := make(chan struct{})
|
|
||||||
defer close(stopCh)
|
|
||||||
|
|
||||||
backend := newTestBatchWebhook(t, s.URL, auditv1alpha1.SchemeGroupVersion)
|
|
||||||
backend.Run(stopCh)
|
|
||||||
|
|
||||||
backend.ProcessEvents(events...)
|
|
||||||
// Wait for the webhook to receive all events.
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
@ -1142,6 +1142,10 @@
|
|||||||
"ImportPath": "k8s.io/apiserver/pkg/util/wsstream",
|
"ImportPath": "k8s.io/apiserver/pkg/util/wsstream",
|
||||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/buffered",
|
||||||
|
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/log",
|
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/log",
|
||||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||||
|
@ -1110,6 +1110,10 @@
|
|||||||
"ImportPath": "k8s.io/apiserver/pkg/util/wsstream",
|
"ImportPath": "k8s.io/apiserver/pkg/util/wsstream",
|
||||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/buffered",
|
||||||
|
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/log",
|
"ImportPath": "k8s.io/apiserver/plugin/pkg/audit/log",
|
||||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||||
|
Loading…
Reference in New Issue
Block a user