remove dynamic audit

This commit is contained in:
David Eads 2020-05-27 14:04:09 -04:00
parent 356c121c6d
commit ed4e6f1026
28 changed files with 14 additions and 2289 deletions

View File

@ -60,7 +60,6 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/component-base/cli/flag:go_default_library",

View File

@ -30,7 +30,6 @@ import (
apiserveroptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/storage/storagebackend"
auditbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
auditdynamic "k8s.io/apiserver/plugin/pkg/audit/dynamic"
audittruncate "k8s.io/apiserver/plugin/pkg/audit/truncate"
restclient "k8s.io/client-go/rest"
cliflag "k8s.io/component-base/cli/flag"
@ -252,9 +251,6 @@ func TestAddFlags(t *testing.T) {
InitialBackoff: 2 * time.Second,
GroupVersionString: "audit.k8s.io/v1alpha1",
},
DynamicOptions: apiserveroptions.AuditDynamicOptions{
BatchConfig: auditdynamic.NewDefaultWebhookBatchConfig(),
},
PolicyFile: "/policy",
},
Features: &apiserveroptions.FeatureOptions{

View File

@ -505,29 +505,17 @@ func buildGenericConfig(
genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)
}
lastErr = s.Audit.ApplyTo(genericConfig)
if lastErr != nil {
return
}
admissionConfig := &kubeapiserveradmission.Config{
ExternalInformers: versionedInformers,
LoopbackClientConfig: genericConfig.LoopbackClientConfig,
CloudConfigFile: s.CloudProvider.CloudConfigFile,
}
serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
authInfoResolverWrapper := webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, genericConfig.EgressSelector, genericConfig.LoopbackClientConfig)
lastErr = s.Audit.ApplyTo(
genericConfig,
genericConfig.LoopbackClientConfig,
versionedInformers,
serveroptions.NewProcessInfo("kube-apiserver", "kube-system"),
&serveroptions.WebhookOptions{
AuthInfoResolverWrapper: authInfoResolverWrapper,
ServiceResolver: serviceResolver,
},
)
if lastErr != nil {
return
}
pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver)
if err != nil {
lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)

View File

@ -669,7 +669,6 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
genericfeatures.StreamingProxyRedirects: {Default: true, PreRelease: featuregate.Deprecated},
genericfeatures.ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta},
genericfeatures.AdvancedAuditing: {Default: true, PreRelease: featuregate.GA},
genericfeatures.DynamicAuditing: {Default: false, PreRelease: featuregate.Alpha},
genericfeatures.APIResponseCompression: {Default: true, PreRelease: featuregate.Beta},
genericfeatures.APIListChunking: {Default: true, PreRelease: featuregate.Beta},
genericfeatures.DryRun: {Default: true, PreRelease: featuregate.GA},

View File

@ -24,7 +24,7 @@ import (
"github.com/spf13/pflag"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apiextensions-apiserver/pkg/apiserver"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@ -54,7 +54,6 @@ func NewCustomResourceDefinitionsServerOptions(out, errOut io.Writer) *CustomRes
RecommendedOptions: genericoptions.NewRecommendedOptions(
defaultEtcdPathPrefix,
apiserver.Codecs.LegacyCodec(v1beta1.SchemeGroupVersion, v1.SchemeGroupVersion),
genericoptions.NewProcessInfo("apiextensions-apiserver", "kube-system"),
),
APIEnablement: genericoptions.NewAPIEnablementOptions(),

View File

@ -59,13 +59,6 @@ const (
// audited.
AdvancedAuditing featuregate.Feature = "AdvancedAuditing"
// owner: @pbarker
// alpha: v1.13
//
// DynamicAuditing enables configuration of audit policy and webhook backends through an
// AuditSink API object.
DynamicAuditing featuregate.Feature = "DynamicAuditing"
// owner: @ilackams
// alpha: v1.7
//
@ -163,7 +156,6 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
StreamingProxyRedirects: {Default: true, PreRelease: featuregate.Deprecated},
ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta},
AdvancedAuditing: {Default: true, PreRelease: featuregate.GA},
DynamicAuditing: {Default: false, PreRelease: featuregate.Alpha},
APIResponseCompression: {Default: true, PreRelease: featuregate.Beta},
APIListChunking: {Default: true, PreRelease: featuregate.Beta},
DryRun: {Default: true, PreRelease: featuregate.GA},

View File

@ -14,7 +14,6 @@ go_library(
"doc.go",
"egress_selector.go",
"etcd.go",
"events.go",
"feature.go",
"recommended.go",
"server_run_options.go",
@ -22,13 +21,11 @@ go_library(
"serving_unix.go",
"serving_windows.go",
"serving_with_loopback.go",
"webhook.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server/options",
importpath = "k8s.io/apiserver/pkg/server/options",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
@ -72,16 +69,12 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/webhook:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/log:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/clientcmd:go_default_library",
"//staging/src/k8s.io/client-go/util/cert:go_default_library",
@ -158,17 +151,12 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/apis/audit/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/authenticatorfactory:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/discovery:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/clientcmd/api/v1:go_default_library",
"//staging/src/k8s.io/component-base/cli/flag:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",

View File

@ -27,7 +27,6 @@ import (
"gopkg.in/natefinch/lumberjack.v2"
"k8s.io/klog/v2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilnet "k8s.io/apimachinery/pkg/util/net"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
@ -36,20 +35,12 @@ import (
auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/audit/policy"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
utilfeature "k8s.io/apiserver/pkg/util/feature"
pluginbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
plugindynamic "k8s.io/apiserver/plugin/pkg/audit/dynamic"
pluginenforced "k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced"
pluginlog "k8s.io/apiserver/plugin/pkg/audit/log"
plugintruncate "k8s.io/apiserver/plugin/pkg/audit/truncate"
pluginwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
)
const (
@ -80,7 +71,6 @@ type AuditOptions struct {
// Plugin options
LogOptions AuditLogOptions
WebhookOptions AuditWebhookOptions
DynamicOptions AuditDynamicOptions
}
const (
@ -180,10 +170,6 @@ func NewAuditOptions() *AuditOptions {
TruncateOptions: NewAuditTruncateOptions(),
GroupVersionString: "audit.k8s.io/v1",
},
DynamicOptions: AuditDynamicOptions{
Enabled: false,
BatchConfig: plugindynamic.NewDefaultWebhookBatchConfig(),
},
}
}
@ -206,7 +192,6 @@ func (o *AuditOptions) Validate() []error {
var allErrors []error
allErrors = append(allErrors, o.LogOptions.Validate()...)
allErrors = append(allErrors, o.WebhookOptions.Validate()...)
allErrors = append(allErrors, o.DynamicOptions.Validate()...)
return allErrors
}
@ -286,15 +271,10 @@ func (o *AuditOptions) AddFlags(fs *pflag.FlagSet) {
o.WebhookOptions.AddFlags(fs)
o.WebhookOptions.BatchOptions.AddFlags(pluginwebhook.PluginName, fs)
o.WebhookOptions.TruncateOptions.AddFlags(pluginwebhook.PluginName, fs)
o.DynamicOptions.AddFlags(fs)
}
func (o *AuditOptions) ApplyTo(
c *server.Config,
kubeClientConfig *restclient.Config,
informers informers.SharedInformerFactory,
processInfo *ProcessInfo,
webhookOptions *WebhookOptions,
) error {
if o == nil {
return nil
@ -347,23 +327,7 @@ func (o *AuditOptions) ApplyTo(
// 4. Apply dynamic options.
var dynamicBackend audit.Backend
if o.DynamicOptions.enabled() {
// if dynamic is enabled the webhook and log backends need to be wrapped in an enforced backend with the static policy
if webhookBackend != nil {
webhookBackend = pluginenforced.NewBackend(webhookBackend, checker)
}
if logBackend != nil {
logBackend = pluginenforced.NewBackend(logBackend, checker)
}
// build dynamic backend
dynamicBackend, checker, err = o.DynamicOptions.newBackend(c.ExternalAddress, kubeClientConfig, informers, processInfo, webhookOptions)
if err != nil {
return err
}
// union dynamic and webhook backends so that truncate options can be applied to both
dynamicBackend = appendBackend(webhookBackend, dynamicBackend)
dynamicBackend = o.WebhookOptions.TruncateOptions.wrapBackend(dynamicBackend, groupVersion)
} else if webhookBackend != nil {
// if only webhook is enabled wrap it in the truncate options
dynamicBackend = o.WebhookOptions.TruncateOptions.wrapBackend(webhookBackend, groupVersion)
}
@ -610,66 +574,6 @@ func (o *AuditWebhookOptions) newUntruncatedBackend(customDial utilnet.DialFunc)
return webhook, nil
}
func (o *AuditDynamicOptions) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&o.Enabled, "audit-dynamic-configuration", o.Enabled,
"Enables dynamic audit configuration. This feature also requires the DynamicAuditing feature flag")
}
func (o *AuditDynamicOptions) enabled() bool {
return o.Enabled && utilfeature.DefaultFeatureGate.Enabled(features.DynamicAuditing)
}
func (o *AuditDynamicOptions) Validate() []error {
var allErrors []error
if o.Enabled && !utilfeature.DefaultFeatureGate.Enabled(features.DynamicAuditing) {
allErrors = append(allErrors, fmt.Errorf("--audit-dynamic-configuration set, but DynamicAuditing feature gate is not enabled"))
}
return allErrors
}
func (o *AuditDynamicOptions) newBackend(
hostname string,
kubeClientConfig *restclient.Config,
informers informers.SharedInformerFactory,
processInfo *ProcessInfo,
webhookOptions *WebhookOptions,
) (audit.Backend, policy.Checker, error) {
if err := validateProcessInfo(processInfo); err != nil {
return nil, nil, err
}
clientset, err := kubernetes.NewForConfig(kubeClientConfig)
if err != nil {
return nil, nil, err
}
if webhookOptions == nil {
webhookOptions = NewWebhookOptions()
}
checker := policy.NewDynamicChecker()
informer := informers.Auditregistration().V1alpha1().AuditSinks()
eventSink := &v1core.EventSinkImpl{Interface: clientset.CoreV1().Events(processInfo.Namespace)}
dc := &plugindynamic.Config{
Informer: informer,
BufferedConfig: o.BatchConfig,
EventConfig: plugindynamic.EventConfig{
Sink: eventSink,
Source: corev1.EventSource{
Component: processInfo.Name,
Host: hostname,
},
},
WebhookConfig: plugindynamic.WebhookConfig{
AuthInfoResolverWrapper: webhookOptions.AuthInfoResolverWrapper,
ServiceResolver: webhookOptions.ServiceResolver,
},
}
backend, err := plugindynamic.NewBackend(dc)
if err != nil {
return nil, nil, fmt.Errorf("could not create dynamic audit backend: %v", err)
}
return backend, checker, nil
}
// defaultWebhookBatchConfig returns the default BatchConfig used by the Webhook backend.
func defaultWebhookBatchConfig() pluginbuffered.BatchConfig {
return pluginbuffered.BatchConfig{

View File

@ -23,20 +23,13 @@ import (
"os"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd/api/v1"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"github.com/spf13/pflag"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
"k8s.io/apiserver/pkg/server"
v1 "k8s.io/client-go/tools/clientcmd/api/v1"
)
func TestAuditValidOptions(t *testing.T) {
@ -46,12 +39,6 @@ func TestAuditValidOptions(t *testing.T) {
policy := makeTmpPolicy(t)
defer os.Remove(policy)
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicAuditing, true)()
clientConfig := &restclient.Config{}
informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)
processInfo := &ProcessInfo{"test", "test"}
testCases := []struct {
name string
options func() *AuditOptions
@ -135,56 +122,6 @@ func TestAuditValidOptions(t *testing.T) {
return o
},
expected: "truncate<buffered<webhook>>",
}, {
name: "dynamic",
options: func() *AuditOptions {
o := NewAuditOptions()
o.DynamicOptions.Enabled = true
return o
},
expected: "dynamic[]",
}, {
name: "dynamic with truncating",
options: func() *AuditOptions {
o := NewAuditOptions()
o.DynamicOptions.Enabled = true
o.WebhookOptions.TruncateOptions.Enabled = true
return o
},
expected: "truncate<dynamic[]>",
}, {
name: "dynamic with log",
options: func() *AuditOptions {
o := NewAuditOptions()
o.DynamicOptions.Enabled = true
o.LogOptions.Path = "/audit"
o.PolicyFile = policy
return o
},
expected: "union[enforced<ignoreErrors<log>>,dynamic[]]",
}, {
name: "dynamic with truncating and webhook",
options: func() *AuditOptions {
o := NewAuditOptions()
o.DynamicOptions.Enabled = true
o.WebhookOptions.TruncateOptions.Enabled = true
o.WebhookOptions.ConfigFile = webhookConfig
o.PolicyFile = policy
return o
},
expected: "truncate<union[enforced<buffered<webhook>>,dynamic[]]>",
}, {
name: "dynamic with truncating and webhook and log",
options: func() *AuditOptions {
o := NewAuditOptions()
o.DynamicOptions.Enabled = true
o.WebhookOptions.TruncateOptions.Enabled = true
o.WebhookOptions.ConfigFile = webhookConfig
o.PolicyFile = policy
o.LogOptions.Path = "/audit"
return o
},
expected: "union[enforced<ignoreErrors<log>>,truncate<union[enforced<buffered<webhook>>,dynamic[]]>]",
},
}
for _, tc := range testCases {
@ -200,7 +137,7 @@ func TestAuditValidOptions(t *testing.T) {
assert.Empty(t, options.Validate(), "Options should be valid.")
config := &server.Config{}
require.NoError(t, options.ApplyTo(config, clientConfig, informerFactory, processInfo, nil))
require.NoError(t, options.ApplyTo(config))
if tc.expected == "" {
assert.Nil(t, config.AuditBackend)
} else {
@ -275,13 +212,6 @@ func TestAuditInvalidOptions(t *testing.T) {
o.WebhookOptions.TruncateOptions.TruncateConfig.MaxBatchSize = 1
return o
},
}, {
name: "invalid dynamic flag group",
options: func() *AuditOptions {
o := NewAuditOptions()
o.DynamicOptions.Enabled = true
return o
},
},
}
for _, tc := range testCases {

View File

@ -1,56 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options
import (
"fmt"
"os"
)
// ProcessInfo holds the apiserver process information used to send events
type ProcessInfo struct {
// Name of the api process to identify events
Name string
// Namespace of the api process to send events
Namespace string
}
// NewProcessInfo returns a new process info with the hostname concatenated to the name given
func NewProcessInfo(name, namespace string) *ProcessInfo {
// try to concat the hostname if available
host, _ := os.Hostname()
if host != "" {
name = fmt.Sprintf("%s-%s", name, host)
}
return &ProcessInfo{
Name: name,
Namespace: namespace,
}
}
// validateProcessInfo checks for a complete process info
func validateProcessInfo(p *ProcessInfo) error {
if p == nil {
return fmt.Errorf("ProcessInfo must be set")
} else if p.Name == "" {
return fmt.Errorf("ProcessInfo name must be set")
} else if p.Namespace == "" {
return fmt.Errorf("ProcessInfo namespace must be set")
}
return nil
}

View File

@ -48,14 +48,11 @@ type RecommendedOptions struct {
// admission plugin initializers to Admission.ApplyTo.
ExtraAdmissionInitializers func(c *server.RecommendedConfig) ([]admission.PluginInitializer, error)
Admission *AdmissionOptions
// ProcessInfo is used to identify events created by the server.
ProcessInfo *ProcessInfo
Webhook *WebhookOptions
// API Server Egress Selector is used to control outbound traffic from the API Server
EgressSelector *EgressSelectorOptions
}
func NewRecommendedOptions(prefix string, codec runtime.Codec, processInfo *ProcessInfo) *RecommendedOptions {
func NewRecommendedOptions(prefix string, codec runtime.Codec) *RecommendedOptions {
sso := NewSecureServingOptions()
// We are composing recommended options for an aggregated api-server,
@ -78,8 +75,6 @@ func NewRecommendedOptions(prefix string, codec runtime.Codec, processInfo *Proc
FeatureGate: feature.DefaultFeatureGate,
ExtraAdmissionInitializers: func(c *server.RecommendedConfig) ([]admission.PluginInitializer, error) { return nil, nil },
Admission: NewAdmissionOptions(),
ProcessInfo: processInfo,
Webhook: NewWebhookOptions(),
EgressSelector: NewEgressSelectorOptions(),
}
}
@ -111,7 +106,7 @@ func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error {
if err := o.Authorization.ApplyTo(&config.Config.Authorization); err != nil {
return err
}
if err := o.Audit.ApplyTo(&config.Config, config.ClientConfig, config.SharedInformerFactory, o.ProcessInfo, o.Webhook); err != nil {
if err := o.Audit.ApplyTo(&config.Config); err != nil {
return err
}
if err := o.Features.ApplyTo(&config.Config); err != nil {

View File

@ -1,34 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options
import (
utilwebhook "k8s.io/apiserver/pkg/util/webhook"
)
// WebhookOptions holds the outgoing webhook options
type WebhookOptions struct {
ServiceResolver utilwebhook.ServiceResolver
AuthInfoResolverWrapper utilwebhook.AuthenticationInfoResolverWrapper
}
// NewWebhookOptions returns the default options for outgoing webhooks
func NewWebhookOptions() *WebhookOptions {
return &WebhookOptions{
ServiceResolver: utilwebhook.NewDefaultServiceResolver(),
}
}

View File

@ -24,7 +24,6 @@ filegroup(
srcs = [
":package-srcs",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered:all-srcs",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic:all-srcs",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/fake:all-srcs",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/log:all-srcs",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/truncate:all-srcs",

View File

@ -1,76 +0,0 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"defaults.go",
"dynamic.go",
"factory.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/plugin/pkg/audit/dynamic",
importpath = "k8s.io/apiserver/plugin/pkg/audit/dynamic",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/api/auditregistration/v1alpha1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit/install:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/audit:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/audit/policy:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/audit/util:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/webhook:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library",
"//staging/src/k8s.io/client-go/informers/auditregistration/v1alpha1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"dynamic_test.go",
"factory_test.go",
],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/api/auditregistration/v1alpha1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/audit:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/webhook:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -1,46 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
"time"
bufferedplugin "k8s.io/apiserver/plugin/pkg/audit/buffered"
)
const (
// Default configuration values for ModeBatch when applied to a dynamic plugin
defaultBatchBufferSize = 5000 // Buffer up to 5000 events before starting discarding.
defaultBatchMaxSize = 400 // Only send up to 400 events at a time.
defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute.
defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS.
defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst.
)
// NewDefaultWebhookBatchConfig returns new Batch Config objects populated by default values
// for dynamic webhooks
func NewDefaultWebhookBatchConfig() *bufferedplugin.BatchConfig {
return &bufferedplugin.BatchConfig{
BufferSize: defaultBatchBufferSize,
MaxBatchSize: defaultBatchMaxSize,
MaxBatchWait: defaultBatchMaxWait,
ThrottleEnable: true,
ThrottleQPS: defaultBatchThrottleQPS,
ThrottleBurst: defaultBatchThrottleBurst,
AsyncDelegate: true,
}
}

View File

@ -1,365 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
"k8s.io/klog/v2"
auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
auditinstall "k8s.io/apiserver/pkg/apis/audit/install"
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
"k8s.io/apiserver/pkg/audit"
webhook "k8s.io/apiserver/pkg/util/webhook"
bufferedplugin "k8s.io/apiserver/plugin/pkg/audit/buffered"
auditinformer "k8s.io/client-go/informers/auditregistration/v1alpha1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
)
// PluginName is the name reported in error metrics.
const PluginName = "dynamic"
// Config holds the configuration for the dynamic backend
type Config struct {
// Informer for the audit sinks
Informer auditinformer.AuditSinkInformer
// EventConfig holds the configuration for event notifications about the AuditSink API objects
EventConfig EventConfig
// BufferedConfig is the runtime buffered configuration
BufferedConfig *bufferedplugin.BatchConfig
// WebhookConfig holds the configuration for outgoing webhooks
WebhookConfig WebhookConfig
}
// WebhookConfig holds the configurations for outgoing webhooks
type WebhookConfig struct {
// AuthInfoResolverWrapper provides the webhook authentication for in-cluster endpoints
AuthInfoResolverWrapper webhook.AuthenticationInfoResolverWrapper
// ServiceResolver knows how to convert a webhook service reference into an actual location.
ServiceResolver webhook.ServiceResolver
}
// EventConfig holds the configurations for sending event notifiations about AuditSink API objects
type EventConfig struct {
// Sink for emitting events
Sink record.EventSink
// Source holds the source information about the event emitter
Source corev1.EventSource
}
// delegate represents a delegate backend that was created from an audit sink configuration
type delegate struct {
audit.Backend
configuration *auditregv1alpha1.AuditSink
stopChan chan struct{}
}
// gracefulShutdown will gracefully shutdown the delegate
func (d *delegate) gracefulShutdown() {
close(d.stopChan)
d.Shutdown()
}
// NewBackend returns a backend that dynamically updates its configuration
// based on a shared informer.
func NewBackend(c *Config) (audit.Backend, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(c.EventConfig.Sink)
scheme := runtime.NewScheme()
err := auditregv1alpha1.AddToScheme(scheme)
if err != nil {
return nil, err
}
recorder := eventBroadcaster.NewRecorder(scheme, c.EventConfig.Source)
if c.BufferedConfig == nil {
c.BufferedConfig = NewDefaultWebhookBatchConfig()
}
cm, err := webhook.NewClientManager([]schema.GroupVersion{auditv1.SchemeGroupVersion}, func(s *runtime.Scheme) error {
auditinstall.Install(s)
return nil
})
if err != nil {
return nil, err
}
// TODO: need a way of injecting authentication before beta
authInfoResolver, err := webhook.NewDefaultAuthenticationInfoResolver("")
if err != nil {
return nil, err
}
cm.SetAuthenticationInfoResolver(authInfoResolver)
cm.SetServiceResolver(c.WebhookConfig.ServiceResolver)
cm.SetAuthenticationInfoResolverWrapper(c.WebhookConfig.AuthInfoResolverWrapper)
manager := &backend{
config: c,
delegates: atomic.Value{},
delegateUpdateMutex: sync.Mutex{},
stopped: false,
webhookClientManager: cm,
recorder: recorder,
}
manager.delegates.Store(syncedDelegates{})
c.Informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
manager.addSink(obj.(*auditregv1alpha1.AuditSink))
},
UpdateFunc: func(oldObj, newObj interface{}) {
manager.updateSink(oldObj.(*auditregv1alpha1.AuditSink), newObj.(*auditregv1alpha1.AuditSink))
},
DeleteFunc: func(obj interface{}) {
sink, ok := obj.(*auditregv1alpha1.AuditSink)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
klog.V(2).Infof("Couldn't get object from tombstone %#v", obj)
return
}
sink, ok = tombstone.Obj.(*auditregv1alpha1.AuditSink)
if !ok {
klog.V(2).Infof("Tombstone contained object that is not an AuditSink: %#v", obj)
return
}
}
manager.deleteSink(sink)
},
})
return manager, nil
}
type backend struct {
// delegateUpdateMutex holds an update lock on the delegates
delegateUpdateMutex sync.Mutex
stopped bool
config *Config
delegates atomic.Value
webhookClientManager webhook.ClientManager
recorder record.EventRecorder
}
type syncedDelegates map[types.UID]*delegate
// Names returns the names of the delegate configurations
func (s syncedDelegates) Names() []string {
names := []string{}
for _, delegate := range s {
names = append(names, delegate.configuration.Name)
}
return names
}
// ProcessEvents proccesses the given events per current delegate map
func (b *backend) ProcessEvents(events ...*auditinternal.Event) bool {
for _, d := range b.GetDelegates() {
d.ProcessEvents(events...)
}
// Returning true regardless of results, since dynamic audit backends
// can never cause apiserver request to fail.
return true
}
// Run starts a goroutine that propagates the shutdown signal,
// individual delegates are ran as they are created.
func (b *backend) Run(stopCh <-chan struct{}) error {
go func() {
<-stopCh
b.stopAllDelegates()
}()
return nil
}
// stopAllDelegates closes the stopChan for every delegate to enable
// goroutines to terminate gracefully. This is a helper method to propagate
// the primary stopChan to the current delegate map.
func (b *backend) stopAllDelegates() {
b.delegateUpdateMutex.Lock()
defer b.delegateUpdateMutex.Unlock()
if b.stopped {
return
}
b.stopped = true
for _, d := range b.GetDelegates() {
close(d.stopChan)
}
}
// Shutdown calls the shutdown method on all delegates. The stopChan should
// be closed before this is called.
func (b *backend) Shutdown() {
for _, d := range b.GetDelegates() {
d.Shutdown()
}
}
// GetDelegates retrieves current delegates in a safe manner
func (b *backend) GetDelegates() syncedDelegates {
return b.delegates.Load().(syncedDelegates)
}
// copyDelegates returns a copied delegate map
func (b *backend) copyDelegates() syncedDelegates {
c := make(syncedDelegates)
for u, s := range b.GetDelegates() {
c[u] = s
}
return c
}
// setDelegates sets the current delegates in a safe manner
func (b *backend) setDelegates(delegates syncedDelegates) {
b.delegates.Store(delegates)
}
// addSink is called by the shared informer when a sink is added
func (b *backend) addSink(sink *auditregv1alpha1.AuditSink) {
b.delegateUpdateMutex.Lock()
defer b.delegateUpdateMutex.Unlock()
if b.stopped {
msg := fmt.Sprintf("Could not add audit sink %q uid: %s. Update to all delegates is stopped.", sink.Name, sink.UID)
klog.Error(msg)
return
}
delegates := b.copyDelegates()
if _, ok := delegates[sink.UID]; ok {
klog.Errorf("Audit sink %q uid: %s already exists, could not readd", sink.Name, sink.UID)
return
}
d, err := b.createAndStartDelegate(sink)
if err != nil {
msg := fmt.Sprintf("Could not add audit sink %q: %v", sink.Name, err)
klog.Error(msg)
b.recorder.Event(sink, corev1.EventTypeWarning, "CreateFailed", msg)
return
}
delegates[sink.UID] = d
b.setDelegates(delegates)
klog.V(2).Infof("Added audit sink: %s", sink.Name)
klog.V(2).Infof("Current audit sinks: %v", delegates.Names())
}
// updateSink is called by the shared informer when a sink is updated.
// The new sink is only rebuilt on spec changes. The new sink must not have
// the same uid as the previous. The new sink will be started before the old
// one is shutdown so no events will be lost
func (b *backend) updateSink(oldSink, newSink *auditregv1alpha1.AuditSink) {
b.delegateUpdateMutex.Lock()
defer b.delegateUpdateMutex.Unlock()
if b.stopped {
msg := fmt.Sprintf("Could not update old audit sink %q to new audit sink %q. Update to all delegates is stopped.", oldSink.Name, newSink.Name)
klog.Error(msg)
return
}
delegates := b.copyDelegates()
oldDelegate, ok := delegates[oldSink.UID]
if !ok {
klog.Errorf("Could not update audit sink %q uid: %s, old sink does not exist",
oldSink.Name, oldSink.UID)
return
}
// check if spec has changed
eq := reflect.DeepEqual(oldSink.Spec, newSink.Spec)
if eq {
delete(delegates, oldSink.UID)
delegates[newSink.UID] = oldDelegate
b.setDelegates(delegates)
} else {
d, err := b.createAndStartDelegate(newSink)
if err != nil {
msg := fmt.Sprintf("Could not update audit sink %q: %v", oldSink.Name, err)
klog.Error(msg)
b.recorder.Event(newSink, corev1.EventTypeWarning, "UpdateFailed", msg)
return
}
delete(delegates, oldSink.UID)
delegates[newSink.UID] = d
b.setDelegates(delegates)
// graceful shutdown in goroutine as to not block
go oldDelegate.gracefulShutdown()
}
klog.V(2).Infof("Updated audit sink: %s", newSink.Name)
klog.V(2).Infof("Current audit sinks: %v", delegates.Names())
}
// deleteSink is called by the shared informer when a sink is deleted
func (b *backend) deleteSink(sink *auditregv1alpha1.AuditSink) {
b.delegateUpdateMutex.Lock()
defer b.delegateUpdateMutex.Unlock()
if b.stopped {
msg := fmt.Sprintf("Could not delete audit sink %q uid: %s. Update to all delegates is stopped.", sink.Name, sink.UID)
klog.Warning(msg)
return
}
delegates := b.copyDelegates()
delegate, ok := delegates[sink.UID]
if !ok {
klog.Errorf("Could not delete audit sink %q uid: %s, does not exist", sink.Name, sink.UID)
return
}
delete(delegates, sink.UID)
b.setDelegates(delegates)
// graceful shutdown in goroutine as to not block
go delegate.gracefulShutdown()
klog.V(2).Infof("Deleted audit sink: %s", sink.Name)
klog.V(2).Infof("Current audit sinks: %v", delegates.Names())
}
// createAndStartDelegate will build a delegate from an audit sink configuration and run it
func (b *backend) createAndStartDelegate(sink *auditregv1alpha1.AuditSink) (*delegate, error) {
f := factory{
config: b.config,
webhookClientManager: b.webhookClientManager,
sink: sink,
}
delegate, err := f.BuildDelegate()
if err != nil {
return nil, err
}
err = delegate.Run(delegate.stopChan)
if err != nil {
return nil, err
}
return delegate, nil
}
// String returns a string representation of the backend
func (b *backend) String() string {
var delegateStrings []string
for _, delegate := range b.GetDelegates() {
delegateStrings = append(delegateStrings, fmt.Sprintf("%s", delegate))
}
return fmt.Sprintf("%s[%s]", PluginName, strings.Join(delegateStrings, ","))
}

View File

@ -1,319 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"reflect"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
"k8s.io/apiserver/pkg/audit"
webhook "k8s.io/apiserver/pkg/util/webhook"
informers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
)
func TestDynamic(t *testing.T) {
eventList1 := &atomic.Value{}
eventList1.Store(auditinternal.EventList{})
eventList2 := &atomic.Value{}
eventList2.Store(auditinternal.EventList{})
// start test servers
server1 := httptest.NewServer(buildTestHandler(t, eventList1))
defer server1.Close()
server2 := httptest.NewServer(buildTestHandler(t, eventList2))
defer server2.Close()
testPolicy := auditregv1alpha1.Policy{
Level: auditregv1alpha1.LevelMetadata,
Stages: []auditregv1alpha1.Stage{
auditregv1alpha1.StageResponseStarted,
},
}
testEvent := auditinternal.Event{
Level: auditinternal.LevelMetadata,
Stage: auditinternal.StageResponseStarted,
Verb: "get",
RequestURI: "/test/path",
}
testConfig1 := &auditregv1alpha1.AuditSink{
ObjectMeta: metav1.ObjectMeta{
Name: "test1",
UID: types.UID("test1"),
},
Spec: auditregv1alpha1.AuditSinkSpec{
Policy: testPolicy,
Webhook: auditregv1alpha1.Webhook{
ClientConfig: auditregv1alpha1.WebhookClientConfig{
URL: &server1.URL,
},
},
},
}
testConfig2 := &auditregv1alpha1.AuditSink{
ObjectMeta: metav1.ObjectMeta{
Name: "test2",
UID: types.UID("test2"),
},
Spec: auditregv1alpha1.AuditSinkSpec{
Policy: testPolicy,
Webhook: auditregv1alpha1.Webhook{
ClientConfig: auditregv1alpha1.WebhookClientConfig{
URL: &server2.URL,
},
},
},
}
badURL := "http://badtest"
badConfig := &auditregv1alpha1.AuditSink{
ObjectMeta: metav1.ObjectMeta{
Name: "bad",
UID: types.UID("bad"),
},
Spec: auditregv1alpha1.AuditSinkSpec{
Policy: testPolicy,
Webhook: auditregv1alpha1.Webhook{
ClientConfig: auditregv1alpha1.WebhookClientConfig{
URL: &badURL,
},
},
},
}
config, stopChan := defaultTestConfig()
config.BufferedConfig.MaxBatchSize = 1
b, err := NewBackend(config)
require.NoError(t, err)
d := b.(*backend)
err = b.Run(stopChan)
require.NoError(t, err)
t.Run("find none", func(t *testing.T) {
require.Len(t, d.GetDelegates(), 0)
})
success := t.Run("find one", func(t *testing.T) {
d.addSink(testConfig1)
delegates := d.GetDelegates()
require.Len(t, delegates, 1)
require.Contains(t, delegates, types.UID("test1"))
require.Equal(t, testConfig1, delegates["test1"].configuration)
// send event and check that it arrives
b.ProcessEvents(&testEvent)
err := checkForEvent(eventList1, testEvent)
require.NoError(t, err, "unable to find events sent to sink")
})
require.True(t, success) // propagate failure
// test that a bad webhook configuration can be recovered from
success = t.Run("bad config", func(t *testing.T) {
d.addSink(badConfig)
delegates := d.GetDelegates()
require.Len(t, delegates, 2)
require.Contains(t, delegates, types.UID("bad"))
require.Equal(t, badConfig, delegates["bad"].configuration)
// send events to the buffer
b.ProcessEvents(&testEvent, &testEvent)
// event is in the buffer see if the sink can be deleted
// this will hang and fail if not handled properly
d.deleteSink(badConfig)
delegates = d.GetDelegates()
require.Len(t, delegates, 1)
require.Contains(t, delegates, types.UID("test1"))
require.Equal(t, testConfig1, delegates["test1"].configuration)
})
require.True(t, success) // propagate failure
success = t.Run("find two", func(t *testing.T) {
eventList1.Store(auditinternal.EventList{})
d.addSink(testConfig2)
delegates := d.GetDelegates()
require.Len(t, delegates, 2)
require.Contains(t, delegates, types.UID("test1"))
require.Contains(t, delegates, types.UID("test2"))
require.Equal(t, testConfig1, delegates["test1"].configuration)
require.Equal(t, testConfig2, delegates["test2"].configuration)
// send event to both delegates and check that it arrives in both places
b.ProcessEvents(&testEvent)
err := checkForEvent(eventList1, testEvent)
require.NoError(t, err, "unable to find events sent to sink 1")
err = checkForEvent(eventList2, testEvent)
require.NoError(t, err, "unable to find events sent to sink 2")
})
require.True(t, success) // propagate failure
success = t.Run("delete one", func(t *testing.T) {
eventList2.Store(auditinternal.EventList{})
d.deleteSink(testConfig1)
delegates := d.GetDelegates()
require.Len(t, delegates, 1)
require.Contains(t, delegates, types.UID("test2"))
require.Equal(t, testConfig2, delegates["test2"].configuration)
// send event and check that it arrives to remaining sink
b.ProcessEvents(&testEvent)
err := checkForEvent(eventList2, testEvent)
require.NoError(t, err, "unable to find events sent to sink")
})
require.True(t, success) // propagate failure
success = t.Run("update one", func(t *testing.T) {
eventList1.Store(auditinternal.EventList{})
oldConfig := *testConfig2
testConfig2.Spec.Webhook.ClientConfig.URL = &server1.URL
testConfig2.UID = types.UID("test2.1")
d.updateSink(&oldConfig, testConfig2)
delegates := d.GetDelegates()
require.Len(t, delegates, 1)
require.Contains(t, delegates, types.UID("test2.1"))
require.Equal(t, testConfig2, delegates["test2.1"].configuration)
// send event and check that it arrives to updated sink
b.ProcessEvents(&testEvent)
err := checkForEvent(eventList1, testEvent)
require.NoError(t, err, "unable to find events sent to sink")
})
require.True(t, success) // propagate failure
success = t.Run("update meta only", func(t *testing.T) {
eventList1.Store(auditinternal.EventList{})
oldConfig := *testConfig2
testConfig2.UID = types.UID("test2.2")
testConfig2.Labels = map[string]string{"my": "label"}
d.updateSink(&oldConfig, testConfig2)
delegates := d.GetDelegates()
require.Len(t, delegates, 1)
require.Contains(t, delegates, types.UID("test2.2"))
// send event and check that it arrives to same sink
b.ProcessEvents(&testEvent)
err := checkForEvent(eventList1, testEvent)
require.NoError(t, err, "unable to find events sent to sink")
})
require.True(t, success) // propagate failure
success = t.Run("shutdown", func(t *testing.T) {
// if the stop signal is not propagated correctly the buffers will not
// close down gracefully, and the shutdown method will hang causing
// the test will timeout.
timeoutChan := make(chan struct{})
successChan := make(chan struct{})
go func() {
time.Sleep(1 * time.Second)
timeoutChan <- struct{}{}
}()
go func() {
close(stopChan)
d.Shutdown()
successChan <- struct{}{}
}()
for {
select {
case <-timeoutChan:
t.Error("shutdown timed out")
return
case <-successChan:
return
}
}
})
require.True(t, success) // propagate failure
}
// checkForEvent will poll to check for an audit event in an atomic event list
func checkForEvent(a *atomic.Value, evSent auditinternal.Event) error {
return wait.Poll(100*time.Millisecond, 1*time.Second, func() (bool, error) {
el := a.Load().(auditinternal.EventList)
if len(el.Items) != 1 {
return false, nil
}
evFound := el.Items[0]
eq := reflect.DeepEqual(evSent, evFound)
if !eq {
return false, fmt.Errorf("event mismatch -- sent: %+v found: %+v", evSent, evFound)
}
return true, nil
})
}
// buildTestHandler returns a handler that will update the atomic value passed in
// with the event list it receives
func buildTestHandler(t *testing.T, a *atomic.Value) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
t.Fatalf("could not read request body: %v", err)
}
el := auditinternal.EventList{}
decoder := audit.Codecs.UniversalDecoder(auditv1.SchemeGroupVersion)
if err := runtime.DecodeInto(decoder, body, &el); err != nil {
t.Fatalf("failed decoding buf: %b, apiVersion: %s", body, auditv1.SchemeGroupVersion)
}
defer r.Body.Close()
a.Store(el)
w.WriteHeader(200)
})
}
// defaultTestConfig returns a Config object suitable for testing along with its
// associated stopChan
func defaultTestConfig() (*Config, chan struct{}) {
authWrapper := webhook.AuthenticationInfoResolverWrapper(
func(a webhook.AuthenticationInfoResolver) webhook.AuthenticationInfoResolver { return a },
)
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
stop := make(chan struct{})
eventSink := &v1core.EventSinkImpl{Interface: client.CoreV1().Events("")}
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
informer := informerFactory.Auditregistration().V1alpha1().AuditSinks()
return &Config{
Informer: informer,
EventConfig: EventConfig{Sink: eventSink},
BufferedConfig: NewDefaultWebhookBatchConfig(),
WebhookConfig: WebhookConfig{
AuthInfoResolverWrapper: authWrapper,
ServiceResolver: webhook.NewDefaultServiceResolver(),
},
}, stop
}

View File

@ -1,45 +0,0 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["enforced.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced",
importpath = "k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/audit:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/audit/event:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/audit/policy:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["enforced_test.go"],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/api/authentication/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/audit/policy:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/fake:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -1,93 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package enforced
import (
"fmt"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
ev "k8s.io/apiserver/pkg/audit/event"
"k8s.io/apiserver/pkg/audit/policy"
)
// PluginName is the name reported in error metrics.
const PluginName = "enforced"
// Backend filters audit events according to the policy
// trimming them as necessary to match the level
type Backend struct {
policyChecker policy.Checker
delegateBackend audit.Backend
}
// NewBackend returns an enforced audit backend that wraps delegate backend.
// Enforced backend automatically runs and shuts down the delegate backend.
func NewBackend(delegate audit.Backend, p policy.Checker) audit.Backend {
return &Backend{
policyChecker: p,
delegateBackend: delegate,
}
}
// Run the delegate backend
func (b Backend) Run(stopCh <-chan struct{}) error {
return b.delegateBackend.Run(stopCh)
}
// Shutdown the delegate backend
func (b Backend) Shutdown() {
b.delegateBackend.Shutdown()
}
// ProcessEvents enforces policy on a shallow copy of the given event
// dropping any sections that don't conform
func (b Backend) ProcessEvents(events ...*auditinternal.Event) bool {
for _, event := range events {
if event == nil {
continue
}
attr, err := ev.NewAttributes(event)
if err != nil {
audit.HandlePluginError(PluginName, err, event)
continue
}
level, stages := b.policyChecker.LevelAndStages(attr)
if level == auditinternal.LevelNone {
continue
}
// make shallow copy before modifying to satisfy interface definition
ev := *event
e, err := policy.EnforcePolicy(&ev, level, stages)
if err != nil {
audit.HandlePluginError(PluginName, err, event)
continue
}
if e == nil {
continue
}
b.delegateBackend.ProcessEvents(e)
}
// Returning true regardless of results, since dynamic audit backends
// can never cause apiserver request to fail.
return true
}
// String returns a string representation of the backend
func (b Backend) String() string {
return fmt.Sprintf("%s<%s>", PluginName, b.delegateBackend)
}

View File

@ -1,118 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package enforced
import (
"testing"
"github.com/stretchr/testify/require"
authnv1 "k8s.io/api/authentication/v1"
"k8s.io/apimachinery/pkg/runtime"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit/policy"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/authorization/authorizer"
fakeplugin "k8s.io/apiserver/plugin/pkg/audit/fake"
)
func TestEnforced(t *testing.T) {
testCases := []struct {
name string
event *auditinternal.Event
policy auditinternal.Policy
attribs authorizer.Attributes
expected []*auditinternal.Event
}{
{
name: "enforce level",
event: &auditinternal.Event{
Level: auditinternal.LevelRequestResponse,
Stage: auditinternal.StageResponseComplete,
RequestURI: "/apis/extensions/v1beta1",
RequestObject: &runtime.Unknown{Raw: []byte(`test`)},
ResponseObject: &runtime.Unknown{Raw: []byte(`test`)},
},
policy: auditinternal.Policy{
Rules: []auditinternal.PolicyRule{
{
Level: auditinternal.LevelMetadata,
},
},
},
expected: []*auditinternal.Event{
{
Level: auditinternal.LevelMetadata,
Stage: auditinternal.StageResponseComplete,
RequestURI: "/apis/extensions/v1beta1",
},
},
},
{
name: "enforce policy rule",
event: &auditinternal.Event{
Level: auditinternal.LevelRequestResponse,
Stage: auditinternal.StageResponseComplete,
RequestURI: "/apis/extensions/v1beta1",
User: authnv1.UserInfo{
Username: user.Anonymous,
},
RequestObject: &runtime.Unknown{Raw: []byte(`test`)},
ResponseObject: &runtime.Unknown{Raw: []byte(`test`)},
},
policy: auditinternal.Policy{
Rules: []auditinternal.PolicyRule{
{
Level: auditinternal.LevelNone,
Users: []string{user.Anonymous},
},
{
Level: auditinternal.LevelMetadata,
},
},
},
expected: []*auditinternal.Event{},
},
{
name: "nil event",
event: nil,
policy: auditinternal.Policy{
Rules: []auditinternal.PolicyRule{
{
Level: auditinternal.LevelMetadata,
},
},
},
expected: []*auditinternal.Event{},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ev := []*auditinternal.Event{}
fakeBackend := fakeplugin.Backend{
OnRequest: func(events []*auditinternal.Event) {
ev = events
},
}
b := NewBackend(&fakeBackend, policy.NewChecker(&tc.policy))
defer b.Shutdown()
b.ProcessEvents(tc.event)
require.Equal(t, tc.expected, ev)
})
}
}

View File

@ -1,91 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
"fmt"
"time"
auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/audit/policy"
auditutil "k8s.io/apiserver/pkg/audit/util"
"k8s.io/apiserver/pkg/util/webhook"
bufferedplugin "k8s.io/apiserver/plugin/pkg/audit/buffered"
enforcedplugin "k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced"
webhookplugin "k8s.io/apiserver/plugin/pkg/audit/webhook"
)
// TODO: find a common place for all the default retry backoffs
const retryBackoff = 500 * time.Millisecond
// factory builds a delegate from an AuditSink
type factory struct {
config *Config
webhookClientManager webhook.ClientManager
sink *auditregv1alpha1.AuditSink
}
// BuildDelegate creates a delegate from the AuditSink object
func (f *factory) BuildDelegate() (*delegate, error) {
backend, err := f.buildWebhookBackend()
if err != nil {
return nil, err
}
backend = f.applyEnforcedOpts(backend)
backend = f.applyBufferedOpts(backend)
ch := make(chan struct{})
return &delegate{
Backend: backend,
configuration: f.sink,
stopChan: ch,
}, nil
}
func (f *factory) buildWebhookBackend() (audit.Backend, error) {
hookClient := auditutil.HookClientConfigForSink(f.sink)
client, err := f.webhookClientManager.HookClient(hookClient)
if err != nil {
return nil, fmt.Errorf("could not create webhook client: %v", err)
}
backend := webhookplugin.NewDynamicBackend(client, retryBackoff)
return backend, nil
}
func (f *factory) applyEnforcedOpts(delegate audit.Backend) audit.Backend {
pol := policy.ConvertDynamicPolicyToInternal(&f.sink.Spec.Policy)
checker := policy.NewChecker(pol)
eb := enforcedplugin.NewBackend(delegate, checker)
return eb
}
func (f *factory) applyBufferedOpts(delegate audit.Backend) audit.Backend {
bc := f.config.BufferedConfig
tc := f.sink.Spec.Webhook.Throttle
if tc != nil {
bc.ThrottleEnable = true
if tc.Burst != nil {
bc.ThrottleBurst = int(*tc.Burst)
}
if tc.QPS != nil {
bc.ThrottleQPS = float32(*tc.QPS)
}
} else {
bc.ThrottleEnable = false
}
return bufferedplugin.NewBackend(delegate, *bc)
}

View File

@ -1,146 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
"testing"
"github.com/stretchr/testify/require"
auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1"
utilpointer "k8s.io/utils/pointer"
)
func TestToDelegate(t *testing.T) {
config, _ := defaultTestConfig()
defaultPolicy := auditregv1alpha1.Policy{
Level: auditregv1alpha1.LevelMetadata,
}
u := "http://localhost:4444"
for _, tc := range []struct {
name string
auditConfig *auditregv1alpha1.AuditSink
throttleConfig *auditregv1alpha1.WebhookThrottleConfig
expectedBackend string
}{
{
name: "build full",
auditConfig: &auditregv1alpha1.AuditSink{
Spec: auditregv1alpha1.AuditSinkSpec{
Policy: defaultPolicy,
Webhook: auditregv1alpha1.Webhook{
Throttle: &auditregv1alpha1.WebhookThrottleConfig{
QPS: utilpointer.Int64Ptr(10),
Burst: utilpointer.Int64Ptr(5),
},
ClientConfig: auditregv1alpha1.WebhookClientConfig{
URL: &u,
},
},
},
},
expectedBackend: "buffered<enforced<dynamic_webhook>>",
},
{
name: "build no throttle",
auditConfig: &auditregv1alpha1.AuditSink{
Spec: auditregv1alpha1.AuditSinkSpec{
Policy: defaultPolicy,
Webhook: auditregv1alpha1.Webhook{
ClientConfig: auditregv1alpha1.WebhookClientConfig{
URL: &u,
},
},
},
},
expectedBackend: "buffered<enforced<dynamic_webhook>>",
},
} {
t.Run(tc.name, func(t *testing.T) {
b, err := NewBackend(config)
require.NoError(t, err)
c := factory{
config: b.(*backend).config,
webhookClientManager: b.(*backend).webhookClientManager,
sink: tc.auditConfig,
}
d, err := c.BuildDelegate()
require.NoError(t, err)
require.Equal(t, tc.expectedBackend, d.String())
})
}
}
func TestBuildWebhookBackend(t *testing.T) {
defaultPolicy := auditregv1alpha1.Policy{
Level: auditregv1alpha1.LevelMetadata,
}
config, _ := defaultTestConfig()
b, err := NewBackend(config)
require.NoError(t, err)
d := b.(*backend)
u := "http://localhost:4444"
for _, tc := range []struct {
name string
auditConfig *auditregv1alpha1.AuditSink
shouldErr bool
expectedBackend string
}{
{
name: "build full",
auditConfig: &auditregv1alpha1.AuditSink{
Spec: auditregv1alpha1.AuditSinkSpec{
Policy: defaultPolicy,
Webhook: auditregv1alpha1.Webhook{
ClientConfig: auditregv1alpha1.WebhookClientConfig{
URL: &u,
},
},
},
},
expectedBackend: "dynamic_webhook",
shouldErr: false,
},
{
name: "fail missing url",
auditConfig: &auditregv1alpha1.AuditSink{
Spec: auditregv1alpha1.AuditSinkSpec{
Policy: defaultPolicy,
Webhook: auditregv1alpha1.Webhook{
ClientConfig: auditregv1alpha1.WebhookClientConfig{},
},
},
},
shouldErr: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
c := &factory{
config: config,
webhookClientManager: d.webhookClientManager,
sink: tc.auditConfig,
}
ab, err := c.buildWebhookBackend()
if tc.shouldErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.expectedBackend, ab.String())
})
}
}

View File

@ -89,7 +89,6 @@ func NewDefaultOptions(out, err io.Writer) *AggregatorOptions {
RecommendedOptions: genericoptions.NewRecommendedOptions(
defaultEtcdPathPrefix,
aggregatorscheme.Codecs.LegacyCodec(v1beta1.SchemeGroupVersion),
genericoptions.NewProcessInfo("kube-aggregator", "kube-system"),
),
APIEnablement: genericoptions.NewAPIEnablementOptions(),

View File

@ -58,7 +58,6 @@ func NewWardleServerOptions(out, errOut io.Writer) *WardleServerOptions {
RecommendedOptions: genericoptions.NewRecommendedOptions(
defaultEtcdPathPrefix,
apiserver.Codecs.LegacyCodec(v1alpha1.SchemeGroupVersion),
genericoptions.NewProcessInfo("wardle-apiserver", "wardle"),
),
StdOut: out,

View File

@ -4,7 +4,6 @@ go_library(
name = "go_default_library",
srcs = [
"audit.go",
"audit_dynamic.go",
"certificates.go",
"framework.go",
"metadata_concealment.go",
@ -21,7 +20,6 @@ go_library(
"//pkg/security/podsecuritypolicy/util:go_default_library",
"//plugin/pkg/admission/serviceaccount:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",
"//staging/src/k8s.io/api/auditregistration/v1alpha1:go_default_library",
"//staging/src/k8s.io/api/authentication/v1:go_default_library",
"//staging/src/k8s.io/api/batch/v1:go_default_library",
"//staging/src/k8s.io/api/certificates/v1beta1:go_default_library",
@ -35,7 +33,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",

View File

@ -1,382 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package auth
import (
"context"
"fmt"
"strings"
"time"
"github.com/onsi/ginkgo"
auditregistrationv1alpha1 "k8s.io/api/auditregistration/v1alpha1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/test/e2e/framework"
e2eauth "k8s.io/kubernetes/test/e2e/framework/auth"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
"k8s.io/kubernetes/test/utils"
imageutils "k8s.io/kubernetes/test/utils/image"
)
var _ = SIGDescribe("[Feature:DynamicAudit]", func() {
f := framework.NewDefaultFramework("audit")
ginkgo.It("should dynamically audit API calls", func() {
namespace := f.Namespace.Name
ginkgo.By("Creating a kubernetes client that impersonates an unauthorized anonymous user")
config, err := framework.LoadConfig()
framework.ExpectNoError(err, "failed to fetch config")
config.Impersonate = restclient.ImpersonationConfig{
UserName: "system:anonymous",
Groups: []string{"system:unauthenticated"},
}
anonymousClient, err := clientset.NewForConfig(config)
framework.ExpectNoError(err, "failed to create the anonymous client")
_, err = f.ClientSet.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "audit",
},
}, metav1.CreateOptions{})
framework.ExpectNoError(err, "failed to create namespace")
_, err = f.ClientSet.CoreV1().Pods(namespace).Create(context.TODO(), &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "audit-proxy",
Labels: map[string]string{
"app": "audit",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "proxy",
Image: imageutils.GetE2EImage(imageutils.Agnhost),
Args: []string{"audit-proxy"},
Ports: []v1.ContainerPort{
{
ContainerPort: 8080,
},
},
},
},
},
}, metav1.CreateOptions{})
framework.ExpectNoError(err, "failed to create proxy pod")
_, err = f.ClientSet.CoreV1().Services(namespace).Create(context.TODO(), &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "audit",
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{
{
Port: 80,
TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
},
},
Selector: map[string]string{
"app": "audit",
},
},
}, metav1.CreateOptions{})
framework.ExpectNoError(err, "failed to create proxy service")
var podIP string
// get pod ip
err = wait.Poll(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
p, err := f.ClientSet.CoreV1().Pods(namespace).Get(context.TODO(), "audit-proxy", metav1.GetOptions{})
if apierrors.IsNotFound(err) {
framework.Logf("waiting for audit-proxy pod to be present")
return false, nil
} else if err != nil {
return false, err
}
podIP = p.Status.PodIP
if podIP == "" {
framework.Logf("waiting for audit-proxy pod IP to be ready")
return false, nil
}
return true, nil
})
framework.ExpectNoError(err, "timed out waiting for audit-proxy pod to be ready")
podURL := fmt.Sprintf("http://%s:8080", podIP)
// create audit sink
sink := auditregistrationv1alpha1.AuditSink{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: auditregistrationv1alpha1.AuditSinkSpec{
Policy: auditregistrationv1alpha1.Policy{
Level: auditregistrationv1alpha1.LevelRequestResponse,
Stages: []auditregistrationv1alpha1.Stage{
auditregistrationv1alpha1.StageRequestReceived,
auditregistrationv1alpha1.StageResponseStarted,
auditregistrationv1alpha1.StageResponseComplete,
auditregistrationv1alpha1.StagePanic,
},
},
Webhook: auditregistrationv1alpha1.Webhook{
ClientConfig: auditregistrationv1alpha1.WebhookClientConfig{
URL: &podURL,
},
},
},
}
_, err = f.ClientSet.AuditregistrationV1alpha1().AuditSinks().Create(context.TODO(), &sink, metav1.CreateOptions{})
framework.ExpectNoError(err, "failed to create audit sink")
framework.Logf("created audit sink")
// check that we are receiving logs in the proxy
err = wait.Poll(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
logs, err := e2epod.GetPodLogs(f.ClientSet, namespace, "audit-proxy", "proxy")
if err != nil {
framework.Logf("waiting for audit-proxy pod logs to be available")
return false, nil
}
if logs == "" {
framework.Logf("waiting for audit-proxy pod logs to be non-empty")
return false, nil
}
return true, nil
})
framework.ExpectNoError(err, "failed to get logs from audit-proxy pod")
auditTestUser = "kubernetes-admin"
testCases := []struct {
action func()
events []utils.AuditEvent
}{
// Create, get, update, patch, delete, list, watch pods.
// TODO(@pbarker): dedupe this with the main audit test once policy functionality is available
// https://github.com/kubernetes/kubernetes/issues/70818
{
func() {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "audit-pod",
},
Spec: v1.PodSpec{
Containers: []v1.Container{{
Name: "pause",
Image: imageutils.GetPauseImageName(),
}},
},
}
updatePod := func(pod *v1.Pod) {}
f.PodClient().CreateSync(pod)
_, err := f.PodClient().Get(context.TODO(), pod.Name, metav1.GetOptions{})
framework.ExpectNoError(err, "failed to get audit-pod")
podChan, err := f.PodClient().Watch(context.TODO(), watchOptions)
framework.ExpectNoError(err, "failed to create watch for pods")
for range podChan.ResultChan() {
}
f.PodClient().Update(pod.Name, updatePod)
_, err = f.PodClient().List(context.TODO(), metav1.ListOptions{})
framework.ExpectNoError(err, "failed to list pods")
_, err = f.PodClient().Patch(context.TODO(), pod.Name, types.JSONPatchType, patch, metav1.PatchOptions{})
framework.ExpectNoError(err, "failed to patch pod")
f.PodClient().DeleteSync(pod.Name, metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout)
},
[]utils.AuditEvent{
{
Level: auditinternal.LevelRequestResponse,
Stage: auditinternal.StageResponseComplete,
RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/pods", namespace),
Verb: "create",
Code: 201,
User: auditTestUser,
Resource: "pods",
Namespace: namespace,
RequestObject: true,
ResponseObject: true,
AuthorizeDecision: "allow",
}, {
Level: auditinternal.LevelRequestResponse,
Stage: auditinternal.StageResponseComplete,
RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/pods/audit-pod", namespace),
Verb: "get",
Code: 200,
User: auditTestUser,
Resource: "pods",
Namespace: namespace,
RequestObject: false,
ResponseObject: true,
AuthorizeDecision: "allow",
}, {
Level: auditinternal.LevelRequestResponse,
Stage: auditinternal.StageResponseComplete,
RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/pods", namespace),
Verb: "list",
Code: 200,
User: auditTestUser,
Resource: "pods",
Namespace: namespace,
RequestObject: false,
ResponseObject: true,
AuthorizeDecision: "allow",
}, {
Level: auditinternal.LevelRequestResponse,
Stage: auditinternal.StageResponseStarted,
RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/pods?timeout=%ds&timeoutSeconds=%d&watch=true", namespace, watchTestTimeout, watchTestTimeout),
Verb: "watch",
Code: 200,
User: auditTestUser,
Resource: "pods",
Namespace: namespace,
RequestObject: false,
ResponseObject: false,
AuthorizeDecision: "allow",
}, {
Level: auditinternal.LevelRequestResponse,
Stage: auditinternal.StageResponseComplete,
RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/pods?timeout=%ds&timeoutSeconds=%d&watch=true", namespace, watchTestTimeout, watchTestTimeout),
Verb: "watch",
Code: 200,
User: auditTestUser,
Resource: "pods",
Namespace: namespace,
RequestObject: false,
ResponseObject: false,
AuthorizeDecision: "allow",
}, {
Level: auditinternal.LevelRequestResponse,
Stage: auditinternal.StageResponseComplete,
RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/pods/audit-pod", namespace),
Verb: "update",
Code: 200,
User: auditTestUser,
Resource: "pods",
Namespace: namespace,
RequestObject: true,
ResponseObject: true,
AuthorizeDecision: "allow",
}, {
Level: auditinternal.LevelRequestResponse,
Stage: auditinternal.StageResponseComplete,
RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/pods/audit-pod", namespace),
Verb: "patch",
Code: 200,
User: auditTestUser,
Resource: "pods",
Namespace: namespace,
RequestObject: true,
ResponseObject: true,
AuthorizeDecision: "allow",
}, {
Level: auditinternal.LevelRequestResponse,
Stage: auditinternal.StageResponseComplete,
RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/pods/audit-pod", namespace),
Verb: "delete",
Code: 200,
User: auditTestUser,
Resource: "pods",
Namespace: namespace,
RequestObject: true,
ResponseObject: true,
AuthorizeDecision: "allow",
},
},
},
}
// test authorizer annotations, RBAC is required.
annotationTestCases := []struct {
action func()
events []utils.AuditEvent
}{
// get a pod with unauthorized user
{
func() {
_, err := anonymousClient.CoreV1().Pods(namespace).Get(context.TODO(), "another-audit-pod", metav1.GetOptions{})
expectForbidden(err)
},
[]utils.AuditEvent{
{
Level: auditinternal.LevelRequestResponse,
Stage: auditinternal.StageResponseComplete,
RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/pods/another-audit-pod", namespace),
Verb: "get",
Code: 403,
User: auditTestUser,
ImpersonatedUser: "system:anonymous",
ImpersonatedGroups: "system:unauthenticated",
Resource: "pods",
Namespace: namespace,
RequestObject: false,
ResponseObject: true,
AuthorizeDecision: "forbid",
},
},
},
}
if e2eauth.IsRBACEnabled(f.ClientSet.RbacV1()) {
testCases = append(testCases, annotationTestCases...)
}
expectedEvents := []utils.AuditEvent{}
for _, t := range testCases {
t.action()
expectedEvents = append(expectedEvents, t.events...)
}
// The default flush timeout is 30 seconds, therefore it should be enough to retry once
// to find all expected events. However, we're waiting for 5 minutes to avoid flakes.
pollingInterval := 30 * time.Second
pollingTimeout := 5 * time.Minute
err = wait.Poll(pollingInterval, pollingTimeout, func() (bool, error) {
// Fetch the logs
logs, err := e2epod.GetPodLogs(f.ClientSet, namespace, "audit-proxy", "proxy")
if err != nil {
return false, err
}
reader := strings.NewReader(logs)
missingReport, err := utils.CheckAuditLines(reader, expectedEvents, auditv1.SchemeGroupVersion)
if err != nil {
framework.Logf("Failed to observe audit events: %v", err)
} else if len(missingReport.MissingEvents) > 0 {
framework.Logf(missingReport.String())
}
return len(missingReport.MissingEvents) == 0, nil
})
framework.ExpectNoError(err, "after %v failed to observe audit events", pollingTimeout)
err = f.ClientSet.AuditregistrationV1alpha1().AuditSinks().Delete(context.TODO(), "test", metav1.DeleteOptions{})
framework.ExpectNoError(err, "could not delete audit configuration")
})
})

View File

@ -10,7 +10,6 @@ go_test(
name = "go_default_test",
size = "large",
srcs = [
"audit_dynamic_test.go",
"audit_test.go",
"crd_test.go",
"graceful_shutdown_test.go",
@ -24,7 +23,6 @@ go_test(
rundir = ".",
tags = ["integration"],
deps = [
"//cmd/kube-apiserver/app/options:go_default_library",
"//cmd/kube-apiserver/app/testing:go_default_library",
"//pkg/master:go_default_library",
"//staging/src/k8s.io/api/admission/v1beta1:go_default_library",
@ -50,17 +48,14 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizerfactory:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/aes:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/tokentest:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
"//test/integration:go_default_library",
"//test/integration/etcd:go_default_library",
@ -68,7 +63,6 @@ go_test(
"//test/utils:go_default_library",
"//vendor/github.com/evanphx/json-patch:go_default_library",
"//vendor/github.com/go-openapi/spec:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
"//vendor/sigs.k8s.io/yaml:go_default_library",
] + select({

View File

@ -1,282 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package master
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/utils"
)
// TestDynamicAudit ensures that v1alpha of the auditregistration api works
func TestDynamicAudit(t *testing.T) {
// start api server
stopCh := make(chan struct{})
defer close(stopCh)
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicAuditing, true)()
kubeclient, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.Audit.DynamicOptions.Enabled = true
// set max batch size so the buffers flush immediately
opts.Audit.DynamicOptions.BatchConfig.MaxBatchSize = 1
opts.APIEnablement.RuntimeConfig.Set("auditregistration.k8s.io/v1alpha1=true")
},
})
// create test sinks
testServer1 := utils.NewAuditTestServer(t, "test1")
defer testServer1.Close()
testServer2 := utils.NewAuditTestServer(t, "test2")
defer testServer2.Close()
// check that servers are healthy
require.NoError(t, testServer1.Health(), "server1 never became healthy")
require.NoError(t, testServer2.Health(), "server2 never became healthy")
// build AuditSink configurations
sinkConfig1 := testServer1.BuildSinkConfiguration()
sinkConfig2 := testServer2.BuildSinkConfiguration()
// test creates a single audit sink, generates audit events, and ensures they arrive at the server
success := t.Run("one sink", func(t *testing.T) {
_, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Create(context.TODO(), sinkConfig1, metav1.CreateOptions{})
require.NoError(t, err, "failed to create audit sink1")
t.Log("created audit sink1")
// verify sink is ready
sinkHealth(t, kubeclient, testServer1)
// perform configmap ops
configMapOperations(t, kubeclient)
// check for corresponding events
missing, err := testServer1.WaitForEvents(expectedEvents)
require.NoError(t, err, "failed to match all expected events for server1, events %#v not found", missing)
})
require.True(t, success) // propagate failure
// test creates a second audit sink, generates audit events, and ensures events arrive in both servers
success = t.Run("two sink", func(t *testing.T) {
_, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Create(context.TODO(), sinkConfig2, metav1.CreateOptions{})
require.NoError(t, err, "failed to create audit sink2")
t.Log("created audit sink2")
// verify both sinks are ready
sinkHealth(t, kubeclient, testServer1, testServer2)
// perform configmap ops
configMapOperations(t, kubeclient)
// check for corresponding events in both sinks
missing, err := testServer1.WaitForEvents(expectedEvents)
require.NoError(t, err, "failed to match all expected events for server1, events %#v not found", missing)
missing, err = testServer2.WaitForEvents(expectedEvents)
require.NoError(t, err, "failed to match all expected events for server2, events %#v not found", missing)
})
require.True(t, success) // propagate failure
// test deletes an audit sink, generates audit events, and ensures they don't arrive in the corresponding server
success = t.Run("delete sink", func(t *testing.T) {
err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Delete(context.TODO(), sinkConfig2.Name, metav1.DeleteOptions{})
require.NoError(t, err, "failed to delete audit sink2")
t.Log("deleted audit sink2")
var finalErr error
err = wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
// reset event lists
testServer1.ResetEventList()
testServer2.ResetEventList()
// perform configmap ops
configMapOperations(t, kubeclient)
// check for corresponding events in server1
missing, err := testServer1.WaitForEvents(expectedEvents)
if err != nil {
finalErr = fmt.Errorf("%v: failed to match all expected events for server1, events %#v not found", err, missing)
return false, nil
}
// check that server2 is empty
if len(testServer2.GetEventList().Items) != 0 {
finalErr = fmt.Errorf("server2 event list should be empty")
return false, nil
}
return true, nil
})
require.NoError(t, err, finalErr)
})
require.True(t, success) // propagate failure
// This test will run a background process that generates audit events sending them to a sink.
// Whilst that generation is occurring, the sink is updated to point to a different server.
// The test checks that no events are lost or duplicated during the update.
t.Run("update sink", func(t *testing.T) {
// fetch sink1 config
sink1, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Get(context.TODO(), sinkConfig1.Name, metav1.GetOptions{})
require.NoError(t, err)
// reset event lists
testServer1.ResetEventList()
testServer2.ResetEventList()
// run operations in background
stopChan := make(chan struct{})
expectedEvents := &atomic.Value{}
expectedEvents.Store([]utils.AuditEvent{})
wg := &sync.WaitGroup{}
wg.Add(1)
go asyncOps(stopChan, wg, kubeclient, expectedEvents)
// check to see that at least 20 events have arrived in server1
err = testServer1.WaitForNumEvents(20)
require.NoError(t, err, "failed to find enough events in server1")
// check that no events are in server 2 yet
require.Len(t, testServer2.GetEventList().Items, 0, "server2 should not have events yet")
// update the url
sink1.Spec.Webhook.ClientConfig.URL = &testServer2.Server.URL
_, err = kubeclient.AuditregistrationV1alpha1().AuditSinks().Update(context.TODO(), sink1, metav1.UpdateOptions{})
require.NoError(t, err, "failed to update audit sink1")
t.Log("updated audit sink1 to point to server2")
// check that at least 20 events have arrived in server2
err = testServer2.WaitForNumEvents(20)
require.NoError(t, err, "failed to find enough events in server2")
// stop the operations and ensure they have finished
close(stopChan)
wg.Wait()
// check that the final events have arrived
expected := expectedEvents.Load().([]utils.AuditEvent)
missing, err := testServer2.WaitForEvents(expected[len(expected)-4:])
require.NoError(t, err, "failed to find the final events in server2, events %#v not found", missing)
// combine the event lists
el1 := testServer1.GetEventList()
el2 := testServer2.GetEventList()
combinedList := auditinternal.EventList{}
combinedList.Items = append(el1.Items, el2.Items...)
// check that there are no duplicate events
dups, err := utils.CheckForDuplicates(combinedList)
require.NoError(t, err, "duplicate events found: %#v", dups)
// check that no events are missing
missing, err = utils.CheckAuditList(combinedList, expected)
require.NoError(t, err, "failed to match all expected events: %#v not found", missing)
})
}
// sinkHealth checks if sinks are running by verifying that uniquely identified events are found
// in the given servers
func sinkHealth(t *testing.T, kubeclient kubernetes.Interface, servers ...*utils.AuditTestServer) {
var missing []utils.AuditEvent
i := 0
var finalErr error
err := wait.PollImmediate(50*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
i++
name := fmt.Sprintf("health-%d-%d", i, time.Now().UnixNano())
expected, err := simpleOp(name, kubeclient)
require.NoError(t, err, "could not perform config map operations")
// check that all given servers have received events
for _, server := range servers {
missing, err = server.WaitForEvents(expected)
if err != nil {
finalErr = fmt.Errorf("not all events found in %s health check: missing %#v", server.Name, missing)
return false, nil
}
server.ResetEventList()
}
return true, nil
})
require.NoError(t, err, finalErr)
}
// simpleOp is a function that simply tries to get a configmap with the given name and returns the
// corresponding expected audit event
func simpleOp(name string, kubeclient kubernetes.Interface) ([]utils.AuditEvent, error) {
_, err := kubeclient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return nil, err
}
expectedEvents := []utils.AuditEvent{
{
Level: auditinternal.LevelRequestResponse,
Stage: auditinternal.StageResponseComplete,
RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/configmaps/%s", namespace, name),
Verb: "get",
Code: 404,
User: auditTestUser,
Resource: "configmaps",
Namespace: namespace,
RequestObject: false,
ResponseObject: true,
AuthorizeDecision: "allow",
},
}
return expectedEvents, nil
}
// asyncOps runs the simpleOp function until the stopChan is closed updating
// the expected atomic events list
func asyncOps(
stopChan <-chan struct{},
wg *sync.WaitGroup,
kubeclient kubernetes.Interface,
expected *atomic.Value,
) {
for i := 0; ; i++ {
select {
case <-stopChan:
wg.Done()
return
default:
name := fmt.Sprintf("health-%d-%d", i, time.Now().UnixNano())
exp, err := simpleOp(name, kubeclient)
if err != nil {
// retry on errors
continue
}
e := expected.Load().([]utils.AuditEvent)
evList := append(e, exp...)
expected.Store(evList)
}
}
}