/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package master

import (
	"fmt"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	auditinternal "k8s.io/apiserver/pkg/apis/audit"
	"k8s.io/apiserver/pkg/features"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
	"k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
	"k8s.io/kubernetes/test/integration/framework"
	"k8s.io/kubernetes/test/utils"
)

// TestDynamicAudit ensures that the v1alpha1 auditregistration API works
func TestDynamicAudit(t *testing.T) {
	// start api server
	stopCh := make(chan struct{})
	defer close(stopCh)

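	// enable the DynamicAuditing feature gate for the duration of this test;
	// SetFeatureGateDuringTest returns a closure that restores the original value,
	// which is what the defer below invokes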
	defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicAuditing, true)()
	kubeclient, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
		ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
			opts.Audit.DynamicOptions.Enabled = true
			// set max batch size so the buffers flush immediately
			opts.Audit.DynamicOptions.BatchConfig.MaxBatchSize = 1
			opts.APIEnablement.RuntimeConfig.Set("auditregistration.k8s.io/v1alpha1=true")
		},
	})

	// create test sinks
	testServer1 := utils.NewAuditTestServer(t, "test1")
	defer testServer1.Close()
	testServer2 := utils.NewAuditTestServer(t, "test2")
	defer testServer2.Close()

	// check that servers are healthy
	require.NoError(t, testServer1.Health(), "server1 never became healthy")
	require.NoError(t, testServer2.Health(), "server2 never became healthy")

	// build AuditSink configurations
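	// each configuration is an AuditSink object whose webhook client config points at
	// the corresponding test server's URL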
	sinkConfig1 := testServer1.BuildSinkConfiguration()
	sinkConfig2 := testServer2.BuildSinkConfiguration()

	// test creates a single audit sink, generates audit events, and ensures they arrive at the server
	success := t.Run("one sink", func(t *testing.T) {
		_, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Create(sinkConfig1)
		require.NoError(t, err, "failed to create audit sink1")
		t.Log("created audit sink1")

		// verify sink is ready
		sinkHealth(t, kubeclient, testServer1)

		// perform configmap ops
		configMapOperations(t, kubeclient)

		// check for corresponding events
		missing, err := testServer1.WaitForEvents(expectedEvents)
		require.NoError(t, err, "failed to match all expected events for server1, events %#v not found", missing)
	})
	require.True(t, success) // propagate failure

	// test creates a second audit sink, generates audit events, and ensures events arrive in both servers
	success = t.Run("two sink", func(t *testing.T) {
		_, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Create(sinkConfig2)
		require.NoError(t, err, "failed to create audit sink2")
		t.Log("created audit sink2")

		// verify both sinks are ready
		sinkHealth(t, kubeclient, testServer1, testServer2)

		// perform configmap ops
		configMapOperations(t, kubeclient)

		// check for corresponding events in both sinks
		missing, err := testServer1.WaitForEvents(expectedEvents)
		require.NoError(t, err, "failed to match all expected events for server1, events %#v not found", missing)
		missing, err = testServer2.WaitForEvents(expectedEvents)
		require.NoError(t, err, "failed to match all expected events for server2, events %#v not found", missing)
	})
	require.True(t, success) // propagate failure

	// test deletes an audit sink, generates audit events, and ensures they don't arrive in the corresponding server
	success = t.Run("delete sink", func(t *testing.T) {
		err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Delete(sinkConfig2.Name, &metav1.DeleteOptions{})
		require.NoError(t, err, "failed to delete audit sink2")
		t.Log("deleted audit sink2")

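		// sink removal propagates to the audit pipeline asynchronously, so poll until a
		// full round of operations produces events in server1 while server2 stays empty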
		var finalErr error
		err = wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
			// reset event lists
			testServer1.ResetEventList()
			testServer2.ResetEventList()

			// perform configmap ops
			configMapOperations(t, kubeclient)

			// check for corresponding events in server1
			missing, err := testServer1.WaitForEvents(expectedEvents)
			if err != nil {
				finalErr = fmt.Errorf("%v: failed to match all expected events for server1, events %#v not found", err, missing)
				return false, nil
			}

			// check that server2 is empty
			if len(testServer2.GetEventList().Items) != 0 {
				finalErr = fmt.Errorf("server2 event list should be empty")
				return false, nil
			}
			return true, nil
		})
		require.NoError(t, err, finalErr)
	})
	require.True(t, success) // propagate failure

	// This test runs a background process that generates audit events and sends them to a sink.
	// While that generation is occurring, the sink is updated to point to a different server.
	// The test checks that no events are lost or duplicated during the update.
	t.Run("update sink", func(t *testing.T) {
		// fetch sink1 config
		sink1, err := kubeclient.AuditregistrationV1alpha1().AuditSinks().Get(sinkConfig1.Name, metav1.GetOptions{})
		require.NoError(t, err)

		// reset event lists
		testServer1.ResetEventList()
		testServer2.ResetEventList()

		// run operations in background
		stopChan := make(chan struct{})
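		// expectedEvents is shared with the asyncOps goroutine, so it is kept in an
		// atomic.Value and swapped wholesale rather than appended to in place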
		expectedEvents := &atomic.Value{}
		expectedEvents.Store([]utils.AuditEvent{})
		wg := &sync.WaitGroup{}
		wg.Add(1)
		go asyncOps(stopChan, wg, kubeclient, expectedEvents)

		// check to see that at least 20 events have arrived in server1
		err = testServer1.WaitForNumEvents(20)
		require.NoError(t, err, "failed to find enough events in server1")

		// check that no events are in server 2 yet
		require.Len(t, testServer2.GetEventList().Items, 0, "server2 should not have events yet")

		// update the url
		sink1.Spec.Webhook.ClientConfig.URL = &testServer2.Server.URL
		_, err = kubeclient.AuditregistrationV1alpha1().AuditSinks().Update(sink1)
		require.NoError(t, err, "failed to update audit sink1")
		t.Log("updated audit sink1 to point to server2")

		// check that at least 20 events have arrived in server2
		err = testServer2.WaitForNumEvents(20)
		require.NoError(t, err, "failed to find enough events in server2")

		// stop the operations and ensure they have finished
		close(stopChan)
		wg.Wait()

		// check that the final events have arrived
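		// waiting on the last few expected events gives the webhook buffers time to
		// drain before the full comparison below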
		expected := expectedEvents.Load().([]utils.AuditEvent)
		missing, err := testServer2.WaitForEvents(expected[len(expected)-4:])
		require.NoError(t, err, "failed to find the final events in server2, events %#v not found", missing)

		// combine the event lists
		el1 := testServer1.GetEventList()
		el2 := testServer2.GetEventList()
		combinedList := auditinternal.EventList{}
		combinedList.Items = append(el1.Items, el2.Items...)

		// check that there are no duplicate events
		dups, err := utils.CheckForDuplicates(combinedList)
		require.NoError(t, err, "duplicate events found: %#v", dups)

		// check that no events are missing
		missing, err = utils.CheckAuditList(combinedList, expected)
		require.NoError(t, err, "failed to match all expected events: %#v not found", missing)
	})
}

// sinkHealth checks if sinks are running by verifying that uniquely identified events
// are found in the given servers
func sinkHealth(t *testing.T, kubeclient kubernetes.Interface, servers ...*utils.AuditTestServer) {
	var missing []utils.AuditEvent
	i := 0
	var finalErr error
	err := wait.PollImmediate(50*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
		i++
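		// the loop counter plus a nanosecond timestamp makes each probe's configmap name
		// unique, so its audit events can be matched unambiguously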
		name := fmt.Sprintf("health-%d-%d", i, time.Now().UnixNano())
		expected, err := simpleOp(name, kubeclient)
		require.NoError(t, err, "could not perform config map operations")

		// check that all given servers have received events
		for _, server := range servers {
			missing, err = server.WaitForEvents(expected)
			if err != nil {
				finalErr = fmt.Errorf("not all events found in %s health check: missing %#v", server.Name, missing)
				return false, nil
			}
			server.ResetEventList()
		}
		return true, nil
	})
	require.NoError(t, err, finalErr)
}

// simpleOp gets the configmap with the given name and returns the corresponding
// expected audit events
func simpleOp(name string, kubeclient kubernetes.Interface) ([]utils.AuditEvent, error) {
	_, err := kubeclient.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{})
	if err != nil && !errors.IsNotFound(err) {
		return nil, err
	}

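	// the configmap is never created, so the get is expected to fail with NotFound;
	// the expected audit event below records that 404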
	expectedEvents := []utils.AuditEvent{
		{
			Level:             auditinternal.LevelRequestResponse,
			Stage:             auditinternal.StageResponseComplete,
			RequestURI:        fmt.Sprintf("/api/v1/namespaces/%s/configmaps/%s", namespace, name),
			Verb:              "get",
			Code:              404,
			User:              auditTestUser,
			Resource:          "configmaps",
			Namespace:         namespace,
			RequestObject:     false,
			ResponseObject:    true,
			AuthorizeDecision: "allow",
		},
	}
	return expectedEvents, nil
}

// asyncOps runs simpleOp until stopChan is closed, updating the expected atomic
// events list as it goes
func asyncOps(
	stopChan <-chan struct{},
	wg *sync.WaitGroup,
	kubeclient kubernetes.Interface,
	expected *atomic.Value,
) {
	for i := 0; ; i++ {
		select {
		case <-stopChan:
			wg.Done()
			return
		default:
			name := fmt.Sprintf("health-%d-%d", i, time.Now().UnixNano())
			exp, err := simpleOp(name, kubeclient)
			if err != nil {
				// retry on errors
				continue
			}
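			// load-append-store is lossless here only because this goroutine is the
			// sole writer to the atomic.Value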
			e := expected.Load().([]utils.AuditEvent)
			evList := append(e, exp...)
			expected.Store(evList)
		}
	}
}