diff --git a/cmd/kube-apiserver/app/options/BUILD b/cmd/kube-apiserver/app/options/BUILD index 13fb95b6a0b..fa9ba342389 100644 --- a/cmd/kube-apiserver/app/options/BUILD +++ b/cmd/kube-apiserver/app/options/BUILD @@ -60,7 +60,6 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library", - "//staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic:go_default_library", "//staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/component-base/cli/flag:go_default_library", diff --git a/cmd/kube-apiserver/app/options/options_test.go b/cmd/kube-apiserver/app/options/options_test.go index ad4a11a4cbc..17f8949c9b4 100644 --- a/cmd/kube-apiserver/app/options/options_test.go +++ b/cmd/kube-apiserver/app/options/options_test.go @@ -30,7 +30,6 @@ import ( apiserveroptions "k8s.io/apiserver/pkg/server/options" "k8s.io/apiserver/pkg/storage/storagebackend" auditbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered" - auditdynamic "k8s.io/apiserver/plugin/pkg/audit/dynamic" audittruncate "k8s.io/apiserver/plugin/pkg/audit/truncate" restclient "k8s.io/client-go/rest" cliflag "k8s.io/component-base/cli/flag" @@ -252,9 +251,6 @@ func TestAddFlags(t *testing.T) { InitialBackoff: 2 * time.Second, GroupVersionString: "audit.k8s.io/v1alpha1", }, - DynamicOptions: apiserveroptions.AuditDynamicOptions{ - BatchConfig: auditdynamic.NewDefaultWebhookBatchConfig(), - }, PolicyFile: "/policy", }, Features: &apiserveroptions.FeatureOptions{ diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 2660130f1d0..92163f46eae 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -505,29 +505,17 @@ func buildGenericConfig( genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName) } + lastErr = s.Audit.ApplyTo(genericConfig) + if lastErr != nil { + return + } + admissionConfig := &kubeapiserveradmission.Config{ ExternalInformers: versionedInformers, LoopbackClientConfig: genericConfig.LoopbackClientConfig, CloudConfigFile: s.CloudProvider.CloudConfigFile, } serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers) - - authInfoResolverWrapper := webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, genericConfig.EgressSelector, genericConfig.LoopbackClientConfig) - - lastErr = s.Audit.ApplyTo( - genericConfig, - genericConfig.LoopbackClientConfig, - versionedInformers, - serveroptions.NewProcessInfo("kube-apiserver", "kube-system"), - &serveroptions.WebhookOptions{ - AuthInfoResolverWrapper: authInfoResolverWrapper, - ServiceResolver: serviceResolver, - }, - ) - if lastErr != nil { - return - } - pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver) if err != nil { lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err) diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 4a8f6a5d05d..d285ed2f1ab 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -669,7 +669,6 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS genericfeatures.StreamingProxyRedirects: {Default: true, PreRelease: featuregate.Deprecated}, genericfeatures.ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta}, genericfeatures.AdvancedAuditing: {Default: true, PreRelease: featuregate.GA}, - genericfeatures.DynamicAuditing: {Default: false, PreRelease: featuregate.Alpha}, genericfeatures.APIResponseCompression: {Default: true, PreRelease: featuregate.Beta}, genericfeatures.APIListChunking: {Default: true, PreRelease: featuregate.Beta}, genericfeatures.DryRun: {Default: true, PreRelease: featuregate.GA}, diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options/options.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options/options.go index 786dc60ab53..895a8ebfe6e 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options/options.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options/options.go @@ -24,7 +24,7 @@ import ( "github.com/spf13/pflag" - "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" "k8s.io/apiextensions-apiserver/pkg/apiserver" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -54,7 +54,6 @@ func NewCustomResourceDefinitionsServerOptions(out, errOut io.Writer) *CustomRes RecommendedOptions: genericoptions.NewRecommendedOptions( defaultEtcdPathPrefix, apiserver.Codecs.LegacyCodec(v1beta1.SchemeGroupVersion, v1.SchemeGroupVersion), - genericoptions.NewProcessInfo("apiextensions-apiserver", "kube-system"), ), APIEnablement: genericoptions.NewAPIEnablementOptions(), diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index 463146e1e84..3fb19760b62 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -59,13 +59,6 @@ const ( // audited. AdvancedAuditing featuregate.Feature = "AdvancedAuditing" - // owner: @pbarker - // alpha: v1.13 - // - // DynamicAuditing enables configuration of audit policy and webhook backends through an - // AuditSink API object. - DynamicAuditing featuregate.Feature = "DynamicAuditing" - // owner: @ilackams // alpha: v1.7 // @@ -163,7 +156,6 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS StreamingProxyRedirects: {Default: true, PreRelease: featuregate.Deprecated}, ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta}, AdvancedAuditing: {Default: true, PreRelease: featuregate.GA}, - DynamicAuditing: {Default: false, PreRelease: featuregate.Alpha}, APIResponseCompression: {Default: true, PreRelease: featuregate.Beta}, APIListChunking: {Default: true, PreRelease: featuregate.Beta}, DryRun: {Default: true, PreRelease: featuregate.GA}, diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/BUILD b/staging/src/k8s.io/apiserver/pkg/server/options/BUILD index 0ec3c266159..63df0c13736 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/options/BUILD @@ -14,7 +14,6 @@ go_library( "doc.go", "egress_selector.go", "etcd.go", - "events.go", "feature.go", "recommended.go", "server_run_options.go", @@ -22,13 +21,11 @@ go_library( "serving_unix.go", "serving_windows.go", "serving_with_loopback.go", - "webhook.go", ], importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server/options", importpath = "k8s.io/apiserver/pkg/server/options", visibility = ["//visibility:public"], deps = [ - "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", @@ -72,16 +69,12 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/util/webhook:go_default_library", "//staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library", - "//staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic:go_default_library", - "//staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced:go_default_library", "//staging/src/k8s.io/apiserver/plugin/pkg/audit/log:go_default_library", "//staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate:go_default_library", "//staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", - "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/tools/clientcmd:go_default_library", "//staging/src/k8s.io/client-go/util/cert:go_default_library", @@ -158,17 +151,12 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/apis/audit/v1:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/authenticatorfactory:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/discovery:go_default_library", - "//staging/src/k8s.io/client-go/informers:go_default_library", - "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/tools/clientcmd/api/v1:go_default_library", "//staging/src/k8s.io/component-base/cli/flag:go_default_library", - "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", "//vendor/github.com/spf13/pflag:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/audit.go b/staging/src/k8s.io/apiserver/pkg/server/options/audit.go index 6c8f8be9ab6..06ff8a3efcc 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/audit.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/audit.go @@ -27,7 +27,6 @@ import ( "gopkg.in/natefinch/lumberjack.v2" "k8s.io/klog/v2" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/schema" utilnet "k8s.io/apimachinery/pkg/util/net" auditinternal "k8s.io/apiserver/pkg/apis/audit" @@ -36,20 +35,12 @@ import ( auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1" "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/audit/policy" - "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/egressselector" - utilfeature "k8s.io/apiserver/pkg/util/feature" pluginbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered" - plugindynamic "k8s.io/apiserver/plugin/pkg/audit/dynamic" - pluginenforced "k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced" pluginlog "k8s.io/apiserver/plugin/pkg/audit/log" plugintruncate "k8s.io/apiserver/plugin/pkg/audit/truncate" pluginwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook" - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - v1core "k8s.io/client-go/kubernetes/typed/core/v1" - restclient "k8s.io/client-go/rest" ) const ( @@ -80,7 +71,6 @@ type AuditOptions struct { // Plugin options LogOptions AuditLogOptions WebhookOptions AuditWebhookOptions - DynamicOptions AuditDynamicOptions } const ( @@ -180,10 +170,6 @@ func NewAuditOptions() *AuditOptions { TruncateOptions: NewAuditTruncateOptions(), GroupVersionString: "audit.k8s.io/v1", }, - DynamicOptions: AuditDynamicOptions{ - Enabled: false, - BatchConfig: plugindynamic.NewDefaultWebhookBatchConfig(), - }, } } @@ -206,7 +192,6 @@ func (o *AuditOptions) Validate() []error { var allErrors []error allErrors = append(allErrors, o.LogOptions.Validate()...) allErrors = append(allErrors, o.WebhookOptions.Validate()...) - allErrors = append(allErrors, o.DynamicOptions.Validate()...) return allErrors } @@ -286,15 +271,10 @@ func (o *AuditOptions) AddFlags(fs *pflag.FlagSet) { o.WebhookOptions.AddFlags(fs) o.WebhookOptions.BatchOptions.AddFlags(pluginwebhook.PluginName, fs) o.WebhookOptions.TruncateOptions.AddFlags(pluginwebhook.PluginName, fs) - o.DynamicOptions.AddFlags(fs) } func (o *AuditOptions) ApplyTo( c *server.Config, - kubeClientConfig *restclient.Config, - informers informers.SharedInformerFactory, - processInfo *ProcessInfo, - webhookOptions *WebhookOptions, ) error { if o == nil { return nil @@ -347,23 +327,7 @@ func (o *AuditOptions) ApplyTo( // 4. Apply dynamic options. var dynamicBackend audit.Backend - if o.DynamicOptions.enabled() { - // if dynamic is enabled the webhook and log backends need to be wrapped in an enforced backend with the static policy - if webhookBackend != nil { - webhookBackend = pluginenforced.NewBackend(webhookBackend, checker) - } - if logBackend != nil { - logBackend = pluginenforced.NewBackend(logBackend, checker) - } - // build dynamic backend - dynamicBackend, checker, err = o.DynamicOptions.newBackend(c.ExternalAddress, kubeClientConfig, informers, processInfo, webhookOptions) - if err != nil { - return err - } - // union dynamic and webhook backends so that truncate options can be applied to both - dynamicBackend = appendBackend(webhookBackend, dynamicBackend) - dynamicBackend = o.WebhookOptions.TruncateOptions.wrapBackend(dynamicBackend, groupVersion) - } else if webhookBackend != nil { + if webhookBackend != nil { // if only webhook is enabled wrap it in the truncate options dynamicBackend = o.WebhookOptions.TruncateOptions.wrapBackend(webhookBackend, groupVersion) } @@ -610,66 +574,6 @@ func (o *AuditWebhookOptions) newUntruncatedBackend(customDial utilnet.DialFunc) return webhook, nil } -func (o *AuditDynamicOptions) AddFlags(fs *pflag.FlagSet) { - fs.BoolVar(&o.Enabled, "audit-dynamic-configuration", o.Enabled, - "Enables dynamic audit configuration. This feature also requires the DynamicAuditing feature flag") -} - -func (o *AuditDynamicOptions) enabled() bool { - return o.Enabled && utilfeature.DefaultFeatureGate.Enabled(features.DynamicAuditing) -} - -func (o *AuditDynamicOptions) Validate() []error { - var allErrors []error - if o.Enabled && !utilfeature.DefaultFeatureGate.Enabled(features.DynamicAuditing) { - allErrors = append(allErrors, fmt.Errorf("--audit-dynamic-configuration set, but DynamicAuditing feature gate is not enabled")) - } - return allErrors -} - -func (o *AuditDynamicOptions) newBackend( - hostname string, - kubeClientConfig *restclient.Config, - informers informers.SharedInformerFactory, - processInfo *ProcessInfo, - webhookOptions *WebhookOptions, -) (audit.Backend, policy.Checker, error) { - if err := validateProcessInfo(processInfo); err != nil { - return nil, nil, err - } - clientset, err := kubernetes.NewForConfig(kubeClientConfig) - if err != nil { - return nil, nil, err - } - if webhookOptions == nil { - webhookOptions = NewWebhookOptions() - } - checker := policy.NewDynamicChecker() - informer := informers.Auditregistration().V1alpha1().AuditSinks() - eventSink := &v1core.EventSinkImpl{Interface: clientset.CoreV1().Events(processInfo.Namespace)} - - dc := &plugindynamic.Config{ - Informer: informer, - BufferedConfig: o.BatchConfig, - EventConfig: plugindynamic.EventConfig{ - Sink: eventSink, - Source: corev1.EventSource{ - Component: processInfo.Name, - Host: hostname, - }, - }, - WebhookConfig: plugindynamic.WebhookConfig{ - AuthInfoResolverWrapper: webhookOptions.AuthInfoResolverWrapper, - ServiceResolver: webhookOptions.ServiceResolver, - }, - } - backend, err := plugindynamic.NewBackend(dc) - if err != nil { - return nil, nil, fmt.Errorf("could not create dynamic audit backend: %v", err) - } - return backend, checker, nil -} - // defaultWebhookBatchConfig returns the default BatchConfig used by the Webhook backend. func defaultWebhookBatchConfig() pluginbuffered.BatchConfig { return pluginbuffered.BatchConfig{ diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/audit_test.go b/staging/src/k8s.io/apiserver/pkg/server/options/audit_test.go index 55ddb0de54e..601d58df51d 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/audit_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/audit_test.go @@ -23,20 +23,13 @@ import ( "os" "testing" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - auditv1 "k8s.io/apiserver/pkg/apis/audit/v1" - "k8s.io/apiserver/pkg/features" - "k8s.io/apiserver/pkg/server" - utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes/fake" - restclient "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd/api/v1" - featuregatetesting "k8s.io/component-base/featuregate/testing" - "github.com/spf13/pflag" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + auditv1 "k8s.io/apiserver/pkg/apis/audit/v1" + "k8s.io/apiserver/pkg/server" + v1 "k8s.io/client-go/tools/clientcmd/api/v1" ) func TestAuditValidOptions(t *testing.T) { @@ -46,12 +39,6 @@ func TestAuditValidOptions(t *testing.T) { policy := makeTmpPolicy(t) defer os.Remove(policy) - defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicAuditing, true)() - - clientConfig := &restclient.Config{} - informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0) - processInfo := &ProcessInfo{"test", "test"} - testCases := []struct { name string options func() *AuditOptions @@ -135,56 +122,6 @@ func TestAuditValidOptions(t *testing.T) { return o }, expected: "truncate>", - }, { - name: "dynamic", - options: func() *AuditOptions { - o := NewAuditOptions() - o.DynamicOptions.Enabled = true - return o - }, - expected: "dynamic[]", - }, { - name: "dynamic with truncating", - options: func() *AuditOptions { - o := NewAuditOptions() - o.DynamicOptions.Enabled = true - o.WebhookOptions.TruncateOptions.Enabled = true - return o - }, - expected: "truncate", - }, { - name: "dynamic with log", - options: func() *AuditOptions { - o := NewAuditOptions() - o.DynamicOptions.Enabled = true - o.LogOptions.Path = "/audit" - o.PolicyFile = policy - return o - }, - expected: "union[enforced>,dynamic[]]", - }, { - name: "dynamic with truncating and webhook", - options: func() *AuditOptions { - o := NewAuditOptions() - o.DynamicOptions.Enabled = true - o.WebhookOptions.TruncateOptions.Enabled = true - o.WebhookOptions.ConfigFile = webhookConfig - o.PolicyFile = policy - return o - }, - expected: "truncate>,dynamic[]]>", - }, { - name: "dynamic with truncating and webhook and log", - options: func() *AuditOptions { - o := NewAuditOptions() - o.DynamicOptions.Enabled = true - o.WebhookOptions.TruncateOptions.Enabled = true - o.WebhookOptions.ConfigFile = webhookConfig - o.PolicyFile = policy - o.LogOptions.Path = "/audit" - return o - }, - expected: "union[enforced>,truncate>,dynamic[]]>]", }, } for _, tc := range testCases { @@ -200,7 +137,7 @@ func TestAuditValidOptions(t *testing.T) { assert.Empty(t, options.Validate(), "Options should be valid.") config := &server.Config{} - require.NoError(t, options.ApplyTo(config, clientConfig, informerFactory, processInfo, nil)) + require.NoError(t, options.ApplyTo(config)) if tc.expected == "" { assert.Nil(t, config.AuditBackend) } else { @@ -275,13 +212,6 @@ func TestAuditInvalidOptions(t *testing.T) { o.WebhookOptions.TruncateOptions.TruncateConfig.MaxBatchSize = 1 return o }, - }, { - name: "invalid dynamic flag group", - options: func() *AuditOptions { - o := NewAuditOptions() - o.DynamicOptions.Enabled = true - return o - }, }, } for _, tc := range testCases { diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/events.go b/staging/src/k8s.io/apiserver/pkg/server/options/events.go deleted file mode 100644 index 2dfc0111fcc..00000000000 --- a/staging/src/k8s.io/apiserver/pkg/server/options/events.go +++ /dev/null @@ -1,56 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package options - -import ( - "fmt" - "os" -) - -// ProcessInfo holds the apiserver process information used to send events -type ProcessInfo struct { - // Name of the api process to identify events - Name string - - // Namespace of the api process to send events - Namespace string -} - -// NewProcessInfo returns a new process info with the hostname concatenated to the name given -func NewProcessInfo(name, namespace string) *ProcessInfo { - // try to concat the hostname if available - host, _ := os.Hostname() - if host != "" { - name = fmt.Sprintf("%s-%s", name, host) - } - return &ProcessInfo{ - Name: name, - Namespace: namespace, - } -} - -// validateProcessInfo checks for a complete process info -func validateProcessInfo(p *ProcessInfo) error { - if p == nil { - return fmt.Errorf("ProcessInfo must be set") - } else if p.Name == "" { - return fmt.Errorf("ProcessInfo name must be set") - } else if p.Namespace == "" { - return fmt.Errorf("ProcessInfo namespace must be set") - } - return nil -} diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go b/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go index 3d634a2e6ff..8ce82aa0b0d 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go @@ -48,14 +48,11 @@ type RecommendedOptions struct { // admission plugin initializers to Admission.ApplyTo. ExtraAdmissionInitializers func(c *server.RecommendedConfig) ([]admission.PluginInitializer, error) Admission *AdmissionOptions - // ProcessInfo is used to identify events created by the server. - ProcessInfo *ProcessInfo - Webhook *WebhookOptions // API Server Egress Selector is used to control outbound traffic from the API Server EgressSelector *EgressSelectorOptions } -func NewRecommendedOptions(prefix string, codec runtime.Codec, processInfo *ProcessInfo) *RecommendedOptions { +func NewRecommendedOptions(prefix string, codec runtime.Codec) *RecommendedOptions { sso := NewSecureServingOptions() // We are composing recommended options for an aggregated api-server, @@ -78,8 +75,6 @@ func NewRecommendedOptions(prefix string, codec runtime.Codec, processInfo *Proc FeatureGate: feature.DefaultFeatureGate, ExtraAdmissionInitializers: func(c *server.RecommendedConfig) ([]admission.PluginInitializer, error) { return nil, nil }, Admission: NewAdmissionOptions(), - ProcessInfo: processInfo, - Webhook: NewWebhookOptions(), EgressSelector: NewEgressSelectorOptions(), } } @@ -111,7 +106,7 @@ func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error { if err := o.Authorization.ApplyTo(&config.Config.Authorization); err != nil { return err } - if err := o.Audit.ApplyTo(&config.Config, config.ClientConfig, config.SharedInformerFactory, o.ProcessInfo, o.Webhook); err != nil { + if err := o.Audit.ApplyTo(&config.Config); err != nil { return err } if err := o.Features.ApplyTo(&config.Config); err != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/webhook.go b/staging/src/k8s.io/apiserver/pkg/server/options/webhook.go deleted file mode 100644 index bd3ec124d6d..00000000000 --- a/staging/src/k8s.io/apiserver/pkg/server/options/webhook.go +++ /dev/null @@ -1,34 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package options - -import ( - utilwebhook "k8s.io/apiserver/pkg/util/webhook" -) - -// WebhookOptions holds the outgoing webhook options -type WebhookOptions struct { - ServiceResolver utilwebhook.ServiceResolver - AuthInfoResolverWrapper utilwebhook.AuthenticationInfoResolverWrapper -} - -// NewWebhookOptions returns the default options for outgoing webhooks -func NewWebhookOptions() *WebhookOptions { - return &WebhookOptions{ - ServiceResolver: utilwebhook.NewDefaultServiceResolver(), - } -} diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/BUILD b/staging/src/k8s.io/apiserver/plugin/pkg/audit/BUILD index c2c4d07fcdf..12df108356d 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/BUILD +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/BUILD @@ -24,7 +24,6 @@ filegroup( srcs = [ ":package-srcs", "//staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered:all-srcs", - "//staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic:all-srcs", "//staging/src/k8s.io/apiserver/plugin/pkg/audit/fake:all-srcs", "//staging/src/k8s.io/apiserver/plugin/pkg/audit/log:all-srcs", "//staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate:all-srcs", diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/BUILD b/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/BUILD deleted file mode 100644 index e681389366b..00000000000 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/BUILD +++ /dev/null @@ -1,76 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "go_default_library", - srcs = [ - "defaults.go", - "dynamic.go", - "factory.go", - ], - importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/plugin/pkg/audit/dynamic", - importpath = "k8s.io/apiserver/plugin/pkg/audit/dynamic", - visibility = ["//visibility:public"], - deps = [ - "//staging/src/k8s.io/api/auditregistration/v1alpha1:go_default_library", - "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/apis/audit/install:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/apis/audit/v1:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/audit:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/audit/policy:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/audit/util:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/util/webhook:go_default_library", - "//staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library", - "//staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced:go_default_library", - "//staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library", - "//staging/src/k8s.io/client-go/informers/auditregistration/v1alpha1:go_default_library", - "//staging/src/k8s.io/client-go/tools/cache:go_default_library", - "//staging/src/k8s.io/client-go/tools/record:go_default_library", - "//vendor/k8s.io/klog/v2:go_default_library", - ], -) - -go_test( - name = "go_default_test", - srcs = [ - "dynamic_test.go", - "factory_test.go", - ], - embed = [":go_default_library"], - deps = [ - "//staging/src/k8s.io/api/auditregistration/v1alpha1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/apis/audit/v1:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/audit:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/util/webhook:go_default_library", - "//staging/src/k8s.io/client-go/informers:go_default_library", - "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", - "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", - "//vendor/github.com/stretchr/testify/require:go_default_library", - "//vendor/k8s.io/utils/pointer:go_default_library", - ], -) - -filegroup( - name = "package-srcs", - srcs = glob(["**"]), - tags = ["automanaged"], - visibility = ["//visibility:private"], -) - -filegroup( - name = "all-srcs", - srcs = [ - ":package-srcs", - "//staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced:all-srcs", - ], - tags = ["automanaged"], - visibility = ["//visibility:public"], -) diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/defaults.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/defaults.go deleted file mode 100644 index f442954b500..00000000000 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/defaults.go +++ /dev/null @@ -1,46 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package dynamic - -import ( - "time" - - bufferedplugin "k8s.io/apiserver/plugin/pkg/audit/buffered" -) - -const ( - // Default configuration values for ModeBatch when applied to a dynamic plugin - defaultBatchBufferSize = 5000 // Buffer up to 5000 events before starting discarding. - defaultBatchMaxSize = 400 // Only send up to 400 events at a time. - defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute. - defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS. - defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst. -) - -// NewDefaultWebhookBatchConfig returns new Batch Config objects populated by default values -// for dynamic webhooks -func NewDefaultWebhookBatchConfig() *bufferedplugin.BatchConfig { - return &bufferedplugin.BatchConfig{ - BufferSize: defaultBatchBufferSize, - MaxBatchSize: defaultBatchMaxSize, - MaxBatchWait: defaultBatchMaxWait, - ThrottleEnable: true, - ThrottleQPS: defaultBatchThrottleQPS, - ThrottleBurst: defaultBatchThrottleBurst, - AsyncDelegate: true, - } -} diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic.go deleted file mode 100644 index ec03fe43d20..00000000000 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic.go +++ /dev/null @@ -1,365 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package dynamic - -import ( - "fmt" - "reflect" - "strings" - "sync" - "sync/atomic" - - "k8s.io/klog/v2" - - auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" - auditinternal "k8s.io/apiserver/pkg/apis/audit" - auditinstall "k8s.io/apiserver/pkg/apis/audit/install" - auditv1 "k8s.io/apiserver/pkg/apis/audit/v1" - "k8s.io/apiserver/pkg/audit" - webhook "k8s.io/apiserver/pkg/util/webhook" - bufferedplugin "k8s.io/apiserver/plugin/pkg/audit/buffered" - auditinformer "k8s.io/client-go/informers/auditregistration/v1alpha1" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" -) - -// PluginName is the name reported in error metrics. -const PluginName = "dynamic" - -// Config holds the configuration for the dynamic backend -type Config struct { - // Informer for the audit sinks - Informer auditinformer.AuditSinkInformer - // EventConfig holds the configuration for event notifications about the AuditSink API objects - EventConfig EventConfig - // BufferedConfig is the runtime buffered configuration - BufferedConfig *bufferedplugin.BatchConfig - // WebhookConfig holds the configuration for outgoing webhooks - WebhookConfig WebhookConfig -} - -// WebhookConfig holds the configurations for outgoing webhooks -type WebhookConfig struct { - // AuthInfoResolverWrapper provides the webhook authentication for in-cluster endpoints - AuthInfoResolverWrapper webhook.AuthenticationInfoResolverWrapper - // ServiceResolver knows how to convert a webhook service reference into an actual location. - ServiceResolver webhook.ServiceResolver -} - -// EventConfig holds the configurations for sending event notifiations about AuditSink API objects -type EventConfig struct { - // Sink for emitting events - Sink record.EventSink - // Source holds the source information about the event emitter - Source corev1.EventSource -} - -// delegate represents a delegate backend that was created from an audit sink configuration -type delegate struct { - audit.Backend - configuration *auditregv1alpha1.AuditSink - stopChan chan struct{} -} - -// gracefulShutdown will gracefully shutdown the delegate -func (d *delegate) gracefulShutdown() { - close(d.stopChan) - d.Shutdown() -} - -// NewBackend returns a backend that dynamically updates its configuration -// based on a shared informer. -func NewBackend(c *Config) (audit.Backend, error) { - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(klog.Infof) - eventBroadcaster.StartRecordingToSink(c.EventConfig.Sink) - - scheme := runtime.NewScheme() - err := auditregv1alpha1.AddToScheme(scheme) - if err != nil { - return nil, err - } - recorder := eventBroadcaster.NewRecorder(scheme, c.EventConfig.Source) - - if c.BufferedConfig == nil { - c.BufferedConfig = NewDefaultWebhookBatchConfig() - } - cm, err := webhook.NewClientManager([]schema.GroupVersion{auditv1.SchemeGroupVersion}, func(s *runtime.Scheme) error { - auditinstall.Install(s) - return nil - }) - if err != nil { - return nil, err - } - - // TODO: need a way of injecting authentication before beta - authInfoResolver, err := webhook.NewDefaultAuthenticationInfoResolver("") - if err != nil { - return nil, err - } - cm.SetAuthenticationInfoResolver(authInfoResolver) - cm.SetServiceResolver(c.WebhookConfig.ServiceResolver) - cm.SetAuthenticationInfoResolverWrapper(c.WebhookConfig.AuthInfoResolverWrapper) - - manager := &backend{ - config: c, - delegates: atomic.Value{}, - delegateUpdateMutex: sync.Mutex{}, - stopped: false, - webhookClientManager: cm, - recorder: recorder, - } - manager.delegates.Store(syncedDelegates{}) - - c.Informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - manager.addSink(obj.(*auditregv1alpha1.AuditSink)) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - manager.updateSink(oldObj.(*auditregv1alpha1.AuditSink), newObj.(*auditregv1alpha1.AuditSink)) - }, - DeleteFunc: func(obj interface{}) { - sink, ok := obj.(*auditregv1alpha1.AuditSink) - if !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - klog.V(2).Infof("Couldn't get object from tombstone %#v", obj) - return - } - sink, ok = tombstone.Obj.(*auditregv1alpha1.AuditSink) - if !ok { - klog.V(2).Infof("Tombstone contained object that is not an AuditSink: %#v", obj) - return - } - } - manager.deleteSink(sink) - }, - }) - - return manager, nil -} - -type backend struct { - // delegateUpdateMutex holds an update lock on the delegates - delegateUpdateMutex sync.Mutex - stopped bool - config *Config - delegates atomic.Value - webhookClientManager webhook.ClientManager - recorder record.EventRecorder -} - -type syncedDelegates map[types.UID]*delegate - -// Names returns the names of the delegate configurations -func (s syncedDelegates) Names() []string { - names := []string{} - for _, delegate := range s { - names = append(names, delegate.configuration.Name) - } - return names -} - -// ProcessEvents proccesses the given events per current delegate map -func (b *backend) ProcessEvents(events ...*auditinternal.Event) bool { - for _, d := range b.GetDelegates() { - d.ProcessEvents(events...) - } - // Returning true regardless of results, since dynamic audit backends - // can never cause apiserver request to fail. - return true -} - -// Run starts a goroutine that propagates the shutdown signal, -// individual delegates are ran as they are created. -func (b *backend) Run(stopCh <-chan struct{}) error { - go func() { - <-stopCh - b.stopAllDelegates() - }() - return nil -} - -// stopAllDelegates closes the stopChan for every delegate to enable -// goroutines to terminate gracefully. This is a helper method to propagate -// the primary stopChan to the current delegate map. -func (b *backend) stopAllDelegates() { - b.delegateUpdateMutex.Lock() - defer b.delegateUpdateMutex.Unlock() - if b.stopped { - return - } - b.stopped = true - for _, d := range b.GetDelegates() { - close(d.stopChan) - } -} - -// Shutdown calls the shutdown method on all delegates. The stopChan should -// be closed before this is called. -func (b *backend) Shutdown() { - for _, d := range b.GetDelegates() { - d.Shutdown() - } -} - -// GetDelegates retrieves current delegates in a safe manner -func (b *backend) GetDelegates() syncedDelegates { - return b.delegates.Load().(syncedDelegates) -} - -// copyDelegates returns a copied delegate map -func (b *backend) copyDelegates() syncedDelegates { - c := make(syncedDelegates) - for u, s := range b.GetDelegates() { - c[u] = s - } - return c -} - -// setDelegates sets the current delegates in a safe manner -func (b *backend) setDelegates(delegates syncedDelegates) { - b.delegates.Store(delegates) -} - -// addSink is called by the shared informer when a sink is added -func (b *backend) addSink(sink *auditregv1alpha1.AuditSink) { - b.delegateUpdateMutex.Lock() - defer b.delegateUpdateMutex.Unlock() - if b.stopped { - msg := fmt.Sprintf("Could not add audit sink %q uid: %s. Update to all delegates is stopped.", sink.Name, sink.UID) - klog.Error(msg) - return - } - delegates := b.copyDelegates() - if _, ok := delegates[sink.UID]; ok { - klog.Errorf("Audit sink %q uid: %s already exists, could not readd", sink.Name, sink.UID) - return - } - d, err := b.createAndStartDelegate(sink) - if err != nil { - msg := fmt.Sprintf("Could not add audit sink %q: %v", sink.Name, err) - klog.Error(msg) - b.recorder.Event(sink, corev1.EventTypeWarning, "CreateFailed", msg) - return - } - delegates[sink.UID] = d - b.setDelegates(delegates) - klog.V(2).Infof("Added audit sink: %s", sink.Name) - klog.V(2).Infof("Current audit sinks: %v", delegates.Names()) -} - -// updateSink is called by the shared informer when a sink is updated. -// The new sink is only rebuilt on spec changes. The new sink must not have -// the same uid as the previous. The new sink will be started before the old -// one is shutdown so no events will be lost -func (b *backend) updateSink(oldSink, newSink *auditregv1alpha1.AuditSink) { - b.delegateUpdateMutex.Lock() - defer b.delegateUpdateMutex.Unlock() - if b.stopped { - msg := fmt.Sprintf("Could not update old audit sink %q to new audit sink %q. Update to all delegates is stopped.", oldSink.Name, newSink.Name) - klog.Error(msg) - return - } - delegates := b.copyDelegates() - oldDelegate, ok := delegates[oldSink.UID] - if !ok { - klog.Errorf("Could not update audit sink %q uid: %s, old sink does not exist", - oldSink.Name, oldSink.UID) - return - } - - // check if spec has changed - eq := reflect.DeepEqual(oldSink.Spec, newSink.Spec) - if eq { - delete(delegates, oldSink.UID) - delegates[newSink.UID] = oldDelegate - b.setDelegates(delegates) - } else { - d, err := b.createAndStartDelegate(newSink) - if err != nil { - msg := fmt.Sprintf("Could not update audit sink %q: %v", oldSink.Name, err) - klog.Error(msg) - b.recorder.Event(newSink, corev1.EventTypeWarning, "UpdateFailed", msg) - return - } - delete(delegates, oldSink.UID) - delegates[newSink.UID] = d - b.setDelegates(delegates) - - // graceful shutdown in goroutine as to not block - go oldDelegate.gracefulShutdown() - } - - klog.V(2).Infof("Updated audit sink: %s", newSink.Name) - klog.V(2).Infof("Current audit sinks: %v", delegates.Names()) -} - -// deleteSink is called by the shared informer when a sink is deleted -func (b *backend) deleteSink(sink *auditregv1alpha1.AuditSink) { - b.delegateUpdateMutex.Lock() - defer b.delegateUpdateMutex.Unlock() - if b.stopped { - msg := fmt.Sprintf("Could not delete audit sink %q uid: %s. Update to all delegates is stopped.", sink.Name, sink.UID) - klog.Warning(msg) - return - } - delegates := b.copyDelegates() - delegate, ok := delegates[sink.UID] - if !ok { - klog.Errorf("Could not delete audit sink %q uid: %s, does not exist", sink.Name, sink.UID) - return - } - delete(delegates, sink.UID) - b.setDelegates(delegates) - - // graceful shutdown in goroutine as to not block - go delegate.gracefulShutdown() - klog.V(2).Infof("Deleted audit sink: %s", sink.Name) - klog.V(2).Infof("Current audit sinks: %v", delegates.Names()) -} - -// createAndStartDelegate will build a delegate from an audit sink configuration and run it -func (b *backend) createAndStartDelegate(sink *auditregv1alpha1.AuditSink) (*delegate, error) { - f := factory{ - config: b.config, - webhookClientManager: b.webhookClientManager, - sink: sink, - } - delegate, err := f.BuildDelegate() - if err != nil { - return nil, err - } - err = delegate.Run(delegate.stopChan) - if err != nil { - return nil, err - } - return delegate, nil -} - -// String returns a string representation of the backend -func (b *backend) String() string { - var delegateStrings []string - for _, delegate := range b.GetDelegates() { - delegateStrings = append(delegateStrings, fmt.Sprintf("%s", delegate)) - } - return fmt.Sprintf("%s[%s]", PluginName, strings.Join(delegateStrings, ",")) -} diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic_test.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic_test.go deleted file mode 100644 index 9f49929a7bd..00000000000 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic_test.go +++ /dev/null @@ -1,319 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package dynamic - -import ( - "fmt" - "io/ioutil" - "net/http" - "net/http/httptest" - "reflect" - "sync/atomic" - "testing" - "time" - - "github.com/stretchr/testify/require" - - auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - auditinternal "k8s.io/apiserver/pkg/apis/audit" - auditv1 "k8s.io/apiserver/pkg/apis/audit/v1" - "k8s.io/apiserver/pkg/audit" - webhook "k8s.io/apiserver/pkg/util/webhook" - informers "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes/fake" - v1core "k8s.io/client-go/kubernetes/typed/core/v1" -) - -func TestDynamic(t *testing.T) { - eventList1 := &atomic.Value{} - eventList1.Store(auditinternal.EventList{}) - eventList2 := &atomic.Value{} - eventList2.Store(auditinternal.EventList{}) - - // start test servers - server1 := httptest.NewServer(buildTestHandler(t, eventList1)) - defer server1.Close() - server2 := httptest.NewServer(buildTestHandler(t, eventList2)) - defer server2.Close() - - testPolicy := auditregv1alpha1.Policy{ - Level: auditregv1alpha1.LevelMetadata, - Stages: []auditregv1alpha1.Stage{ - auditregv1alpha1.StageResponseStarted, - }, - } - testEvent := auditinternal.Event{ - Level: auditinternal.LevelMetadata, - Stage: auditinternal.StageResponseStarted, - Verb: "get", - RequestURI: "/test/path", - } - testConfig1 := &auditregv1alpha1.AuditSink{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test1", - UID: types.UID("test1"), - }, - Spec: auditregv1alpha1.AuditSinkSpec{ - Policy: testPolicy, - Webhook: auditregv1alpha1.Webhook{ - ClientConfig: auditregv1alpha1.WebhookClientConfig{ - URL: &server1.URL, - }, - }, - }, - } - testConfig2 := &auditregv1alpha1.AuditSink{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test2", - UID: types.UID("test2"), - }, - Spec: auditregv1alpha1.AuditSinkSpec{ - Policy: testPolicy, - Webhook: auditregv1alpha1.Webhook{ - ClientConfig: auditregv1alpha1.WebhookClientConfig{ - URL: &server2.URL, - }, - }, - }, - } - - badURL := "http://badtest" - badConfig := &auditregv1alpha1.AuditSink{ - ObjectMeta: metav1.ObjectMeta{ - Name: "bad", - UID: types.UID("bad"), - }, - Spec: auditregv1alpha1.AuditSinkSpec{ - Policy: testPolicy, - Webhook: auditregv1alpha1.Webhook{ - ClientConfig: auditregv1alpha1.WebhookClientConfig{ - URL: &badURL, - }, - }, - }, - } - - config, stopChan := defaultTestConfig() - config.BufferedConfig.MaxBatchSize = 1 - - b, err := NewBackend(config) - require.NoError(t, err) - d := b.(*backend) - err = b.Run(stopChan) - require.NoError(t, err) - - t.Run("find none", func(t *testing.T) { - require.Len(t, d.GetDelegates(), 0) - }) - - success := t.Run("find one", func(t *testing.T) { - d.addSink(testConfig1) - delegates := d.GetDelegates() - require.Len(t, delegates, 1) - require.Contains(t, delegates, types.UID("test1")) - require.Equal(t, testConfig1, delegates["test1"].configuration) - - // send event and check that it arrives - b.ProcessEvents(&testEvent) - err := checkForEvent(eventList1, testEvent) - require.NoError(t, err, "unable to find events sent to sink") - }) - require.True(t, success) // propagate failure - - // test that a bad webhook configuration can be recovered from - success = t.Run("bad config", func(t *testing.T) { - d.addSink(badConfig) - delegates := d.GetDelegates() - require.Len(t, delegates, 2) - require.Contains(t, delegates, types.UID("bad")) - require.Equal(t, badConfig, delegates["bad"].configuration) - - // send events to the buffer - b.ProcessEvents(&testEvent, &testEvent) - - // event is in the buffer see if the sink can be deleted - // this will hang and fail if not handled properly - d.deleteSink(badConfig) - - delegates = d.GetDelegates() - require.Len(t, delegates, 1) - require.Contains(t, delegates, types.UID("test1")) - require.Equal(t, testConfig1, delegates["test1"].configuration) - }) - require.True(t, success) // propagate failure - - success = t.Run("find two", func(t *testing.T) { - eventList1.Store(auditinternal.EventList{}) - d.addSink(testConfig2) - delegates := d.GetDelegates() - require.Len(t, delegates, 2) - require.Contains(t, delegates, types.UID("test1")) - require.Contains(t, delegates, types.UID("test2")) - require.Equal(t, testConfig1, delegates["test1"].configuration) - require.Equal(t, testConfig2, delegates["test2"].configuration) - - // send event to both delegates and check that it arrives in both places - b.ProcessEvents(&testEvent) - err := checkForEvent(eventList1, testEvent) - require.NoError(t, err, "unable to find events sent to sink 1") - err = checkForEvent(eventList2, testEvent) - require.NoError(t, err, "unable to find events sent to sink 2") - }) - require.True(t, success) // propagate failure - - success = t.Run("delete one", func(t *testing.T) { - eventList2.Store(auditinternal.EventList{}) - d.deleteSink(testConfig1) - delegates := d.GetDelegates() - require.Len(t, delegates, 1) - require.Contains(t, delegates, types.UID("test2")) - require.Equal(t, testConfig2, delegates["test2"].configuration) - - // send event and check that it arrives to remaining sink - b.ProcessEvents(&testEvent) - err := checkForEvent(eventList2, testEvent) - require.NoError(t, err, "unable to find events sent to sink") - }) - require.True(t, success) // propagate failure - - success = t.Run("update one", func(t *testing.T) { - eventList1.Store(auditinternal.EventList{}) - oldConfig := *testConfig2 - testConfig2.Spec.Webhook.ClientConfig.URL = &server1.URL - testConfig2.UID = types.UID("test2.1") - d.updateSink(&oldConfig, testConfig2) - delegates := d.GetDelegates() - require.Len(t, delegates, 1) - require.Contains(t, delegates, types.UID("test2.1")) - require.Equal(t, testConfig2, delegates["test2.1"].configuration) - - // send event and check that it arrives to updated sink - b.ProcessEvents(&testEvent) - err := checkForEvent(eventList1, testEvent) - require.NoError(t, err, "unable to find events sent to sink") - }) - require.True(t, success) // propagate failure - - success = t.Run("update meta only", func(t *testing.T) { - eventList1.Store(auditinternal.EventList{}) - oldConfig := *testConfig2 - testConfig2.UID = types.UID("test2.2") - testConfig2.Labels = map[string]string{"my": "label"} - d.updateSink(&oldConfig, testConfig2) - delegates := d.GetDelegates() - require.Len(t, delegates, 1) - require.Contains(t, delegates, types.UID("test2.2")) - - // send event and check that it arrives to same sink - b.ProcessEvents(&testEvent) - err := checkForEvent(eventList1, testEvent) - require.NoError(t, err, "unable to find events sent to sink") - }) - require.True(t, success) // propagate failure - - success = t.Run("shutdown", func(t *testing.T) { - // if the stop signal is not propagated correctly the buffers will not - // close down gracefully, and the shutdown method will hang causing - // the test will timeout. - timeoutChan := make(chan struct{}) - successChan := make(chan struct{}) - go func() { - time.Sleep(1 * time.Second) - timeoutChan <- struct{}{} - }() - go func() { - close(stopChan) - d.Shutdown() - successChan <- struct{}{} - }() - for { - select { - case <-timeoutChan: - t.Error("shutdown timed out") - return - case <-successChan: - return - } - } - }) - require.True(t, success) // propagate failure -} - -// checkForEvent will poll to check for an audit event in an atomic event list -func checkForEvent(a *atomic.Value, evSent auditinternal.Event) error { - return wait.Poll(100*time.Millisecond, 1*time.Second, func() (bool, error) { - el := a.Load().(auditinternal.EventList) - if len(el.Items) != 1 { - return false, nil - } - evFound := el.Items[0] - eq := reflect.DeepEqual(evSent, evFound) - if !eq { - return false, fmt.Errorf("event mismatch -- sent: %+v found: %+v", evSent, evFound) - } - return true, nil - }) -} - -// buildTestHandler returns a handler that will update the atomic value passed in -// with the event list it receives -func buildTestHandler(t *testing.T, a *atomic.Value) http.HandlerFunc { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - body, err := ioutil.ReadAll(r.Body) - if err != nil { - t.Fatalf("could not read request body: %v", err) - } - el := auditinternal.EventList{} - decoder := audit.Codecs.UniversalDecoder(auditv1.SchemeGroupVersion) - if err := runtime.DecodeInto(decoder, body, &el); err != nil { - t.Fatalf("failed decoding buf: %b, apiVersion: %s", body, auditv1.SchemeGroupVersion) - } - defer r.Body.Close() - a.Store(el) - w.WriteHeader(200) - }) -} - -// defaultTestConfig returns a Config object suitable for testing along with its -// associated stopChan -func defaultTestConfig() (*Config, chan struct{}) { - authWrapper := webhook.AuthenticationInfoResolverWrapper( - func(a webhook.AuthenticationInfoResolver) webhook.AuthenticationInfoResolver { return a }, - ) - client := fake.NewSimpleClientset() - informerFactory := informers.NewSharedInformerFactory(client, 0) - stop := make(chan struct{}) - - eventSink := &v1core.EventSinkImpl{Interface: client.CoreV1().Events("")} - - informerFactory.Start(stop) - informerFactory.WaitForCacheSync(stop) - informer := informerFactory.Auditregistration().V1alpha1().AuditSinks() - return &Config{ - Informer: informer, - EventConfig: EventConfig{Sink: eventSink}, - BufferedConfig: NewDefaultWebhookBatchConfig(), - WebhookConfig: WebhookConfig{ - AuthInfoResolverWrapper: authWrapper, - ServiceResolver: webhook.NewDefaultServiceResolver(), - }, - }, stop -} diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced/BUILD b/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced/BUILD deleted file mode 100644 index 8a4c454dbe0..00000000000 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced/BUILD +++ /dev/null @@ -1,45 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "go_default_library", - srcs = ["enforced.go"], - importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced", - importpath = "k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced", - visibility = ["//visibility:public"], - deps = [ - "//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/audit:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/audit/event:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/audit/policy:go_default_library", - ], -) - -go_test( - name = "go_default_test", - srcs = ["enforced_test.go"], - embed = [":go_default_library"], - deps = [ - "//staging/src/k8s.io/api/authentication/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/audit/policy:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library", - "//staging/src/k8s.io/apiserver/plugin/pkg/audit/fake:go_default_library", - "//vendor/github.com/stretchr/testify/require:go_default_library", - ], -) - -filegroup( - name = "package-srcs", - srcs = glob(["**"]), - tags = ["automanaged"], - visibility = ["//visibility:private"], -) - -filegroup( - name = "all-srcs", - srcs = [":package-srcs"], - tags = ["automanaged"], - visibility = ["//visibility:public"], -) diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced/enforced.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced/enforced.go deleted file mode 100644 index 8feb523bedf..00000000000 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced/enforced.go +++ /dev/null @@ -1,93 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package enforced - -import ( - "fmt" - - auditinternal "k8s.io/apiserver/pkg/apis/audit" - "k8s.io/apiserver/pkg/audit" - ev "k8s.io/apiserver/pkg/audit/event" - "k8s.io/apiserver/pkg/audit/policy" -) - -// PluginName is the name reported in error metrics. -const PluginName = "enforced" - -// Backend filters audit events according to the policy -// trimming them as necessary to match the level -type Backend struct { - policyChecker policy.Checker - delegateBackend audit.Backend -} - -// NewBackend returns an enforced audit backend that wraps delegate backend. -// Enforced backend automatically runs and shuts down the delegate backend. -func NewBackend(delegate audit.Backend, p policy.Checker) audit.Backend { - return &Backend{ - policyChecker: p, - delegateBackend: delegate, - } -} - -// Run the delegate backend -func (b Backend) Run(stopCh <-chan struct{}) error { - return b.delegateBackend.Run(stopCh) -} - -// Shutdown the delegate backend -func (b Backend) Shutdown() { - b.delegateBackend.Shutdown() -} - -// ProcessEvents enforces policy on a shallow copy of the given event -// dropping any sections that don't conform -func (b Backend) ProcessEvents(events ...*auditinternal.Event) bool { - for _, event := range events { - if event == nil { - continue - } - attr, err := ev.NewAttributes(event) - if err != nil { - audit.HandlePluginError(PluginName, err, event) - continue - } - level, stages := b.policyChecker.LevelAndStages(attr) - if level == auditinternal.LevelNone { - continue - } - // make shallow copy before modifying to satisfy interface definition - ev := *event - e, err := policy.EnforcePolicy(&ev, level, stages) - if err != nil { - audit.HandlePluginError(PluginName, err, event) - continue - } - if e == nil { - continue - } - b.delegateBackend.ProcessEvents(e) - } - // Returning true regardless of results, since dynamic audit backends - // can never cause apiserver request to fail. - return true -} - -// String returns a string representation of the backend -func (b Backend) String() string { - return fmt.Sprintf("%s<%s>", PluginName, b.delegateBackend) -} diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced/enforced_test.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced/enforced_test.go deleted file mode 100644 index 25acad90f0c..00000000000 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced/enforced_test.go +++ /dev/null @@ -1,118 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package enforced - -import ( - "testing" - - "github.com/stretchr/testify/require" - - authnv1 "k8s.io/api/authentication/v1" - "k8s.io/apimachinery/pkg/runtime" - auditinternal "k8s.io/apiserver/pkg/apis/audit" - "k8s.io/apiserver/pkg/audit/policy" - "k8s.io/apiserver/pkg/authentication/user" - "k8s.io/apiserver/pkg/authorization/authorizer" - fakeplugin "k8s.io/apiserver/plugin/pkg/audit/fake" -) - -func TestEnforced(t *testing.T) { - testCases := []struct { - name string - event *auditinternal.Event - policy auditinternal.Policy - attribs authorizer.Attributes - expected []*auditinternal.Event - }{ - { - name: "enforce level", - event: &auditinternal.Event{ - Level: auditinternal.LevelRequestResponse, - Stage: auditinternal.StageResponseComplete, - RequestURI: "/apis/extensions/v1beta1", - RequestObject: &runtime.Unknown{Raw: []byte(`test`)}, - ResponseObject: &runtime.Unknown{Raw: []byte(`test`)}, - }, - policy: auditinternal.Policy{ - Rules: []auditinternal.PolicyRule{ - { - Level: auditinternal.LevelMetadata, - }, - }, - }, - expected: []*auditinternal.Event{ - { - Level: auditinternal.LevelMetadata, - Stage: auditinternal.StageResponseComplete, - RequestURI: "/apis/extensions/v1beta1", - }, - }, - }, - { - name: "enforce policy rule", - event: &auditinternal.Event{ - Level: auditinternal.LevelRequestResponse, - Stage: auditinternal.StageResponseComplete, - RequestURI: "/apis/extensions/v1beta1", - User: authnv1.UserInfo{ - Username: user.Anonymous, - }, - RequestObject: &runtime.Unknown{Raw: []byte(`test`)}, - ResponseObject: &runtime.Unknown{Raw: []byte(`test`)}, - }, - policy: auditinternal.Policy{ - Rules: []auditinternal.PolicyRule{ - { - Level: auditinternal.LevelNone, - Users: []string{user.Anonymous}, - }, - { - Level: auditinternal.LevelMetadata, - }, - }, - }, - expected: []*auditinternal.Event{}, - }, - { - name: "nil event", - event: nil, - policy: auditinternal.Policy{ - Rules: []auditinternal.PolicyRule{ - { - Level: auditinternal.LevelMetadata, - }, - }, - }, - expected: []*auditinternal.Event{}, - }, - } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - ev := []*auditinternal.Event{} - fakeBackend := fakeplugin.Backend{ - OnRequest: func(events []*auditinternal.Event) { - ev = events - }, - } - b := NewBackend(&fakeBackend, policy.NewChecker(&tc.policy)) - defer b.Shutdown() - - b.ProcessEvents(tc.event) - require.Equal(t, tc.expected, ev) - }) - } -} diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/factory.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/factory.go deleted file mode 100644 index f9ce7abf790..00000000000 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/factory.go +++ /dev/null @@ -1,91 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package dynamic - -import ( - "fmt" - "time" - - auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1" - "k8s.io/apiserver/pkg/audit" - "k8s.io/apiserver/pkg/audit/policy" - auditutil "k8s.io/apiserver/pkg/audit/util" - "k8s.io/apiserver/pkg/util/webhook" - bufferedplugin "k8s.io/apiserver/plugin/pkg/audit/buffered" - enforcedplugin "k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced" - webhookplugin "k8s.io/apiserver/plugin/pkg/audit/webhook" -) - -// TODO: find a common place for all the default retry backoffs -const retryBackoff = 500 * time.Millisecond - -// factory builds a delegate from an AuditSink -type factory struct { - config *Config - webhookClientManager webhook.ClientManager - sink *auditregv1alpha1.AuditSink -} - -// BuildDelegate creates a delegate from the AuditSink object -func (f *factory) BuildDelegate() (*delegate, error) { - backend, err := f.buildWebhookBackend() - if err != nil { - return nil, err - } - backend = f.applyEnforcedOpts(backend) - backend = f.applyBufferedOpts(backend) - ch := make(chan struct{}) - return &delegate{ - Backend: backend, - configuration: f.sink, - stopChan: ch, - }, nil -} - -func (f *factory) buildWebhookBackend() (audit.Backend, error) { - hookClient := auditutil.HookClientConfigForSink(f.sink) - client, err := f.webhookClientManager.HookClient(hookClient) - if err != nil { - return nil, fmt.Errorf("could not create webhook client: %v", err) - } - backend := webhookplugin.NewDynamicBackend(client, retryBackoff) - return backend, nil -} - -func (f *factory) applyEnforcedOpts(delegate audit.Backend) audit.Backend { - pol := policy.ConvertDynamicPolicyToInternal(&f.sink.Spec.Policy) - checker := policy.NewChecker(pol) - eb := enforcedplugin.NewBackend(delegate, checker) - return eb -} - -func (f *factory) applyBufferedOpts(delegate audit.Backend) audit.Backend { - bc := f.config.BufferedConfig - tc := f.sink.Spec.Webhook.Throttle - if tc != nil { - bc.ThrottleEnable = true - if tc.Burst != nil { - bc.ThrottleBurst = int(*tc.Burst) - } - if tc.QPS != nil { - bc.ThrottleQPS = float32(*tc.QPS) - } - } else { - bc.ThrottleEnable = false - } - return bufferedplugin.NewBackend(delegate, *bc) -} diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/factory_test.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/factory_test.go deleted file mode 100644 index 3caa2c3a5dc..00000000000 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/factory_test.go +++ /dev/null @@ -1,146 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package dynamic - -import ( - "testing" - - "github.com/stretchr/testify/require" - - auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1" - utilpointer "k8s.io/utils/pointer" -) - -func TestToDelegate(t *testing.T) { - config, _ := defaultTestConfig() - defaultPolicy := auditregv1alpha1.Policy{ - Level: auditregv1alpha1.LevelMetadata, - } - u := "http://localhost:4444" - for _, tc := range []struct { - name string - auditConfig *auditregv1alpha1.AuditSink - throttleConfig *auditregv1alpha1.WebhookThrottleConfig - expectedBackend string - }{ - { - name: "build full", - auditConfig: &auditregv1alpha1.AuditSink{ - Spec: auditregv1alpha1.AuditSinkSpec{ - Policy: defaultPolicy, - Webhook: auditregv1alpha1.Webhook{ - Throttle: &auditregv1alpha1.WebhookThrottleConfig{ - QPS: utilpointer.Int64Ptr(10), - Burst: utilpointer.Int64Ptr(5), - }, - ClientConfig: auditregv1alpha1.WebhookClientConfig{ - URL: &u, - }, - }, - }, - }, - expectedBackend: "buffered>", - }, - { - name: "build no throttle", - auditConfig: &auditregv1alpha1.AuditSink{ - Spec: auditregv1alpha1.AuditSinkSpec{ - Policy: defaultPolicy, - Webhook: auditregv1alpha1.Webhook{ - ClientConfig: auditregv1alpha1.WebhookClientConfig{ - URL: &u, - }, - }, - }, - }, - expectedBackend: "buffered>", - }, - } { - t.Run(tc.name, func(t *testing.T) { - b, err := NewBackend(config) - require.NoError(t, err) - c := factory{ - config: b.(*backend).config, - webhookClientManager: b.(*backend).webhookClientManager, - sink: tc.auditConfig, - } - d, err := c.BuildDelegate() - require.NoError(t, err) - require.Equal(t, tc.expectedBackend, d.String()) - }) - } -} - -func TestBuildWebhookBackend(t *testing.T) { - defaultPolicy := auditregv1alpha1.Policy{ - Level: auditregv1alpha1.LevelMetadata, - } - config, _ := defaultTestConfig() - b, err := NewBackend(config) - require.NoError(t, err) - d := b.(*backend) - u := "http://localhost:4444" - for _, tc := range []struct { - name string - auditConfig *auditregv1alpha1.AuditSink - shouldErr bool - expectedBackend string - }{ - { - name: "build full", - auditConfig: &auditregv1alpha1.AuditSink{ - Spec: auditregv1alpha1.AuditSinkSpec{ - Policy: defaultPolicy, - Webhook: auditregv1alpha1.Webhook{ - ClientConfig: auditregv1alpha1.WebhookClientConfig{ - URL: &u, - }, - }, - }, - }, - expectedBackend: "dynamic_webhook", - shouldErr: false, - }, - { - name: "fail missing url", - auditConfig: &auditregv1alpha1.AuditSink{ - Spec: auditregv1alpha1.AuditSinkSpec{ - Policy: defaultPolicy, - Webhook: auditregv1alpha1.Webhook{ - ClientConfig: auditregv1alpha1.WebhookClientConfig{}, - }, - }, - }, - shouldErr: true, - }, - } { - t.Run(tc.name, func(t *testing.T) { - c := &factory{ - config: config, - webhookClientManager: d.webhookClientManager, - sink: tc.auditConfig, - } - ab, err := c.buildWebhookBackend() - if tc.shouldErr { - require.Error(t, err) - return - } - require.NoError(t, err) - require.Equal(t, tc.expectedBackend, ab.String()) - }) - } -} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go b/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go index 9dd8d7021f7..b35b955f70c 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go @@ -89,7 +89,6 @@ func NewDefaultOptions(out, err io.Writer) *AggregatorOptions { RecommendedOptions: genericoptions.NewRecommendedOptions( defaultEtcdPathPrefix, aggregatorscheme.Codecs.LegacyCodec(v1beta1.SchemeGroupVersion), - genericoptions.NewProcessInfo("kube-aggregator", "kube-system"), ), APIEnablement: genericoptions.NewAPIEnablementOptions(), diff --git a/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go b/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go index 37533a27f25..9b229b88f6e 100644 --- a/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go +++ b/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go @@ -58,7 +58,6 @@ func NewWardleServerOptions(out, errOut io.Writer) *WardleServerOptions { RecommendedOptions: genericoptions.NewRecommendedOptions( defaultEtcdPathPrefix, apiserver.Codecs.LegacyCodec(v1alpha1.SchemeGroupVersion), - genericoptions.NewProcessInfo("wardle-apiserver", "wardle"), ), StdOut: out, diff --git a/test/e2e/auth/BUILD b/test/e2e/auth/BUILD index 91aeabb0312..2befbae484b 100644 --- a/test/e2e/auth/BUILD +++ b/test/e2e/auth/BUILD @@ -4,7 +4,6 @@ go_library( name = "go_default_library", srcs = [ "audit.go", - "audit_dynamic.go", "certificates.go", "framework.go", "metadata_concealment.go", @@ -21,7 +20,6 @@ go_library( "//pkg/security/podsecuritypolicy/util:go_default_library", "//plugin/pkg/admission/serviceaccount:go_default_library", "//staging/src/k8s.io/api/apps/v1:go_default_library", - "//staging/src/k8s.io/api/auditregistration/v1alpha1:go_default_library", "//staging/src/k8s.io/api/authentication/v1:go_default_library", "//staging/src/k8s.io/api/batch/v1:go_default_library", "//staging/src/k8s.io/api/certificates/v1beta1:go_default_library", @@ -35,7 +33,6 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", diff --git a/test/e2e/auth/audit_dynamic.go b/test/e2e/auth/audit_dynamic.go deleted file mode 100644 index b9fa0ebdedf..00000000000 --- a/test/e2e/auth/audit_dynamic.go +++ /dev/null @@ -1,382 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package auth - -import ( - "context" - "fmt" - "strings" - "time" - - "github.com/onsi/ginkgo" - - auditregistrationv1alpha1 "k8s.io/api/auditregistration/v1alpha1" - v1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/apimachinery/pkg/util/wait" - auditinternal "k8s.io/apiserver/pkg/apis/audit" - auditv1 "k8s.io/apiserver/pkg/apis/audit/v1" - clientset "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" - "k8s.io/kubernetes/test/e2e/framework" - e2eauth "k8s.io/kubernetes/test/e2e/framework/auth" - e2epod "k8s.io/kubernetes/test/e2e/framework/pod" - "k8s.io/kubernetes/test/utils" - imageutils "k8s.io/kubernetes/test/utils/image" -) - -var _ = SIGDescribe("[Feature:DynamicAudit]", func() { - f := framework.NewDefaultFramework("audit") - - ginkgo.It("should dynamically audit API calls", func() { - namespace := f.Namespace.Name - - ginkgo.By("Creating a kubernetes client that impersonates an unauthorized anonymous user") - config, err := framework.LoadConfig() - framework.ExpectNoError(err, "failed to fetch config") - - config.Impersonate = restclient.ImpersonationConfig{ - UserName: "system:anonymous", - Groups: []string{"system:unauthenticated"}, - } - anonymousClient, err := clientset.NewForConfig(config) - framework.ExpectNoError(err, "failed to create the anonymous client") - - _, err = f.ClientSet.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: "audit", - }, - }, metav1.CreateOptions{}) - framework.ExpectNoError(err, "failed to create namespace") - - _, err = f.ClientSet.CoreV1().Pods(namespace).Create(context.TODO(), &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "audit-proxy", - Labels: map[string]string{ - "app": "audit", - }, - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "proxy", - Image: imageutils.GetE2EImage(imageutils.Agnhost), - Args: []string{"audit-proxy"}, - Ports: []v1.ContainerPort{ - { - ContainerPort: 8080, - }, - }, - }, - }, - }, - }, metav1.CreateOptions{}) - framework.ExpectNoError(err, "failed to create proxy pod") - - _, err = f.ClientSet.CoreV1().Services(namespace).Create(context.TODO(), &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "audit", - }, - Spec: v1.ServiceSpec{ - Ports: []v1.ServicePort{ - { - Port: 80, - TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080}, - }, - }, - Selector: map[string]string{ - "app": "audit", - }, - }, - }, metav1.CreateOptions{}) - framework.ExpectNoError(err, "failed to create proxy service") - - var podIP string - // get pod ip - err = wait.Poll(100*time.Millisecond, 10*time.Second, func() (done bool, err error) { - p, err := f.ClientSet.CoreV1().Pods(namespace).Get(context.TODO(), "audit-proxy", metav1.GetOptions{}) - if apierrors.IsNotFound(err) { - framework.Logf("waiting for audit-proxy pod to be present") - return false, nil - } else if err != nil { - return false, err - } - podIP = p.Status.PodIP - if podIP == "" { - framework.Logf("waiting for audit-proxy pod IP to be ready") - return false, nil - } - return true, nil - }) - framework.ExpectNoError(err, "timed out waiting for audit-proxy pod to be ready") - - podURL := fmt.Sprintf("http://%s:8080", podIP) - // create audit sink - sink := auditregistrationv1alpha1.AuditSink{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - }, - Spec: auditregistrationv1alpha1.AuditSinkSpec{ - Policy: auditregistrationv1alpha1.Policy{ - Level: auditregistrationv1alpha1.LevelRequestResponse, - Stages: []auditregistrationv1alpha1.Stage{ - auditregistrationv1alpha1.StageRequestReceived, - auditregistrationv1alpha1.StageResponseStarted, - auditregistrationv1alpha1.StageResponseComplete, - auditregistrationv1alpha1.StagePanic, - }, - }, - Webhook: auditregistrationv1alpha1.Webhook{ - ClientConfig: auditregistrationv1alpha1.WebhookClientConfig{ - URL: &podURL, - }, - }, - }, - } - - _, err = f.ClientSet.AuditregistrationV1alpha1().AuditSinks().Create(context.TODO(), &sink, metav1.CreateOptions{}) - framework.ExpectNoError(err, "failed to create audit sink") - framework.Logf("created audit sink") - - // check that we are receiving logs in the proxy - err = wait.Poll(100*time.Millisecond, 10*time.Second, func() (done bool, err error) { - logs, err := e2epod.GetPodLogs(f.ClientSet, namespace, "audit-proxy", "proxy") - if err != nil { - framework.Logf("waiting for audit-proxy pod logs to be available") - return false, nil - } - if logs == "" { - framework.Logf("waiting for audit-proxy pod logs to be non-empty") - return false, nil - } - return true, nil - }) - framework.ExpectNoError(err, "failed to get logs from audit-proxy pod") - - auditTestUser = "kubernetes-admin" - testCases := []struct { - action func() - events []utils.AuditEvent - }{ - // Create, get, update, patch, delete, list, watch pods. - // TODO(@pbarker): dedupe this with the main audit test once policy functionality is available - // https://github.com/kubernetes/kubernetes/issues/70818 - { - func() { - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "audit-pod", - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{{ - Name: "pause", - Image: imageutils.GetPauseImageName(), - }}, - }, - } - updatePod := func(pod *v1.Pod) {} - - f.PodClient().CreateSync(pod) - - _, err := f.PodClient().Get(context.TODO(), pod.Name, metav1.GetOptions{}) - framework.ExpectNoError(err, "failed to get audit-pod") - - podChan, err := f.PodClient().Watch(context.TODO(), watchOptions) - framework.ExpectNoError(err, "failed to create watch for pods") - for range podChan.ResultChan() { - } - - f.PodClient().Update(pod.Name, updatePod) - - _, err = f.PodClient().List(context.TODO(), metav1.ListOptions{}) - framework.ExpectNoError(err, "failed to list pods") - - _, err = f.PodClient().Patch(context.TODO(), pod.Name, types.JSONPatchType, patch, metav1.PatchOptions{}) - framework.ExpectNoError(err, "failed to patch pod") - - f.PodClient().DeleteSync(pod.Name, metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout) - }, - []utils.AuditEvent{ - { - Level: auditinternal.LevelRequestResponse, - Stage: auditinternal.StageResponseComplete, - RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/pods", namespace), - Verb: "create", - Code: 201, - User: auditTestUser, - Resource: "pods", - Namespace: namespace, - RequestObject: true, - ResponseObject: true, - AuthorizeDecision: "allow", - }, { - Level: auditinternal.LevelRequestResponse, - Stage: auditinternal.StageResponseComplete, - RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/pods/audit-pod", namespace), - Verb: "get", - Code: 200, - User: auditTestUser, - Resource: "pods", - Namespace: namespace, - RequestObject: false, - ResponseObject: true, - AuthorizeDecision: "allow", - }, { - Level: auditinternal.LevelRequestResponse, - Stage: auditinternal.StageResponseComplete, - RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/pods", namespace), - Verb: "list", - Code: 200, - User: auditTestUser, - Resource: "pods", - Namespace: namespace, - RequestObject: false, - ResponseObject: true, - AuthorizeDecision: "allow", - }, { - Level: auditinternal.LevelRequestResponse, - Stage: auditinternal.StageResponseStarted, - RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/pods?timeout=%ds&timeoutSeconds=%d&watch=true", namespace, watchTestTimeout, watchTestTimeout), - Verb: "watch", - Code: 200, - User: auditTestUser, - Resource: "pods", - Namespace: namespace, - RequestObject: false, - ResponseObject: false, - AuthorizeDecision: "allow", - }, { - Level: auditinternal.LevelRequestResponse, - Stage: auditinternal.StageResponseComplete, - RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/pods?timeout=%ds&timeoutSeconds=%d&watch=true", namespace, watchTestTimeout, watchTestTimeout), - Verb: "watch", - Code: 200, - User: auditTestUser, - Resource: "pods", - Namespace: namespace, - RequestObject: false, - ResponseObject: false, - AuthorizeDecision: "allow", - }, { - Level: auditinternal.LevelRequestResponse, - Stage: auditinternal.StageResponseComplete, - RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/pods/audit-pod", namespace), - Verb: "update", - Code: 200, - User: auditTestUser, - Resource: "pods", - Namespace: namespace, - RequestObject: true, - ResponseObject: true, - AuthorizeDecision: "allow", - }, { - Level: auditinternal.LevelRequestResponse, - Stage: auditinternal.StageResponseComplete, - RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/pods/audit-pod", namespace), - Verb: "patch", - Code: 200, - User: auditTestUser, - Resource: "pods", - Namespace: namespace, - RequestObject: true, - ResponseObject: true, - AuthorizeDecision: "allow", - }, { - Level: auditinternal.LevelRequestResponse, - Stage: auditinternal.StageResponseComplete, - RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/pods/audit-pod", namespace), - Verb: "delete", - Code: 200, - User: auditTestUser, - Resource: "pods", - Namespace: namespace, - RequestObject: true, - ResponseObject: true, - AuthorizeDecision: "allow", - }, - }, - }, - } - - // test authorizer annotations, RBAC is required. - annotationTestCases := []struct { - action func() - events []utils.AuditEvent - }{ - - // get a pod with unauthorized user - { - func() { - _, err := anonymousClient.CoreV1().Pods(namespace).Get(context.TODO(), "another-audit-pod", metav1.GetOptions{}) - expectForbidden(err) - }, - []utils.AuditEvent{ - { - Level: auditinternal.LevelRequestResponse, - Stage: auditinternal.StageResponseComplete, - RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/pods/another-audit-pod", namespace), - Verb: "get", - Code: 403, - User: auditTestUser, - ImpersonatedUser: "system:anonymous", - ImpersonatedGroups: "system:unauthenticated", - Resource: "pods", - Namespace: namespace, - RequestObject: false, - ResponseObject: true, - AuthorizeDecision: "forbid", - }, - }, - }, - } - - if e2eauth.IsRBACEnabled(f.ClientSet.RbacV1()) { - testCases = append(testCases, annotationTestCases...) - } - expectedEvents := []utils.AuditEvent{} - for _, t := range testCases { - t.action() - expectedEvents = append(expectedEvents, t.events...) - } - - // The default flush timeout is 30 seconds, therefore it should be enough to retry once - // to find all expected events. However, we're waiting for 5 minutes to avoid flakes. - pollingInterval := 30 * time.Second - pollingTimeout := 5 * time.Minute - err = wait.Poll(pollingInterval, pollingTimeout, func() (bool, error) { - // Fetch the logs - logs, err := e2epod.GetPodLogs(f.ClientSet, namespace, "audit-proxy", "proxy") - if err != nil { - return false, err - } - reader := strings.NewReader(logs) - missingReport, err := utils.CheckAuditLines(reader, expectedEvents, auditv1.SchemeGroupVersion) - if err != nil { - framework.Logf("Failed to observe audit events: %v", err) - } else if len(missingReport.MissingEvents) > 0 { - framework.Logf(missingReport.String()) - } - return len(missingReport.MissingEvents) == 0, nil - }) - framework.ExpectNoError(err, "after %v failed to observe audit events", pollingTimeout) - err = f.ClientSet.AuditregistrationV1alpha1().AuditSinks().Delete(context.TODO(), "test", metav1.DeleteOptions{}) - framework.ExpectNoError(err, "could not delete audit configuration") - }) -}) diff --git a/test/integration/master/BUILD b/test/integration/master/BUILD index c2558f71e27..9acb4055e62 100644 --- a/test/integration/master/BUILD +++ b/test/integration/master/BUILD @@ -10,7 +10,6 @@ go_test( name = "go_default_test", size = "large", srcs = [ - "audit_dynamic_test.go", "audit_test.go", "crd_test.go", "graceful_shutdown_test.go", @@ -24,7 +23,6 @@ go_test( rundir = ".", tags = ["integration"], deps = [ - "//cmd/kube-apiserver/app/options:go_default_library", "//cmd/kube-apiserver/app/testing:go_default_library", "//pkg/master:go_default_library", "//staging/src/k8s.io/api/admission/v1beta1:go_default_library", @@ -50,17 +48,14 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authorization/authorizerfactory:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/aes:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/tokentest:go_default_library", "//staging/src/k8s.io/client-go/dynamic:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", - "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library", "//test/integration:go_default_library", "//test/integration/etcd:go_default_library", @@ -68,7 +63,6 @@ go_test( "//test/utils:go_default_library", "//vendor/github.com/evanphx/json-patch:go_default_library", "//vendor/github.com/go-openapi/spec:go_default_library", - "//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/k8s.io/utils/pointer:go_default_library", "//vendor/sigs.k8s.io/yaml:go_default_library", ] + select({ diff --git a/test/integration/master/audit_dynamic_test.go b/test/integration/master/audit_dynamic_test.go deleted file mode 100644 index c1713035a1f..00000000000 --- a/test/integration/master/audit_dynamic_test.go +++ /dev/null @@ -1,282 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package master - -import ( - "context" - "fmt" - "sync" - "sync/atomic" - "testing" - "time" - - "github.com/stretchr/testify/require" - - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - auditinternal "k8s.io/apiserver/pkg/apis/audit" - "k8s.io/apiserver/pkg/features" - utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/kubernetes" - featuregatetesting "k8s.io/component-base/featuregate/testing" - "k8s.io/kubernetes/cmd/kube-apiserver/app/options" - "k8s.io/kubernetes/test/integration/framework" - "k8s.io/kubernetes/test/utils" -) - -// TestDynamicAudit ensures that v1alpha of the auditregistration api works -func TestDynamicAudit(t *testing.T) { - // start api server - stopCh := make(chan struct{}) - defer close(stopCh) - - defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicAuditing, true)() - kubeclient, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{ - ModifyServerRunOptions: func(opts *options.ServerRunOptions) { - opts.Audit.DynamicOptions.Enabled = true - // set max batch size so the buffers flush immediately - opts.Audit.DynamicOptions.BatchConfig.MaxBatchSize = 1 - opts.APIEnablement.RuntimeConfig.Set("auditregistration.k8s.io/v1alpha1=true") - }, - }) - - // create test sinks - testServer1 := utils.NewAuditTestServer(t, "test1") - defer testServer1.Close() - testServer2 := utils.NewAuditTestServer(t, "test2") - defer testServer2.Close() - - // check that servers are healthy - require.NoError(t, testServer1.Health(), "server1 never became healthy") - require.NoError(t, testServer2.Health(), "server2 never became healthy") - - // build AuditSink configurations - sinkConfig1 := testServer1.BuildSinkConfiguration() - sinkConfig2 := testServer2.BuildSinkConfiguration() - - // test creates a single audit sink, generates audit events, and ensures they arrive at the server - success := t.Run("one sink", func(t *testing.T) { - _, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Create(context.TODO(), sinkConfig1, metav1.CreateOptions{}) - require.NoError(t, err, "failed to create audit sink1") - t.Log("created audit sink1") - - // verify sink is ready - sinkHealth(t, kubeclient, testServer1) - - // perform configmap ops - configMapOperations(t, kubeclient) - - // check for corresponding events - missing, err := testServer1.WaitForEvents(expectedEvents) - require.NoError(t, err, "failed to match all expected events for server1, events %#v not found", missing) - }) - require.True(t, success) // propagate failure - - // test creates a second audit sink, generates audit events, and ensures events arrive in both servers - success = t.Run("two sink", func(t *testing.T) { - _, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Create(context.TODO(), sinkConfig2, metav1.CreateOptions{}) - require.NoError(t, err, "failed to create audit sink2") - t.Log("created audit sink2") - - // verify both sinks are ready - sinkHealth(t, kubeclient, testServer1, testServer2) - - // perform configmap ops - configMapOperations(t, kubeclient) - - // check for corresponding events in both sinks - missing, err := testServer1.WaitForEvents(expectedEvents) - require.NoError(t, err, "failed to match all expected events for server1, events %#v not found", missing) - missing, err = testServer2.WaitForEvents(expectedEvents) - require.NoError(t, err, "failed to match all expected events for server2, events %#v not found", missing) - }) - require.True(t, success) // propagate failure - - // test deletes an audit sink, generates audit events, and ensures they don't arrive in the corresponding server - success = t.Run("delete sink", func(t *testing.T) { - err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Delete(context.TODO(), sinkConfig2.Name, metav1.DeleteOptions{}) - require.NoError(t, err, "failed to delete audit sink2") - t.Log("deleted audit sink2") - - var finalErr error - err = wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { - // reset event lists - testServer1.ResetEventList() - testServer2.ResetEventList() - - // perform configmap ops - configMapOperations(t, kubeclient) - - // check for corresponding events in server1 - missing, err := testServer1.WaitForEvents(expectedEvents) - if err != nil { - finalErr = fmt.Errorf("%v: failed to match all expected events for server1, events %#v not found", err, missing) - return false, nil - } - - // check that server2 is empty - if len(testServer2.GetEventList().Items) != 0 { - finalErr = fmt.Errorf("server2 event list should be empty") - return false, nil - } - return true, nil - }) - require.NoError(t, err, finalErr) - }) - require.True(t, success) // propagate failure - - // This test will run a background process that generates audit events sending them to a sink. - // Whilst that generation is occurring, the sink is updated to point to a different server. - // The test checks that no events are lost or duplicated during the update. - t.Run("update sink", func(t *testing.T) { - // fetch sink1 config - sink1, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Get(context.TODO(), sinkConfig1.Name, metav1.GetOptions{}) - require.NoError(t, err) - - // reset event lists - testServer1.ResetEventList() - testServer2.ResetEventList() - - // run operations in background - stopChan := make(chan struct{}) - expectedEvents := &atomic.Value{} - expectedEvents.Store([]utils.AuditEvent{}) - wg := &sync.WaitGroup{} - wg.Add(1) - go asyncOps(stopChan, wg, kubeclient, expectedEvents) - - // check to see that at least 20 events have arrived in server1 - err = testServer1.WaitForNumEvents(20) - require.NoError(t, err, "failed to find enough events in server1") - - // check that no events are in server 2 yet - require.Len(t, testServer2.GetEventList().Items, 0, "server2 should not have events yet") - - // update the url - sink1.Spec.Webhook.ClientConfig.URL = &testServer2.Server.URL - _, err = kubeclient.AuditregistrationV1alpha1().AuditSinks().Update(context.TODO(), sink1, metav1.UpdateOptions{}) - require.NoError(t, err, "failed to update audit sink1") - t.Log("updated audit sink1 to point to server2") - - // check that at least 20 events have arrived in server2 - err = testServer2.WaitForNumEvents(20) - require.NoError(t, err, "failed to find enough events in server2") - - // stop the operations and ensure they have finished - close(stopChan) - wg.Wait() - - // check that the final events have arrived - expected := expectedEvents.Load().([]utils.AuditEvent) - missing, err := testServer2.WaitForEvents(expected[len(expected)-4:]) - require.NoError(t, err, "failed to find the final events in server2, events %#v not found", missing) - - // combine the event lists - el1 := testServer1.GetEventList() - el2 := testServer2.GetEventList() - combinedList := auditinternal.EventList{} - combinedList.Items = append(el1.Items, el2.Items...) - - // check that there are no duplicate events - dups, err := utils.CheckForDuplicates(combinedList) - require.NoError(t, err, "duplicate events found: %#v", dups) - - // check that no events are missing - missing, err = utils.CheckAuditList(combinedList, expected) - require.NoError(t, err, "failed to match all expected events: %#v not found", missing) - }) -} - -// sinkHealth checks if sinks are running by verifying that uniquely identified events are found -// in the given servers -func sinkHealth(t *testing.T, kubeclient kubernetes.Interface, servers ...*utils.AuditTestServer) { - var missing []utils.AuditEvent - i := 0 - var finalErr error - err := wait.PollImmediate(50*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { - i++ - name := fmt.Sprintf("health-%d-%d", i, time.Now().UnixNano()) - expected, err := simpleOp(name, kubeclient) - require.NoError(t, err, "could not perform config map operations") - - // check that all given servers have received events - for _, server := range servers { - missing, err = server.WaitForEvents(expected) - if err != nil { - finalErr = fmt.Errorf("not all events found in %s health check: missing %#v", server.Name, missing) - return false, nil - } - server.ResetEventList() - } - return true, nil - }) - require.NoError(t, err, finalErr) -} - -// simpleOp is a function that simply tries to get a configmap with the given name and returns the -// corresponding expected audit event -func simpleOp(name string, kubeclient kubernetes.Interface) ([]utils.AuditEvent, error) { - _, err := kubeclient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{}) - if err != nil && !apierrors.IsNotFound(err) { - return nil, err - } - - expectedEvents := []utils.AuditEvent{ - { - Level: auditinternal.LevelRequestResponse, - Stage: auditinternal.StageResponseComplete, - RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/configmaps/%s", namespace, name), - Verb: "get", - Code: 404, - User: auditTestUser, - Resource: "configmaps", - Namespace: namespace, - RequestObject: false, - ResponseObject: true, - AuthorizeDecision: "allow", - }, - } - return expectedEvents, nil -} - -// asyncOps runs the simpleOp function until the stopChan is closed updating -// the expected atomic events list -func asyncOps( - stopChan <-chan struct{}, - wg *sync.WaitGroup, - kubeclient kubernetes.Interface, - expected *atomic.Value, -) { - for i := 0; ; i++ { - select { - case <-stopChan: - wg.Done() - return - default: - name := fmt.Sprintf("health-%d-%d", i, time.Now().UnixNano()) - exp, err := simpleOp(name, kubeclient) - if err != nil { - // retry on errors - continue - } - e := expected.Load().([]utils.AuditEvent) - evList := append(e, exp...) - expected.Store(evList) - } - } -}