adds dynamic audit integration test

This commit is contained in:
Patrick Barker 2018-10-16 16:17:33 -06:00 committed by Patrick Barker
parent 5d19fda5e8
commit d995047366
8 changed files with 662 additions and 105 deletions

View File

@ -29,6 +29,7 @@ import (
"k8s.io/apiserver/pkg/storage/storagebackend"
utilflag "k8s.io/apiserver/pkg/util/flag"
auditbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
auditdynamic "k8s.io/apiserver/plugin/pkg/audit/dynamic"
audittruncate "k8s.io/apiserver/plugin/pkg/audit/truncate"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/api/legacyscheme"
@ -241,6 +242,9 @@ func TestAddFlags(t *testing.T) {
InitialBackoff: 2 * time.Second,
GroupVersionString: "audit.k8s.io/v1alpha1",
},
DynamicOptions: apiserveroptions.AuditDynamicOptions{
BatchConfig: auditdynamic.NewDefaultWebhookBatchConfig(),
},
PolicyFile: "/policy",
},
Features: &apiserveroptions.FeatureOptions{

View File

@ -148,9 +148,14 @@ type AuditWebhookOptions struct {
GroupVersionString string
}
// AuditDynamicOptions control the configuration of dynamic backends for audit events
type AuditDynamicOptions struct {
// Enabled tells whether the dynamic audit capability is enabled.
Enabled bool
// Configuration for batching backend. This is currently only used as an override
// for integration tests
BatchConfig *pluginbuffered.BatchConfig
}
func NewAuditOptions() *AuditOptions {
@ -174,7 +179,8 @@ func NewAuditOptions() *AuditOptions {
GroupVersionString: "audit.k8s.io/v1",
},
DynamicOptions: AuditDynamicOptions{
Enabled: false,
Enabled: false,
BatchConfig: plugindynamic.NewDefaultWebhookBatchConfig(),
},
}
}
@ -634,7 +640,7 @@ func (o *AuditDynamicOptions) newBackend(
dc := &plugindynamic.Config{
Informer: informer,
BufferedConfig: plugindynamic.NewDefaultWebhookBatchConfig(),
BufferedConfig: o.BatchConfig,
EventConfig: plugindynamic.EventConfig{
Sink: eventSink,
Source: corev1.EventSource{

View File

@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/master/reconcilers"
"k8s.io/kubernetes/test/integration/framework"
)
func TestWebhookLoopback(t *testing.T) {
@ -40,7 +41,7 @@ func TestWebhookLoopback(t *testing.T) {
called := int32(0)
client, _ := startTestServer(t, stopCh, TestServerSetup{
client, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
},
ModifyServerConfig: func(config *master.Config) {

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
package framework
import (
"io/ioutil"
@ -36,7 +36,6 @@ import (
"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/test/integration/framework"
)
type TestServerSetup struct {
@ -44,8 +43,8 @@ type TestServerSetup struct {
ModifyServerConfig func(*master.Config)
}
// startTestServer runs a kube-apiserver, optionally calling out to the setup.ModifyServerRunOptions and setup.ModifyServerConfig functions
func startTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup) (client.Interface, *rest.Config) {
// StartTestServer runs a kube-apiserver, optionally calling out to the setup.ModifyServerRunOptions and setup.ModifyServerConfig functions
func StartTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup) (client.Interface, *rest.Config) {
certDir, _ := ioutil.TempDir("", "test-integration-"+t.Name())
go func() {
<-stopCh
@ -89,7 +88,7 @@ func startTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup
kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir
kubeAPIServerOptions.InsecureServing.BindPort = 0
kubeAPIServerOptions.Etcd.StorageConfig.Prefix = path.Join("/", uuid.New(), "registry")
kubeAPIServerOptions.Etcd.StorageConfig.Transport.ServerList = []string{framework.GetEtcdURL()}
kubeAPIServerOptions.Etcd.StorageConfig.Transport.ServerList = []string{GetEtcdURL()}
kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
kubeAPIServerOptions.Authentication.RequestHeader.UsernameHeaders = []string{"X-Remote-User"}
kubeAPIServerOptions.Authentication.RequestHeader.GroupHeaders = []string{"X-Remote-Group"}

View File

@ -0,0 +1,282 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package master
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
"k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/utils"
)
// TestDynamicAudit ensures that v1alpha of the auditregistration api works
func TestDynamicAudit(t *testing.T) {
// start api server
stopCh := make(chan struct{})
defer close(stopCh)
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicAuditing, true)()
kubeclient, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.Audit.DynamicOptions.Enabled = true
// set max batch size so the buffers flush immediately
opts.Audit.DynamicOptions.BatchConfig.MaxBatchSize = 1
opts.APIEnablement.RuntimeConfig.Set("auditregistration.k8s.io/v1alpha1=true")
},
})
// create test sinks
testServer1 := utils.NewAuditTestServer(t, "test1")
defer testServer1.Close()
testServer2 := utils.NewAuditTestServer(t, "test2")
defer testServer2.Close()
// check that servers are healthy
require.NoError(t, testServer1.Health(), "server1 never became healthy")
require.NoError(t, testServer2.Health(), "server2 never became healthy")
// build AuditSink configurations
sinkConfig1 := testServer1.BuildSinkConfiguration()
sinkConfig2 := testServer2.BuildSinkConfiguration()
// test creates a single audit sink, generates audit events, and ensures they arrive at the server
success := t.Run("one sink", func(t *testing.T) {
_, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Create(sinkConfig1)
require.NoError(t, err, "failed to create audit sink1")
t.Log("created audit sink1")
// verify sink is ready
sinkHealth(t, kubeclient, testServer1)
// perform configmap ops
configMapOperations(t, kubeclient)
// check for corresponding events
missing, err := testServer1.WaitForEvents(expectedEvents)
require.NoError(t, err, "failed to match all expected events for server1, events %#v not found", missing)
})
require.True(t, success) // propagate failure
// test creates a second audit sink, generates audit events, and ensures events arrive in both servers
success = t.Run("two sink", func(t *testing.T) {
_, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Create(sinkConfig2)
require.NoError(t, err, "failed to create audit sink2")
t.Log("created audit sink2")
// verify both sinks are ready
sinkHealth(t, kubeclient, testServer1, testServer2)
// perform configmap ops
configMapOperations(t, kubeclient)
// check for corresponding events in both sinks
missing, err := testServer1.WaitForEvents(expectedEvents)
require.NoError(t, err, "failed to match all expected events for server1, events %#v not found", missing)
missing, err = testServer2.WaitForEvents(expectedEvents)
require.NoError(t, err, "failed to match all expected events for server2, events %#v not found", missing)
})
require.True(t, success) // propagate failure
// test deletes an audit sink, generates audit events, and ensures they don't arrive in the corresponding server
success = t.Run("delete sink", func(t *testing.T) {
err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Delete(sinkConfig2.Name, &metav1.DeleteOptions{})
require.NoError(t, err, "failed to delete audit sink2")
t.Log("deleted audit sink2")
var finalErr error
err = wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
// reset event lists
testServer1.ResetEventList()
testServer2.ResetEventList()
// perform configmap ops
configMapOperations(t, kubeclient)
// check for corresponding events in server1
missing, err := testServer1.WaitForEvents(expectedEvents)
if err != nil {
finalErr = fmt.Errorf("%v: failed to match all expected events for server1, events %#v not found", err, missing)
return false, nil
}
// check that server2 is empty
if len(testServer2.GetEventList().Items) != 0 {
finalErr = fmt.Errorf("server2 event list should be empty")
return false, nil
}
return true, nil
})
require.NoError(t, err, finalErr)
})
require.True(t, success) // propagate failure
// This test will run a background process that generates audit events sending them to a sink.
// Whilst that generation is occurring, the sink is updated to point to a different server.
// The test checks that no events are lost or duplicated during the update.
t.Run("update sink", func(t *testing.T) {
// fetch sink1 config
sink1, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Get(sinkConfig1.Name, metav1.GetOptions{})
require.NoError(t, err)
// reset event lists
testServer1.ResetEventList()
testServer2.ResetEventList()
// run operations in background
stopChan := make(chan struct{})
expectedEvents := &atomic.Value{}
expectedEvents.Store([]utils.AuditEvent{})
wg := &sync.WaitGroup{}
wg.Add(1)
go asyncOps(stopChan, wg, kubeclient, expectedEvents)
// check to see that at least 20 events have arrived in server1
err = testServer1.WaitForNumEvents(20)
require.NoError(t, err, "failed to find enough events in server1")
// check that no events are in server 2 yet
require.Len(t, testServer2.GetEventList().Items, 0, "server2 should not have events yet")
// update the url
sink1.Spec.Webhook.ClientConfig.URL = &testServer2.Server.URL
_, err = kubeclient.AuditregistrationV1alpha1().AuditSinks().Update(sink1)
require.NoError(t, err, "failed to update audit sink1")
t.Log("updated audit sink1 to point to server2")
// check that at least 20 events have arrived in server2
err = testServer2.WaitForNumEvents(20)
require.NoError(t, err, "failed to find enough events in server2")
// stop the operations and ensure they have finished
close(stopChan)
wg.Wait()
// check that the final events have arrived
expected := expectedEvents.Load().([]utils.AuditEvent)
missing, err := testServer2.WaitForEvents(expected[len(expected)-4:])
require.NoError(t, err, "failed to find the final events in server2, events %#v not found", missing)
// combine the event lists
el1 := testServer1.GetEventList()
el2 := testServer2.GetEventList()
combinedList := auditinternal.EventList{}
combinedList.Items = append(el1.Items, el2.Items...)
// check that there are no duplicate events
dups, err := utils.CheckForDuplicates(combinedList)
require.NoError(t, err, "duplicate events found: %#v", dups)
// check that no events are missing
missing, err = utils.CheckAuditList(combinedList, expected)
require.NoError(t, err, "failed to match all expected events: %#v not found", missing)
})
}
// sinkHealth checks if sinks are running by verifying that uniquely identified events are found
// in the given servers
func sinkHealth(t *testing.T, kubeclient kubernetes.Interface, servers ...*utils.AuditTestServer) {
var missing []utils.AuditEvent
i := 0
var finalErr error
err := wait.PollImmediate(50*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
i++
name := fmt.Sprintf("health-%d-%d", i, time.Now().UnixNano())
expected, err := simpleOp(name, kubeclient)
require.NoError(t, err, "could not perform config map operations")
// check that all given servers have received events
for _, server := range servers {
missing, err = server.WaitForEvents(expected)
if err != nil {
finalErr = fmt.Errorf("not all events found in %s health check: missing %#v", server.Name, missing)
return false, nil
}
server.ResetEventList()
}
return true, nil
})
require.NoError(t, err, finalErr)
}
// simpleOp is a function that simply tries to get a configmap with the given name and returns the
// corresponding expected audit event
func simpleOp(name string, kubeclient kubernetes.Interface) ([]utils.AuditEvent, error) {
_, err := kubeclient.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
return nil, err
}
expectedEvents := []utils.AuditEvent{
{
Level: auditinternal.LevelRequestResponse,
Stage: auditinternal.StageResponseComplete,
RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/configmaps/%s", namespace, name),
Verb: "get",
Code: 404,
User: auditTestUser,
Resource: "configmaps",
Namespace: namespace,
RequestObject: false,
ResponseObject: true,
AuthorizeDecision: "allow",
},
}
return expectedEvents, nil
}
// asyncOps runs the simpleOp function until the stopChan is closed updating
// the expected atomic events list
func asyncOps(
stopChan <-chan struct{},
wg *sync.WaitGroup,
kubeclient kubernetes.Interface,
expected *atomic.Value,
) {
for i := 0; ; i++ {
select {
case <-stopChan:
wg.Done()
return
default:
name := fmt.Sprintf("health-%d-%d", i, time.Now().UnixNano())
exp, err := simpleOp(name, kubeclient)
if err != nil {
// retry on errors
continue
}
e := expected.Load().([]utils.AuditEvent)
evList := []utils.AuditEvent{}
evList = append(e, exp...)
expected.Store(evList)
}
}
}

View File

@ -59,87 +59,8 @@ rules:
"audit.k8s.io/v1": auditv1.SchemeGroupVersion,
"audit.k8s.io/v1beta1": auditv1beta1.SchemeGroupVersion,
}
)
// TestAudit ensures that both v1beta1 and v1 version audit api could work.
func TestAudit(t *testing.T) {
for version := range versions {
testAudit(t, version)
}
}
func testAudit(t *testing.T, version string) {
// prepare audit policy file
auditPolicy := []byte(strings.Replace(auditPolicyPattern, "{version}", version, 1))
policyFile, err := ioutil.TempFile("", "audit-policy.yaml")
if err != nil {
t.Fatalf("Failed to create audit policy file: %v", err)
}
defer os.Remove(policyFile.Name())
if _, err := policyFile.Write(auditPolicy); err != nil {
t.Fatalf("Failed to write audit policy file: %v", err)
}
if err := policyFile.Close(); err != nil {
t.Fatalf("Failed to close audit policy file: %v", err)
}
// prepare audit log file
logFile, err := ioutil.TempFile("", "audit.log")
if err != nil {
t.Fatalf("Failed to create audit log file: %v", err)
}
defer os.Remove(logFile.Name())
// start api server
result := kubeapiservertesting.StartTestServerOrDie(t, nil,
[]string{
"--audit-policy-file", policyFile.Name(),
"--audit-log-version", version,
"--audit-log-mode", "blocking",
"--audit-log-path", logFile.Name()},
framework.SharedEtcd())
defer result.TearDownFn()
kubeclient, err := kubernetes.NewForConfig(result.ClientConfig)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
func() {
// create, get, watch, update, patch, list and delete configmap.
configMap := &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "audit-configmap",
},
Data: map[string]string{
"map-key": "map-value",
},
}
_, err := kubeclient.CoreV1().ConfigMaps(namespace).Create(configMap)
expectNoError(t, err, "failed to create audit-configmap")
_, err = kubeclient.CoreV1().ConfigMaps(namespace).Get(configMap.Name, metav1.GetOptions{})
expectNoError(t, err, "failed to get audit-configmap")
configMapChan, err := kubeclient.CoreV1().ConfigMaps(namespace).Watch(watchOptions)
expectNoError(t, err, "failed to create watch for config maps")
configMapChan.Stop()
_, err = kubeclient.CoreV1().ConfigMaps(namespace).Update(configMap)
expectNoError(t, err, "failed to update audit-configmap")
_, err = kubeclient.CoreV1().ConfigMaps(namespace).Patch(configMap.Name, types.JSONPatchType, patch)
expectNoError(t, err, "failed to patch configmap")
_, err = kubeclient.CoreV1().ConfigMaps(namespace).List(metav1.ListOptions{})
expectNoError(t, err, "failed to list config maps")
err = kubeclient.CoreV1().ConfigMaps(namespace).Delete(configMap.Name, &metav1.DeleteOptions{})
expectNoError(t, err, "failed to delete audit-configmap")
}()
expectedEvents := []utils.AuditEvent{
expectedEvents = []utils.AuditEvent{
{
Level: auditinternal.LevelRequestResponse,
Stage: auditinternal.StageResponseComplete,
@ -238,7 +159,56 @@ func testAudit(t *testing.T, version string) {
AuthorizeDecision: "allow",
},
}
)
// TestAudit ensures that both v1beta1 and v1 version audit api could work.
func TestAudit(t *testing.T) {
for version := range versions {
testAudit(t, version)
}
}
func testAudit(t *testing.T, version string) {
// prepare audit policy file
auditPolicy := []byte(strings.Replace(auditPolicyPattern, "{version}", version, 1))
policyFile, err := ioutil.TempFile("", "audit-policy.yaml")
if err != nil {
t.Fatalf("Failed to create audit policy file: %v", err)
}
defer os.Remove(policyFile.Name())
if _, err := policyFile.Write(auditPolicy); err != nil {
t.Fatalf("Failed to write audit policy file: %v", err)
}
if err := policyFile.Close(); err != nil {
t.Fatalf("Failed to close audit policy file: %v", err)
}
// prepare audit log file
logFile, err := ioutil.TempFile("", "audit.log")
if err != nil {
t.Fatalf("Failed to create audit log file: %v", err)
}
defer os.Remove(logFile.Name())
// start api server
result := kubeapiservertesting.StartTestServerOrDie(t, nil,
[]string{
"--audit-policy-file", policyFile.Name(),
"--audit-log-version", version,
"--audit-log-mode", "blocking",
"--audit-log-path", logFile.Name()},
framework.SharedEtcd())
defer result.TearDownFn()
kubeclient, err := kubernetes.NewForConfig(result.ClientConfig)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// perform configmap operations
configMapOperations(t, kubeclient)
// check for corresponding audit logs
stream, err := os.Open(logFile.Name())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@ -253,6 +223,47 @@ func testAudit(t *testing.T, version string) {
}
}
// configMapOperations is a set of known operations perfomed on the configmap type
// which correspond to the expected events.
// This is shared by the dynamic test
func configMapOperations(t *testing.T, kubeclient kubernetes.Interface) {
// create, get, watch, update, patch, list and delete configmap.
configMap := &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "audit-configmap",
},
Data: map[string]string{
"map-key": "map-value",
},
}
_, err := kubeclient.CoreV1().ConfigMaps(namespace).Create(configMap)
expectNoError(t, err, "failed to create audit-configmap")
_, err = kubeclient.CoreV1().ConfigMaps(namespace).Get(configMap.Name, metav1.GetOptions{})
expectNoError(t, err, "failed to get audit-configmap")
configMapChan, err := kubeclient.CoreV1().ConfigMaps(namespace).Watch(watchOptions)
expectNoError(t, err, "failed to create watch for config maps")
for range configMapChan.ResultChan() {
// Block until watchOptions.TimeoutSeconds expires.
// If the test finishes before watchOptions.TimeoutSeconds expires, the watch audit
// event at stage ResponseComplete will not be generated.
}
_, err = kubeclient.CoreV1().ConfigMaps(namespace).Update(configMap)
expectNoError(t, err, "failed to update audit-configmap")
_, err = kubeclient.CoreV1().ConfigMaps(namespace).Patch(configMap.Name, types.JSONPatchType, patch)
expectNoError(t, err, "failed to patch configmap")
_, err = kubeclient.CoreV1().ConfigMaps(namespace).List(metav1.ListOptions{})
expectNoError(t, err, "failed to list config maps")
err = kubeclient.CoreV1().ConfigMaps(namespace).Delete(configMap.Name, &metav1.DeleteOptions{})
expectNoError(t, err, "failed to delete audit-configmap")
}
func expectNoError(t *testing.T, err error, msg string) {
if err != nil {
t.Fatalf("%s: %v", msg, err)

View File

@ -25,11 +25,14 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
)
// AuditEvent is a simplified representation of an audit event for testing purposes
type AuditEvent struct {
ID types.UID
Level auditinternal.Level
Stage auditinternal.Stage
RequestURI string
@ -45,17 +48,21 @@ type AuditEvent struct {
AuthorizeDecision string
}
// Search the audit log for the expected audit lines.
// CheckAuditLines searches the audit log for the expected audit lines.
// if includeID is true the event ids will also be verified
func CheckAuditLines(stream io.Reader, expected []AuditEvent, version schema.GroupVersion) (missing []AuditEvent, err error) {
expectations := map[AuditEvent]bool{}
for _, event := range expected {
expectations[event] = false
}
expectations := buildEventExpectations(expected)
scanner := bufio.NewScanner(stream)
for scanner.Scan() {
line := scanner.Text()
event, err := parseAuditLine(line, version)
e := &auditinternal.Event{}
decoder := audit.Codecs.UniversalDecoder(version)
if err := runtime.DecodeInto(decoder, []byte(line), e); err != nil {
return expected, fmt.Errorf("failed decoding buf: %s, apiVersion: %s", line, version)
}
event, err := testEventFromInternal(e)
if err != nil {
return expected, err
}
@ -69,22 +76,65 @@ func CheckAuditLines(stream io.Reader, expected []AuditEvent, version schema.Gro
return expected, err
}
missing = make([]AuditEvent, 0)
for event, found := range expectations {
if !found {
missing = append(missing, event)
}
}
missing = findMissing(expectations)
return missing, nil
}
func parseAuditLine(line string, version schema.GroupVersion) (AuditEvent, error) {
e := &auditinternal.Event{}
decoder := audit.Codecs.UniversalDecoder(version)
if err := runtime.DecodeInto(decoder, []byte(line), e); err != nil {
return AuditEvent{}, fmt.Errorf("failed decoding buf: %s, apiVersion: %s", line, version)
// CheckAuditList searches an audit event list for the expected audit events.
// if includeID is true the event ids will also be verified
func CheckAuditList(el auditinternal.EventList, expected []AuditEvent) (missing []AuditEvent, err error) {
expectations := buildEventExpectations(expected)
for _, e := range el.Items {
event, err := testEventFromInternal(&e)
if err != nil {
return expected, err
}
// If the event was expected, mark it as found.
if _, found := expectations[event]; found {
expectations[event] = true
}
}
missing = findMissing(expectations)
return missing, nil
}
// CheckForDuplicates checks a list for duplicate events
func CheckForDuplicates(el auditinternal.EventList) (auditinternal.EventList, error) {
// eventMap holds a map of audit events with just a nil value
eventMap := map[AuditEvent]*bool{}
duplicates := auditinternal.EventList{}
var err error
for _, e := range el.Items {
event, err := testEventFromInternal(&e)
if err != nil {
return duplicates, err
}
event.ID = e.AuditID
if _, ok := eventMap[event]; ok {
duplicates.Items = append(duplicates.Items, e)
err = fmt.Errorf("failed duplicate check")
continue
}
eventMap[event] = nil
}
return duplicates, err
}
// buildEventExpectations creates a bool map out of a list of audit events
func buildEventExpectations(expected []AuditEvent) map[AuditEvent]bool {
expectations := map[AuditEvent]bool{}
for _, event := range expected {
expectations[event] = false
}
return expectations
}
// testEventFromInternal takes an internal audit event and returns a test event
// if includeID is true the event id will be included
func testEventFromInternal(e *auditinternal.Event) (AuditEvent, error) {
event := AuditEvent{
Level: e.Level,
Stage: e.Stage,
@ -113,3 +163,14 @@ func parseAuditLine(line string, version schema.GroupVersion) (AuditEvent, error
event.AuthorizeDecision = e.Annotations["authorization.k8s.io/decision"]
return event, nil
}
// findMissing checks for false values in the expectations map and returns them as a list
func findMissing(expectations map[AuditEvent]bool) []AuditEvent {
var missing []AuditEvent
for event, found := range expectations {
if !found {
missing = append(missing, event)
}
}
return missing
}

193
test/utils/audit_dynamic.go Normal file
View File

@ -0,0 +1,193 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package utils
import (
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
"k8s.io/apiserver/pkg/audit"
)
// AuditTestServer is a helper server for dynamic audit testing
type AuditTestServer struct {
Name string
LockedEventList *LockedEventList
Server *httptest.Server
t *testing.T
}
// LockedEventList is an event list with a lock for concurrent access
type LockedEventList struct {
*sync.RWMutex
EventList auditinternal.EventList
}
// NewLockedEventList returns a new LockedEventList
func NewLockedEventList() *LockedEventList {
return &LockedEventList{
RWMutex: &sync.RWMutex{},
EventList: auditinternal.EventList{},
}
}
// NewAuditTestServer returns a new audit test server
func NewAuditTestServer(t *testing.T, name string) *AuditTestServer {
s := &AuditTestServer{
Name: name,
LockedEventList: NewLockedEventList(),
t: t,
}
s.buildServer()
return s
}
// GetEventList safely returns the internal event list
func (a *AuditTestServer) GetEventList() auditinternal.EventList {
a.LockedEventList.RLock()
defer a.LockedEventList.RUnlock()
return a.LockedEventList.EventList
}
// ResetEventList resets the internal event list
func (a *AuditTestServer) ResetEventList() {
a.LockedEventList.Lock()
defer a.LockedEventList.Unlock()
a.LockedEventList.EventList = auditinternal.EventList{}
}
// AppendEvents will add the given events to the internal event list
func (a *AuditTestServer) AppendEvents(events []auditinternal.Event) {
a.LockedEventList.Lock()
defer a.LockedEventList.Unlock()
a.LockedEventList.EventList.Items = append(a.LockedEventList.EventList.Items, events...)
}
// WaitForEvents waits for the given events to arrive in the server or the 30s timeout is reached
func (a *AuditTestServer) WaitForEvents(expected []AuditEvent) ([]AuditEvent, error) {
var missing []AuditEvent
err := wait.PollImmediate(50*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
var err error
a.LockedEventList.RLock()
defer a.LockedEventList.RUnlock()
el := a.GetEventList()
if len(el.Items) < 1 {
return false, nil
}
missing, err = CheckAuditList(el, expected)
if err != nil {
return false, nil
}
return true, nil
})
return missing, err
}
// WaitForNumEvents checks that at least the given number of events has arrived or the 30s timeout is reached
func (a *AuditTestServer) WaitForNumEvents(numEvents int) error {
err := wait.PollImmediate(50*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
el := a.GetEventList()
if len(el.Items) < numEvents {
return false, nil
}
return true, nil
})
if err != nil {
return fmt.Errorf("%v: %d events failed to arrive in %v", err, numEvents, wait.ForeverTestTimeout)
}
return nil
}
// Health polls the server healthcheck until successful or the 30s timeout has been reached
func (a *AuditTestServer) Health() error {
err := wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
resp, err := http.Get(fmt.Sprintf("%s/health", a.Server.URL))
if err != nil {
return false, nil
}
if resp.StatusCode != 200 {
return false, nil
}
return true, nil
})
if err != nil {
return fmt.Errorf("server %s permanently failed health check: %v", a.Server.URL, err)
}
return nil
}
// Close the server
func (a *AuditTestServer) Close() {
a.Server.Close()
}
// BuildSinkConfiguration creates a generic audit sink configuration for this server
func (a *AuditTestServer) BuildSinkConfiguration() *auditregv1alpha1.AuditSink {
return &auditregv1alpha1.AuditSink{
ObjectMeta: metav1.ObjectMeta{
Name: a.Name,
},
Spec: auditregv1alpha1.AuditSinkSpec{
Policy: auditregv1alpha1.Policy{
Level: auditregv1alpha1.LevelRequestResponse,
Stages: []auditregv1alpha1.Stage{
auditregv1alpha1.StageResponseStarted,
auditregv1alpha1.StageResponseComplete,
},
},
Webhook: auditregv1alpha1.Webhook{
ClientConfig: auditregv1alpha1.WebhookClientConfig{
URL: &a.Server.URL,
},
},
},
}
}
// buildServer creates an http test server that will update the internal event list
// with the value it receives
func (a *AuditTestServer) buildServer() {
decoder := audit.Codecs.UniversalDecoder(auditv1.SchemeGroupVersion)
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
require.NoError(a.t, err, "could not read request body")
el := auditinternal.EventList{}
err = runtime.DecodeInto(decoder, body, &el)
r.Body.Close()
require.NoError(a.t, err, "failed decoding buf: %b, apiVersion: %s", body, auditv1.SchemeGroupVersion)
a.AppendEvents(el.Items)
w.WriteHeader(200)
})
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
})
a.Server = httptest.NewServer(mux)
}