From d995047366153d86f0061b829ee4e7657f17996b Mon Sep 17 00:00:00 2001 From: Patrick Barker Date: Tue, 16 Oct 2018 16:17:33 -0600 Subject: [PATCH] adds dynamic audit integration test --- .../app/options/options_test.go | 4 + .../apiserver/pkg/server/options/audit.go | 10 +- test/integration/examples/webhook_test.go | 3 +- .../test_server.go} | 9 +- test/integration/master/audit_dynamic_test.go | 282 ++++++++++++++++++ test/integration/master/audit_test.go | 171 ++++++----- test/utils/audit.go | 95 ++++-- test/utils/audit_dynamic.go | 193 ++++++++++++ 8 files changed, 662 insertions(+), 105 deletions(-) rename test/integration/{examples/setup_test.go => framework/test_server.go} (96%) create mode 100644 test/integration/master/audit_dynamic_test.go create mode 100644 test/utils/audit_dynamic.go diff --git a/cmd/kube-apiserver/app/options/options_test.go b/cmd/kube-apiserver/app/options/options_test.go index 39768558d39..e4173130a3b 100644 --- a/cmd/kube-apiserver/app/options/options_test.go +++ b/cmd/kube-apiserver/app/options/options_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/apiserver/pkg/storage/storagebackend" utilflag "k8s.io/apiserver/pkg/util/flag" auditbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered" + auditdynamic "k8s.io/apiserver/plugin/pkg/audit/dynamic" audittruncate "k8s.io/apiserver/plugin/pkg/audit/truncate" restclient "k8s.io/client-go/rest" "k8s.io/kubernetes/pkg/api/legacyscheme" @@ -241,6 +242,9 @@ func TestAddFlags(t *testing.T) { InitialBackoff: 2 * time.Second, GroupVersionString: "audit.k8s.io/v1alpha1", }, + DynamicOptions: apiserveroptions.AuditDynamicOptions{ + BatchConfig: auditdynamic.NewDefaultWebhookBatchConfig(), + }, PolicyFile: "/policy", }, Features: &apiserveroptions.FeatureOptions{ diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/audit.go b/staging/src/k8s.io/apiserver/pkg/server/options/audit.go index 402a703ac61..98b3fc615c2 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/audit.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/audit.go @@ -148,9 +148,14 @@ type AuditWebhookOptions struct { GroupVersionString string } +// AuditDynamicOptions control the configuration of dynamic backends for audit events type AuditDynamicOptions struct { // Enabled tells whether the dynamic audit capability is enabled. Enabled bool + + // Configuration for batching backend. This is currently only used as an override + // for integration tests + BatchConfig *pluginbuffered.BatchConfig } func NewAuditOptions() *AuditOptions { @@ -174,7 +179,8 @@ func NewAuditOptions() *AuditOptions { GroupVersionString: "audit.k8s.io/v1", }, DynamicOptions: AuditDynamicOptions{ - Enabled: false, + Enabled: false, + BatchConfig: plugindynamic.NewDefaultWebhookBatchConfig(), }, } } @@ -634,7 +640,7 @@ func (o *AuditDynamicOptions) newBackend( dc := &plugindynamic.Config{ Informer: informer, - BufferedConfig: plugindynamic.NewDefaultWebhookBatchConfig(), + BufferedConfig: o.BatchConfig, EventConfig: plugindynamic.EventConfig{ Sink: eventSink, Source: corev1.EventSource{ diff --git a/test/integration/examples/webhook_test.go b/test/integration/examples/webhook_test.go index 4d538043078..611925a5f96 100644 --- a/test/integration/examples/webhook_test.go +++ b/test/integration/examples/webhook_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/kubernetes/cmd/kube-apiserver/app/options" "k8s.io/kubernetes/pkg/master" "k8s.io/kubernetes/pkg/master/reconcilers" + "k8s.io/kubernetes/test/integration/framework" ) func TestWebhookLoopback(t *testing.T) { @@ -40,7 +41,7 @@ func TestWebhookLoopback(t *testing.T) { called := int32(0) - client, _ := startTestServer(t, stopCh, TestServerSetup{ + client, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{ ModifyServerRunOptions: func(opts *options.ServerRunOptions) { }, ModifyServerConfig: func(config *master.Config) { diff --git a/test/integration/examples/setup_test.go b/test/integration/framework/test_server.go similarity index 96% rename from test/integration/examples/setup_test.go rename to test/integration/framework/test_server.go index 3c9f2becb79..c9f69fc7606 100644 --- a/test/integration/examples/setup_test.go +++ b/test/integration/framework/test_server.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package apiserver +package framework import ( "io/ioutil" @@ -36,7 +36,6 @@ import ( "k8s.io/kubernetes/cmd/kube-apiserver/app" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" "k8s.io/kubernetes/pkg/master" - "k8s.io/kubernetes/test/integration/framework" ) type TestServerSetup struct { @@ -44,8 +43,8 @@ type TestServerSetup struct { ModifyServerConfig func(*master.Config) } -// startTestServer runs a kube-apiserver, optionally calling out to the setup.ModifyServerRunOptions and setup.ModifyServerConfig functions -func startTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup) (client.Interface, *rest.Config) { +// StartTestServer runs a kube-apiserver, optionally calling out to the setup.ModifyServerRunOptions and setup.ModifyServerConfig functions +func StartTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup) (client.Interface, *rest.Config) { certDir, _ := ioutil.TempDir("", "test-integration-"+t.Name()) go func() { <-stopCh @@ -89,7 +88,7 @@ func startTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir kubeAPIServerOptions.InsecureServing.BindPort = 0 kubeAPIServerOptions.Etcd.StorageConfig.Prefix = path.Join("/", uuid.New(), "registry") - kubeAPIServerOptions.Etcd.StorageConfig.Transport.ServerList = []string{framework.GetEtcdURL()} + kubeAPIServerOptions.Etcd.StorageConfig.Transport.ServerList = []string{GetEtcdURL()} kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange kubeAPIServerOptions.Authentication.RequestHeader.UsernameHeaders = []string{"X-Remote-User"} kubeAPIServerOptions.Authentication.RequestHeader.GroupHeaders = []string{"X-Remote-Group"} diff --git a/test/integration/master/audit_dynamic_test.go b/test/integration/master/audit_dynamic_test.go new file mode 100644 index 00000000000..a0cc6c4d0cd --- /dev/null +++ b/test/integration/master/audit_dynamic_test.go @@ -0,0 +1,282 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package master + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + auditinternal "k8s.io/apiserver/pkg/apis/audit" + "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" + utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing" + "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/cmd/kube-apiserver/app/options" + "k8s.io/kubernetes/test/integration/framework" + "k8s.io/kubernetes/test/utils" +) + +// TestDynamicAudit ensures that v1alpha of the auditregistration api works +func TestDynamicAudit(t *testing.T) { + // start api server + stopCh := make(chan struct{}) + defer close(stopCh) + + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicAuditing, true)() + kubeclient, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + opts.Audit.DynamicOptions.Enabled = true + // set max batch size so the buffers flush immediately + opts.Audit.DynamicOptions.BatchConfig.MaxBatchSize = 1 + opts.APIEnablement.RuntimeConfig.Set("auditregistration.k8s.io/v1alpha1=true") + }, + }) + + // create test sinks + testServer1 := utils.NewAuditTestServer(t, "test1") + defer testServer1.Close() + testServer2 := utils.NewAuditTestServer(t, "test2") + defer testServer2.Close() + + // check that servers are healthy + require.NoError(t, testServer1.Health(), "server1 never became healthy") + require.NoError(t, testServer2.Health(), "server2 never became healthy") + + // build AuditSink configurations + sinkConfig1 := testServer1.BuildSinkConfiguration() + sinkConfig2 := testServer2.BuildSinkConfiguration() + + // test creates a single audit sink, generates audit events, and ensures they arrive at the server + success := t.Run("one sink", func(t *testing.T) { + _, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Create(sinkConfig1) + require.NoError(t, err, "failed to create audit sink1") + t.Log("created audit sink1") + + // verify sink is ready + sinkHealth(t, kubeclient, testServer1) + + // perform configmap ops + configMapOperations(t, kubeclient) + + // check for corresponding events + missing, err := testServer1.WaitForEvents(expectedEvents) + require.NoError(t, err, "failed to match all expected events for server1, events %#v not found", missing) + }) + require.True(t, success) // propagate failure + + // test creates a second audit sink, generates audit events, and ensures events arrive in both servers + success = t.Run("two sink", func(t *testing.T) { + _, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Create(sinkConfig2) + require.NoError(t, err, "failed to create audit sink2") + t.Log("created audit sink2") + + // verify both sinks are ready + sinkHealth(t, kubeclient, testServer1, testServer2) + + // perform configmap ops + configMapOperations(t, kubeclient) + + // check for corresponding events in both sinks + missing, err := testServer1.WaitForEvents(expectedEvents) + require.NoError(t, err, "failed to match all expected events for server1, events %#v not found", missing) + missing, err = testServer2.WaitForEvents(expectedEvents) + require.NoError(t, err, "failed to match all expected events for server2, events %#v not found", missing) + }) + require.True(t, success) // propagate failure + + // test deletes an audit sink, generates audit events, and ensures they don't arrive in the corresponding server + success = t.Run("delete sink", func(t *testing.T) { + err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Delete(sinkConfig2.Name, &metav1.DeleteOptions{}) + require.NoError(t, err, "failed to delete audit sink2") + t.Log("deleted audit sink2") + + var finalErr error + err = wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + // reset event lists + testServer1.ResetEventList() + testServer2.ResetEventList() + + // perform configmap ops + configMapOperations(t, kubeclient) + + // check for corresponding events in server1 + missing, err := testServer1.WaitForEvents(expectedEvents) + if err != nil { + finalErr = fmt.Errorf("%v: failed to match all expected events for server1, events %#v not found", err, missing) + return false, nil + } + + // check that server2 is empty + if len(testServer2.GetEventList().Items) != 0 { + finalErr = fmt.Errorf("server2 event list should be empty") + return false, nil + } + return true, nil + }) + require.NoError(t, err, finalErr) + }) + require.True(t, success) // propagate failure + + // This test will run a background process that generates audit events sending them to a sink. + // Whilst that generation is occurring, the sink is updated to point to a different server. + // The test checks that no events are lost or duplicated during the update. + t.Run("update sink", func(t *testing.T) { + // fetch sink1 config + sink1, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Get(sinkConfig1.Name, metav1.GetOptions{}) + require.NoError(t, err) + + // reset event lists + testServer1.ResetEventList() + testServer2.ResetEventList() + + // run operations in background + stopChan := make(chan struct{}) + expectedEvents := &atomic.Value{} + expectedEvents.Store([]utils.AuditEvent{}) + wg := &sync.WaitGroup{} + wg.Add(1) + go asyncOps(stopChan, wg, kubeclient, expectedEvents) + + // check to see that at least 20 events have arrived in server1 + err = testServer1.WaitForNumEvents(20) + require.NoError(t, err, "failed to find enough events in server1") + + // check that no events are in server 2 yet + require.Len(t, testServer2.GetEventList().Items, 0, "server2 should not have events yet") + + // update the url + sink1.Spec.Webhook.ClientConfig.URL = &testServer2.Server.URL + _, err = kubeclient.AuditregistrationV1alpha1().AuditSinks().Update(sink1) + require.NoError(t, err, "failed to update audit sink1") + t.Log("updated audit sink1 to point to server2") + + // check that at least 20 events have arrived in server2 + err = testServer2.WaitForNumEvents(20) + require.NoError(t, err, "failed to find enough events in server2") + + // stop the operations and ensure they have finished + close(stopChan) + wg.Wait() + + // check that the final events have arrived + expected := expectedEvents.Load().([]utils.AuditEvent) + missing, err := testServer2.WaitForEvents(expected[len(expected)-4:]) + require.NoError(t, err, "failed to find the final events in server2, events %#v not found", missing) + + // combine the event lists + el1 := testServer1.GetEventList() + el2 := testServer2.GetEventList() + combinedList := auditinternal.EventList{} + combinedList.Items = append(el1.Items, el2.Items...) + + // check that there are no duplicate events + dups, err := utils.CheckForDuplicates(combinedList) + require.NoError(t, err, "duplicate events found: %#v", dups) + + // check that no events are missing + missing, err = utils.CheckAuditList(combinedList, expected) + require.NoError(t, err, "failed to match all expected events: %#v not found", missing) + }) +} + +// sinkHealth checks if sinks are running by verifying that uniquely identified events are found +// in the given servers +func sinkHealth(t *testing.T, kubeclient kubernetes.Interface, servers ...*utils.AuditTestServer) { + var missing []utils.AuditEvent + i := 0 + var finalErr error + err := wait.PollImmediate(50*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + i++ + name := fmt.Sprintf("health-%d-%d", i, time.Now().UnixNano()) + expected, err := simpleOp(name, kubeclient) + require.NoError(t, err, "could not perform config map operations") + + // check that all given servers have received events + for _, server := range servers { + missing, err = server.WaitForEvents(expected) + if err != nil { + finalErr = fmt.Errorf("not all events found in %s health check: missing %#v", server.Name, missing) + return false, nil + } + server.ResetEventList() + } + return true, nil + }) + require.NoError(t, err, finalErr) +} + +// simpleOp is a function that simply tries to get a configmap with the given name and returns the +// corresponding expected audit event +func simpleOp(name string, kubeclient kubernetes.Interface) ([]utils.AuditEvent, error) { + _, err := kubeclient.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{}) + if err != nil && !errors.IsNotFound(err) { + return nil, err + } + + expectedEvents := []utils.AuditEvent{ + { + Level: auditinternal.LevelRequestResponse, + Stage: auditinternal.StageResponseComplete, + RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/configmaps/%s", namespace, name), + Verb: "get", + Code: 404, + User: auditTestUser, + Resource: "configmaps", + Namespace: namespace, + RequestObject: false, + ResponseObject: true, + AuthorizeDecision: "allow", + }, + } + return expectedEvents, nil +} + +// asyncOps runs the simpleOp function until the stopChan is closed updating +// the expected atomic events list +func asyncOps( + stopChan <-chan struct{}, + wg *sync.WaitGroup, + kubeclient kubernetes.Interface, + expected *atomic.Value, +) { + for i := 0; ; i++ { + select { + case <-stopChan: + wg.Done() + return + default: + name := fmt.Sprintf("health-%d-%d", i, time.Now().UnixNano()) + exp, err := simpleOp(name, kubeclient) + if err != nil { + // retry on errors + continue + } + e := expected.Load().([]utils.AuditEvent) + evList := []utils.AuditEvent{} + evList = append(e, exp...) + expected.Store(evList) + } + } +} diff --git a/test/integration/master/audit_test.go b/test/integration/master/audit_test.go index ecdf2d2d0d4..e9ffff1c66d 100644 --- a/test/integration/master/audit_test.go +++ b/test/integration/master/audit_test.go @@ -59,87 +59,8 @@ rules: "audit.k8s.io/v1": auditv1.SchemeGroupVersion, "audit.k8s.io/v1beta1": auditv1beta1.SchemeGroupVersion, } -) -// TestAudit ensures that both v1beta1 and v1 version audit api could work. -func TestAudit(t *testing.T) { - for version := range versions { - testAudit(t, version) - } -} - -func testAudit(t *testing.T, version string) { - // prepare audit policy file - auditPolicy := []byte(strings.Replace(auditPolicyPattern, "{version}", version, 1)) - policyFile, err := ioutil.TempFile("", "audit-policy.yaml") - if err != nil { - t.Fatalf("Failed to create audit policy file: %v", err) - } - defer os.Remove(policyFile.Name()) - if _, err := policyFile.Write(auditPolicy); err != nil { - t.Fatalf("Failed to write audit policy file: %v", err) - } - if err := policyFile.Close(); err != nil { - t.Fatalf("Failed to close audit policy file: %v", err) - } - - // prepare audit log file - logFile, err := ioutil.TempFile("", "audit.log") - if err != nil { - t.Fatalf("Failed to create audit log file: %v", err) - } - defer os.Remove(logFile.Name()) - - // start api server - result := kubeapiservertesting.StartTestServerOrDie(t, nil, - []string{ - "--audit-policy-file", policyFile.Name(), - "--audit-log-version", version, - "--audit-log-mode", "blocking", - "--audit-log-path", logFile.Name()}, - framework.SharedEtcd()) - defer result.TearDownFn() - - kubeclient, err := kubernetes.NewForConfig(result.ClientConfig) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - func() { - // create, get, watch, update, patch, list and delete configmap. - configMap := &apiv1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "audit-configmap", - }, - Data: map[string]string{ - "map-key": "map-value", - }, - } - - _, err := kubeclient.CoreV1().ConfigMaps(namespace).Create(configMap) - expectNoError(t, err, "failed to create audit-configmap") - - _, err = kubeclient.CoreV1().ConfigMaps(namespace).Get(configMap.Name, metav1.GetOptions{}) - expectNoError(t, err, "failed to get audit-configmap") - - configMapChan, err := kubeclient.CoreV1().ConfigMaps(namespace).Watch(watchOptions) - expectNoError(t, err, "failed to create watch for config maps") - configMapChan.Stop() - - _, err = kubeclient.CoreV1().ConfigMaps(namespace).Update(configMap) - expectNoError(t, err, "failed to update audit-configmap") - - _, err = kubeclient.CoreV1().ConfigMaps(namespace).Patch(configMap.Name, types.JSONPatchType, patch) - expectNoError(t, err, "failed to patch configmap") - - _, err = kubeclient.CoreV1().ConfigMaps(namespace).List(metav1.ListOptions{}) - expectNoError(t, err, "failed to list config maps") - - err = kubeclient.CoreV1().ConfigMaps(namespace).Delete(configMap.Name, &metav1.DeleteOptions{}) - expectNoError(t, err, "failed to delete audit-configmap") - }() - - expectedEvents := []utils.AuditEvent{ + expectedEvents = []utils.AuditEvent{ { Level: auditinternal.LevelRequestResponse, Stage: auditinternal.StageResponseComplete, @@ -238,7 +159,56 @@ func testAudit(t *testing.T, version string) { AuthorizeDecision: "allow", }, } +) +// TestAudit ensures that both v1beta1 and v1 version audit api could work. +func TestAudit(t *testing.T) { + for version := range versions { + testAudit(t, version) + } +} + +func testAudit(t *testing.T, version string) { + // prepare audit policy file + auditPolicy := []byte(strings.Replace(auditPolicyPattern, "{version}", version, 1)) + policyFile, err := ioutil.TempFile("", "audit-policy.yaml") + if err != nil { + t.Fatalf("Failed to create audit policy file: %v", err) + } + defer os.Remove(policyFile.Name()) + if _, err := policyFile.Write(auditPolicy); err != nil { + t.Fatalf("Failed to write audit policy file: %v", err) + } + if err := policyFile.Close(); err != nil { + t.Fatalf("Failed to close audit policy file: %v", err) + } + + // prepare audit log file + logFile, err := ioutil.TempFile("", "audit.log") + if err != nil { + t.Fatalf("Failed to create audit log file: %v", err) + } + defer os.Remove(logFile.Name()) + + // start api server + result := kubeapiservertesting.StartTestServerOrDie(t, nil, + []string{ + "--audit-policy-file", policyFile.Name(), + "--audit-log-version", version, + "--audit-log-mode", "blocking", + "--audit-log-path", logFile.Name()}, + framework.SharedEtcd()) + defer result.TearDownFn() + + kubeclient, err := kubernetes.NewForConfig(result.ClientConfig) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // perform configmap operations + configMapOperations(t, kubeclient) + + // check for corresponding audit logs stream, err := os.Open(logFile.Name()) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -253,6 +223,47 @@ func testAudit(t *testing.T, version string) { } } +// configMapOperations is a set of known operations perfomed on the configmap type +// which correspond to the expected events. +// This is shared by the dynamic test +func configMapOperations(t *testing.T, kubeclient kubernetes.Interface) { + // create, get, watch, update, patch, list and delete configmap. + configMap := &apiv1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "audit-configmap", + }, + Data: map[string]string{ + "map-key": "map-value", + }, + } + + _, err := kubeclient.CoreV1().ConfigMaps(namespace).Create(configMap) + expectNoError(t, err, "failed to create audit-configmap") + + _, err = kubeclient.CoreV1().ConfigMaps(namespace).Get(configMap.Name, metav1.GetOptions{}) + expectNoError(t, err, "failed to get audit-configmap") + + configMapChan, err := kubeclient.CoreV1().ConfigMaps(namespace).Watch(watchOptions) + expectNoError(t, err, "failed to create watch for config maps") + for range configMapChan.ResultChan() { + // Block until watchOptions.TimeoutSeconds expires. + // If the test finishes before watchOptions.TimeoutSeconds expires, the watch audit + // event at stage ResponseComplete will not be generated. + } + + _, err = kubeclient.CoreV1().ConfigMaps(namespace).Update(configMap) + expectNoError(t, err, "failed to update audit-configmap") + + _, err = kubeclient.CoreV1().ConfigMaps(namespace).Patch(configMap.Name, types.JSONPatchType, patch) + expectNoError(t, err, "failed to patch configmap") + + _, err = kubeclient.CoreV1().ConfigMaps(namespace).List(metav1.ListOptions{}) + expectNoError(t, err, "failed to list config maps") + + err = kubeclient.CoreV1().ConfigMaps(namespace).Delete(configMap.Name, &metav1.DeleteOptions{}) + expectNoError(t, err, "failed to delete audit-configmap") +} + func expectNoError(t *testing.T, err error, msg string) { if err != nil { t.Fatalf("%s: %v", msg, err) diff --git a/test/utils/audit.go b/test/utils/audit.go index c78cb2719a2..028829c7943 100644 --- a/test/utils/audit.go +++ b/test/utils/audit.go @@ -25,11 +25,14 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" auditinternal "k8s.io/apiserver/pkg/apis/audit" "k8s.io/apiserver/pkg/audit" ) +// AuditEvent is a simplified representation of an audit event for testing purposes type AuditEvent struct { + ID types.UID Level auditinternal.Level Stage auditinternal.Stage RequestURI string @@ -45,17 +48,21 @@ type AuditEvent struct { AuthorizeDecision string } -// Search the audit log for the expected audit lines. +// CheckAuditLines searches the audit log for the expected audit lines. +// if includeID is true the event ids will also be verified func CheckAuditLines(stream io.Reader, expected []AuditEvent, version schema.GroupVersion) (missing []AuditEvent, err error) { - expectations := map[AuditEvent]bool{} - for _, event := range expected { - expectations[event] = false - } + expectations := buildEventExpectations(expected) scanner := bufio.NewScanner(stream) for scanner.Scan() { line := scanner.Text() - event, err := parseAuditLine(line, version) + e := &auditinternal.Event{} + decoder := audit.Codecs.UniversalDecoder(version) + if err := runtime.DecodeInto(decoder, []byte(line), e); err != nil { + return expected, fmt.Errorf("failed decoding buf: %s, apiVersion: %s", line, version) + } + + event, err := testEventFromInternal(e) if err != nil { return expected, err } @@ -69,22 +76,65 @@ func CheckAuditLines(stream io.Reader, expected []AuditEvent, version schema.Gro return expected, err } - missing = make([]AuditEvent, 0) - for event, found := range expectations { - if !found { - missing = append(missing, event) - } - } + missing = findMissing(expectations) return missing, nil } -func parseAuditLine(line string, version schema.GroupVersion) (AuditEvent, error) { - e := &auditinternal.Event{} - decoder := audit.Codecs.UniversalDecoder(version) - if err := runtime.DecodeInto(decoder, []byte(line), e); err != nil { - return AuditEvent{}, fmt.Errorf("failed decoding buf: %s, apiVersion: %s", line, version) +// CheckAuditList searches an audit event list for the expected audit events. +// if includeID is true the event ids will also be verified +func CheckAuditList(el auditinternal.EventList, expected []AuditEvent) (missing []AuditEvent, err error) { + expectations := buildEventExpectations(expected) + + for _, e := range el.Items { + event, err := testEventFromInternal(&e) + if err != nil { + return expected, err + } + + // If the event was expected, mark it as found. + if _, found := expectations[event]; found { + expectations[event] = true + } } + missing = findMissing(expectations) + return missing, nil +} + +// CheckForDuplicates checks a list for duplicate events +func CheckForDuplicates(el auditinternal.EventList) (auditinternal.EventList, error) { + // eventMap holds a map of audit events with just a nil value + eventMap := map[AuditEvent]*bool{} + duplicates := auditinternal.EventList{} + var err error + for _, e := range el.Items { + event, err := testEventFromInternal(&e) + if err != nil { + return duplicates, err + } + event.ID = e.AuditID + if _, ok := eventMap[event]; ok { + duplicates.Items = append(duplicates.Items, e) + err = fmt.Errorf("failed duplicate check") + continue + } + eventMap[event] = nil + } + return duplicates, err +} + +// buildEventExpectations creates a bool map out of a list of audit events +func buildEventExpectations(expected []AuditEvent) map[AuditEvent]bool { + expectations := map[AuditEvent]bool{} + for _, event := range expected { + expectations[event] = false + } + return expectations +} + +// testEventFromInternal takes an internal audit event and returns a test event +// if includeID is true the event id will be included +func testEventFromInternal(e *auditinternal.Event) (AuditEvent, error) { event := AuditEvent{ Level: e.Level, Stage: e.Stage, @@ -113,3 +163,14 @@ func parseAuditLine(line string, version schema.GroupVersion) (AuditEvent, error event.AuthorizeDecision = e.Annotations["authorization.k8s.io/decision"] return event, nil } + +// findMissing checks for false values in the expectations map and returns them as a list +func findMissing(expectations map[AuditEvent]bool) []AuditEvent { + var missing []AuditEvent + for event, found := range expectations { + if !found { + missing = append(missing, event) + } + } + return missing +} diff --git a/test/utils/audit_dynamic.go b/test/utils/audit_dynamic.go new file mode 100644 index 00000000000..fdb85e4161c --- /dev/null +++ b/test/utils/audit_dynamic.go @@ -0,0 +1,193 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + auditinternal "k8s.io/apiserver/pkg/apis/audit" + auditv1 "k8s.io/apiserver/pkg/apis/audit/v1" + "k8s.io/apiserver/pkg/audit" +) + +// AuditTestServer is a helper server for dynamic audit testing +type AuditTestServer struct { + Name string + LockedEventList *LockedEventList + Server *httptest.Server + t *testing.T +} + +// LockedEventList is an event list with a lock for concurrent access +type LockedEventList struct { + *sync.RWMutex + EventList auditinternal.EventList +} + +// NewLockedEventList returns a new LockedEventList +func NewLockedEventList() *LockedEventList { + return &LockedEventList{ + RWMutex: &sync.RWMutex{}, + EventList: auditinternal.EventList{}, + } +} + +// NewAuditTestServer returns a new audit test server +func NewAuditTestServer(t *testing.T, name string) *AuditTestServer { + s := &AuditTestServer{ + Name: name, + LockedEventList: NewLockedEventList(), + t: t, + } + s.buildServer() + return s +} + +// GetEventList safely returns the internal event list +func (a *AuditTestServer) GetEventList() auditinternal.EventList { + a.LockedEventList.RLock() + defer a.LockedEventList.RUnlock() + return a.LockedEventList.EventList +} + +// ResetEventList resets the internal event list +func (a *AuditTestServer) ResetEventList() { + a.LockedEventList.Lock() + defer a.LockedEventList.Unlock() + a.LockedEventList.EventList = auditinternal.EventList{} +} + +// AppendEvents will add the given events to the internal event list +func (a *AuditTestServer) AppendEvents(events []auditinternal.Event) { + a.LockedEventList.Lock() + defer a.LockedEventList.Unlock() + a.LockedEventList.EventList.Items = append(a.LockedEventList.EventList.Items, events...) +} + +// WaitForEvents waits for the given events to arrive in the server or the 30s timeout is reached +func (a *AuditTestServer) WaitForEvents(expected []AuditEvent) ([]AuditEvent, error) { + var missing []AuditEvent + err := wait.PollImmediate(50*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + var err error + a.LockedEventList.RLock() + defer a.LockedEventList.RUnlock() + el := a.GetEventList() + if len(el.Items) < 1 { + return false, nil + } + missing, err = CheckAuditList(el, expected) + if err != nil { + return false, nil + } + return true, nil + }) + return missing, err +} + +// WaitForNumEvents checks that at least the given number of events has arrived or the 30s timeout is reached +func (a *AuditTestServer) WaitForNumEvents(numEvents int) error { + err := wait.PollImmediate(50*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + el := a.GetEventList() + if len(el.Items) < numEvents { + return false, nil + } + return true, nil + }) + if err != nil { + return fmt.Errorf("%v: %d events failed to arrive in %v", err, numEvents, wait.ForeverTestTimeout) + } + return nil +} + +// Health polls the server healthcheck until successful or the 30s timeout has been reached +func (a *AuditTestServer) Health() error { + err := wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + resp, err := http.Get(fmt.Sprintf("%s/health", a.Server.URL)) + if err != nil { + return false, nil + } + if resp.StatusCode != 200 { + return false, nil + } + return true, nil + }) + if err != nil { + return fmt.Errorf("server %s permanently failed health check: %v", a.Server.URL, err) + } + return nil +} + +// Close the server +func (a *AuditTestServer) Close() { + a.Server.Close() +} + +// BuildSinkConfiguration creates a generic audit sink configuration for this server +func (a *AuditTestServer) BuildSinkConfiguration() *auditregv1alpha1.AuditSink { + return &auditregv1alpha1.AuditSink{ + ObjectMeta: metav1.ObjectMeta{ + Name: a.Name, + }, + Spec: auditregv1alpha1.AuditSinkSpec{ + Policy: auditregv1alpha1.Policy{ + Level: auditregv1alpha1.LevelRequestResponse, + Stages: []auditregv1alpha1.Stage{ + auditregv1alpha1.StageResponseStarted, + auditregv1alpha1.StageResponseComplete, + }, + }, + Webhook: auditregv1alpha1.Webhook{ + ClientConfig: auditregv1alpha1.WebhookClientConfig{ + URL: &a.Server.URL, + }, + }, + }, + } +} + +// buildServer creates an http test server that will update the internal event list +// with the value it receives +func (a *AuditTestServer) buildServer() { + decoder := audit.Codecs.UniversalDecoder(auditv1.SchemeGroupVersion) + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + require.NoError(a.t, err, "could not read request body") + el := auditinternal.EventList{} + err = runtime.DecodeInto(decoder, body, &el) + r.Body.Close() + require.NoError(a.t, err, "failed decoding buf: %b, apiVersion: %s", body, auditv1.SchemeGroupVersion) + a.AppendEvents(el.Items) + w.WriteHeader(200) + }) + mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + }) + a.Server = httptest.NewServer(mux) +}