From 1dc251a1604b1576258f123ac8dd8390bba2e4a9 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Wed, 16 Aug 2017 10:03:52 +0200 Subject: [PATCH] audit: disable new v1beta1 types until incompatible changes are done --- .../k8s.io/apiserver/pkg/server/options/BUILD | 1 + .../apiserver/pkg/server/options/audit.go | 5 +- .../apiserver/plugin/pkg/audit/webhook/BUILD | 8 +- .../plugin/pkg/audit/webhook/webhook.go | 27 +- .../plugin/pkg/audit/webhook/webhook_test.go | 75 ++--- .../audit/webhook/webhook_v1alpha1_test.go | 290 ++++++++++++++++++ 6 files changed, 356 insertions(+), 50 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_v1alpha1_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/BUILD b/staging/src/k8s.io/apiserver/pkg/server/options/BUILD index 0f6d6e8c606..f2fd351eee5 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/options/BUILD @@ -51,6 +51,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apiserver/pkg/admission:go_default_library", "//vendor/k8s.io/apiserver/pkg/admission/initializer:go_default_library", + "//vendor/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library", "//vendor/k8s.io/apiserver/pkg/audit:go_default_library", "//vendor/k8s.io/apiserver/pkg/audit/policy:go_default_library", "//vendor/k8s.io/apiserver/pkg/authentication/authenticatorfactory:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/audit.go b/staging/src/k8s.io/apiserver/pkg/server/options/audit.go index 99c0e1789e3..9e673848bea 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/audit.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/audit.go @@ -25,6 +25,8 @@ import ( "github.com/spf13/pflag" "gopkg.in/natefinch/lumberjack.v2" + "k8s.io/apimachinery/pkg/runtime/schema" + auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1" "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/audit/policy" "k8s.io/apiserver/pkg/features" @@ -237,7 +239,8 @@ func (o *AuditWebhookOptions) applyTo(c *server.Config) error { return nil } - webhook, err := pluginwebhook.NewBackend(o.ConfigFile, o.Mode) + // TODO: switch to beta + webhook, err := pluginwebhook.NewBackend(o.ConfigFile, o.Mode, []schema.GroupVersion{auditv1alpha1.SchemeGroupVersion}) if err != nil { return fmt.Errorf("initializing audit webhook: %v", err) } diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD index 7610f164ea3..f27eb13cea4 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD @@ -8,14 +8,19 @@ load( go_test( name = "go_default_test", - srcs = ["webhook_test.go"], + srcs = [ + "webhook_test.go", + "webhook_v1alpha1_test.go", + ], library = ":go_default_library", deps = [ "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/serializer/json:go_default_library", "//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library", + "//vendor/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library", "//vendor/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library", "//vendor/k8s.io/apiserver/pkg/audit:go_default_library", "//vendor/k8s.io/client-go/tools/clientcmd/api/v1:go_default_library", @@ -34,6 +39,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library", "//vendor/k8s.io/apiserver/pkg/apis/audit/install:go_default_library", + "//vendor/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library", "//vendor/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library", "//vendor/k8s.io/apiserver/pkg/audit:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/webhook:go_default_library", diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go index 295fe77312e..8d7f6b3a4e7 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go @@ -32,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/util/runtime" auditinternal "k8s.io/apiserver/pkg/apis/audit" "k8s.io/apiserver/pkg/apis/audit/install" + auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1" auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1" "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/util/webhook" @@ -70,12 +71,12 @@ const pluginName = "webhook" // NewBackend returns an audit backend that sends events over HTTP to an external service. // The mode indicates the caching behavior of the webhook. Either blocking (ModeBlocking) // or buffered with batch POSTs (ModeBatch). -func NewBackend(kubeConfigFile string, mode string) (audit.Backend, error) { +func NewBackend(kubeConfigFile string, mode string, groupVersions []schema.GroupVersion) (audit.Backend, error) { switch mode { case ModeBatch: - return newBatchWebhook(kubeConfigFile) + return newBatchWebhook(kubeConfigFile, groupVersions) case ModeBlocking: - return newBlockingWebhook(kubeConfigFile) + return newBlockingWebhook(kubeConfigFile, groupVersions) default: return nil, fmt.Errorf("webhook mode %q is not in list of known modes (%s)", mode, strings.Join(AllowedModes, ",")) @@ -88,24 +89,24 @@ var ( // Can we make these passable to NewGenericWebhook? groupFactoryRegistry = make(announced.APIGroupFactoryRegistry) // TODO(audit): figure out a general way to let the client choose their preferred version - groupVersions = []schema.GroupVersion{auditv1beta1.SchemeGroupVersion} - registry = registered.NewOrDie("") + registry = registered.NewOrDie("") ) func init() { - registry.RegisterVersions(groupVersions) - if err := registry.EnableVersions(groupVersions...); err != nil { - panic(fmt.Sprintf("failed to enable version %v", groupVersions)) + allGVs := []schema.GroupVersion{auditv1alpha1.SchemeGroupVersion, auditv1beta1.SchemeGroupVersion} + registry.RegisterVersions(allGVs) + if err := registry.EnableVersions(allGVs...); err != nil { + panic(fmt.Sprintf("failed to enable version %v", allGVs)) } install.Install(groupFactoryRegistry, registry, audit.Scheme) } -func loadWebhook(configFile string) (*webhook.GenericWebhook, error) { +func loadWebhook(configFile string, groupVersions []schema.GroupVersion) (*webhook.GenericWebhook, error) { return webhook.NewGenericWebhook(registry, audit.Codecs, configFile, groupVersions, 0) } -func newBlockingWebhook(configFile string) (*blockingBackend, error) { - w, err := loadWebhook(configFile) +func newBlockingWebhook(configFile string, groupVersions []schema.GroupVersion) (*blockingBackend, error) { + w, err := loadWebhook(configFile, groupVersions) if err != nil { return nil, err } @@ -140,8 +141,8 @@ func (b *blockingBackend) processEvents(ev ...*auditinternal.Event) error { return b.w.RestClient.Post().Body(&list).Do().Error() } -func newBatchWebhook(configFile string) (*batchBackend, error) { - w, err := loadWebhook(configFile) +func newBatchWebhook(configFile string, groupVersions []schema.GroupVersion) (*batchBackend, error) { + w, err := loadWebhook(configFile, groupVersions) if err != nil { return nil, err } diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_test.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_test.go index a59d0cfdc47..d5af0e3e394 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_test.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_test.go @@ -24,6 +24,7 @@ import ( "net/http" "net/http/httptest" "os" + "reflect" "sync" "sync/atomic" "testing" @@ -33,6 +34,7 @@ import ( "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer/json" auditinternal "k8s.io/apiserver/pkg/apis/audit" auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1" @@ -42,10 +44,12 @@ import ( // newWebhookHandler returns a handler which recieves webhook events and decodes the // request body. The caller passes a callback which is called on each webhook POST. -func newWebhookHandler(t *testing.T, cb func(events *auditv1beta1.EventList)) http.Handler { +// The object passed to cb is of the same type as list. +func newWebhookHandler(t *testing.T, list runtime.Object, cb func(events runtime.Object)) http.Handler { s := json.NewSerializer(json.DefaultMetaFactory, audit.Scheme, audit.Scheme, false) return &testWebhookHandler{ t: t, + list: list, onEvents: cb, serializer: s, } @@ -54,7 +58,8 @@ func newWebhookHandler(t *testing.T, cb func(events *auditv1beta1.EventList)) ht type testWebhookHandler struct { t *testing.T - onEvents func(events *auditv1beta1.EventList) + list runtime.Object + onEvents func(events runtime.Object) serializer runtime.Serializer } @@ -66,15 +71,14 @@ func (t *testWebhookHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return fmt.Errorf("read webhook request body: %v", err) } - obj, _, err := t.serializer.Decode(body, nil, &auditv1beta1.EventList{}) + obj, _, err := t.serializer.Decode(body, nil, t.list.DeepCopyObject()) if err != nil { return fmt.Errorf("decode request body: %v", err) } - list, ok := obj.(*auditv1beta1.EventList) - if !ok { - return fmt.Errorf("expected *v1beta1.EventList got %T", obj) + if reflect.TypeOf(obj).Elem() != reflect.TypeOf(t.list).Elem() { + return fmt.Errorf("expected %T, got %T", t.list, obj) } - t.onEvents(list) + t.onEvents(obj) return nil }() @@ -87,15 +91,15 @@ func (t *testWebhookHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusInternalServerError) } -func newTestBlockingWebhook(t *testing.T, endpoint string) *blockingBackend { - return newWebhook(t, endpoint, ModeBlocking).(*blockingBackend) +func newTestBlockingWebhook(t *testing.T, endpoint string, groupVersions []schema.GroupVersion) *blockingBackend { + return newWebhook(t, endpoint, ModeBlocking, groupVersions).(*blockingBackend) } -func newTestBatchWebhook(t *testing.T, endpoint string) *batchBackend { - return newWebhook(t, endpoint, ModeBatch).(*batchBackend) +func newTestBatchWebhook(t *testing.T, endpoint string, groupVersions []schema.GroupVersion) *batchBackend { + return newWebhook(t, endpoint, ModeBatch, groupVersions).(*batchBackend) } -func newWebhook(t *testing.T, endpoint string, mode string) audit.Backend { +func newWebhook(t *testing.T, endpoint string, mode string, groupVersions []schema.GroupVersion) audit.Backend { config := v1.Config{ Clusters: []v1.NamedCluster{ {Cluster: v1.Cluster{Server: endpoint, InsecureSkipTLSVerify: true}}, @@ -112,7 +116,7 @@ func newWebhook(t *testing.T, endpoint string, mode string) audit.Backend { // NOTE(ericchiang): Do we need to use a proper serializer? require.NoError(t, stdjson.NewEncoder(f).Encode(config), "writing kubeconfig") - backend, err := NewBackend(f.Name(), mode) + backend, err := NewBackend(f.Name(), mode, groupVersions) require.NoError(t, err, "initializing backend") return backend @@ -122,12 +126,12 @@ func TestWebhook(t *testing.T) { gotEvents := false defer func() { require.True(t, gotEvents, "no events received") }() - s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1beta1.EventList) { + s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) { gotEvents = true })) defer s.Close() - backend := newTestBlockingWebhook(t, s.URL) + backend := newTestBlockingWebhook(t, s.URL, []schema.GroupVersion{auditv1beta1.SchemeGroupVersion}) // Ensure this doesn't return a serialization error. event := &auditinternal.Event{} @@ -151,12 +155,12 @@ func TestBatchWebhookMaxEvents(t *testing.T) { } got := make(chan int, 2) - s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1beta1.EventList) { - got <- len(events.Items) + s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) { + got <- len(events.(*auditv1beta1.EventList).Items) })) defer s.Close() - backend := newTestBatchWebhook(t, s.URL) + backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1beta1.SchemeGroupVersion}) backend.ProcessEvents(events...) @@ -183,12 +187,12 @@ func TestBatchWebhookStopCh(t *testing.T) { expected := len(events) got := make(chan int, 2) - s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1beta1.EventList) { - got <- len(events.Items) + s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) { + got <- len(events.(*auditv1beta1.EventList).Items) })) defer s.Close() - backend := newTestBatchWebhook(t, s.URL) + backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1beta1.SchemeGroupVersion}) backend.ProcessEvents(events...) stopCh := make(chan struct{}) @@ -209,12 +213,12 @@ func TestBatchWebhookProcessEventsAfterStop(t *testing.T) { } got := make(chan struct{}) - s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1beta1.EventList) { + s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) { close(got) })) defer s.Close() - backend := newTestBatchWebhook(t, s.URL) + backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1beta1.SchemeGroupVersion}) stopCh := make(chan struct{}) backend.Run(stopCh) @@ -233,13 +237,13 @@ func TestBatchWebhookShutdown(t *testing.T) { got := make(chan struct{}) contReqCh := make(chan struct{}) shutdownCh := make(chan struct{}) - s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1beta1.EventList) { + s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) { close(got) <-contReqCh })) defer s.Close() - backend := newTestBatchWebhook(t, s.URL) + backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1beta1.SchemeGroupVersion}) backend.ProcessEvents(events...) go func() { @@ -278,12 +282,12 @@ func TestBatchWebhookEmptyBuffer(t *testing.T) { expected := len(events) got := make(chan int, 2) - s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1beta1.EventList) { - got <- len(events.Items) + s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) { + got <- len(events.(*auditv1beta1.EventList).Items) })) defer s.Close() - backend := newTestBatchWebhook(t, s.URL) + backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1beta1.SchemeGroupVersion}) stopCh := make(chan struct{}) timer := make(chan time.Time, 1) @@ -311,12 +315,12 @@ func TestBatchBufferFull(t *testing.T) { for i := range events { events[i] = &auditinternal.Event{} } - s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1beta1.EventList) { + s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) { // Do nothing. })) defer s.Close() - backend := newTestBatchWebhook(t, s.URL) + backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1beta1.SchemeGroupVersion}) // Make sure this doesn't block. backend.ProcessEvents(events...) @@ -344,7 +348,8 @@ func TestBatchRun(t *testing.T) { close(done) }() - s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1beta1.EventList) { + s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(obj runtime.Object) { + events := obj.(*auditv1beta1.EventList) atomic.AddInt64(got, int64(len(events.Items))) wg.Add(-len(events.Items)) })) @@ -353,7 +358,7 @@ func TestBatchRun(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - backend := newTestBatchWebhook(t, s.URL) + backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1beta1.SchemeGroupVersion}) // Test the Run codepath. E.g. that the spawned goroutines behave correctly. backend.Run(stopCh) @@ -377,8 +382,8 @@ func TestBatchConcurrentRequests(t *testing.T) { wg := new(sync.WaitGroup) wg.Add(len(events)) - s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1beta1.EventList) { - wg.Add(-len(events.Items)) + s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) { + wg.Add(-len(events.(*auditv1beta1.EventList).Items)) // Since the webhook makes concurrent requests, blocking on the webhook response // shouldn't block the webhook from sending more events. @@ -391,7 +396,7 @@ func TestBatchConcurrentRequests(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - backend := newTestBatchWebhook(t, s.URL) + backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1beta1.SchemeGroupVersion}) backend.Run(stopCh) backend.ProcessEvents(events...) diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_v1alpha1_test.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_v1alpha1_test.go new file mode 100644 index 00000000000..99613804121 --- /dev/null +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_v1alpha1_test.go @@ -0,0 +1,290 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package webhook + +import ( + "net/http/httptest" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + auditinternal "k8s.io/apiserver/pkg/apis/audit" + auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1" +) + +func TestBatchWebhookMaxEventsV1Alpha1(t *testing.T) { + nRest := 10 + events := make([]*auditinternal.Event, defaultBatchMaxSize+nRest) // greater than max size. + for i := range events { + events[i] = &auditinternal.Event{} + } + + got := make(chan int, 2) + s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) { + got <- len(events.(*auditv1alpha1.EventList).Items) + })) + defer s.Close() + + backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1alpha1.SchemeGroupVersion}) + + backend.ProcessEvents(events...) + + stopCh := make(chan struct{}) + timer := make(chan time.Time, 1) + + backend.sendBatchEvents(backend.collectEvents(stopCh, timer)) + require.Equal(t, defaultBatchMaxSize, <-got, "did not get batch max size") + + go func() { + waitForEmptyBuffer(backend) // wait for the buffer to empty + timer <- time.Now() // Trigger the wait timeout + }() + + backend.sendBatchEvents(backend.collectEvents(stopCh, timer)) + require.Equal(t, nRest, <-got, "failed to get the rest of the events") +} + +func TestBatchWebhookStopChV1Alpha1(t *testing.T) { + events := make([]*auditinternal.Event, 1) // less than max size. + for i := range events { + events[i] = &auditinternal.Event{} + } + + expected := len(events) + got := make(chan int, 2) + s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) { + got <- len(events.(*auditv1alpha1.EventList).Items) + })) + defer s.Close() + + backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1alpha1.SchemeGroupVersion}) + backend.ProcessEvents(events...) + + stopCh := make(chan struct{}) + timer := make(chan time.Time) + + go func() { + waitForEmptyBuffer(backend) + close(stopCh) // stop channel has stopped + }() + backend.sendBatchEvents(backend.collectEvents(stopCh, timer)) + require.Equal(t, expected, <-got, "get queued events after timer expires") +} + +func TestBatchWebhookProcessEventsAfterStopV1Alpha1(t *testing.T) { + events := make([]*auditinternal.Event, 1) // less than max size. + for i := range events { + events[i] = &auditinternal.Event{} + } + + got := make(chan struct{}) + s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) { + close(got) + })) + defer s.Close() + + backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1alpha1.SchemeGroupVersion}) + stopCh := make(chan struct{}) + + backend.Run(stopCh) + close(stopCh) + <-backend.shutdownCh + backend.ProcessEvents(events...) + assert.Equal(t, 0, len(backend.buffer), "processed events after the backed has been stopped") +} + +func TestBatchWebhookShutdownV1Alpha1(t *testing.T) { + events := make([]*auditinternal.Event, 1) + for i := range events { + events[i] = &auditinternal.Event{} + } + + got := make(chan struct{}) + contReqCh := make(chan struct{}) + shutdownCh := make(chan struct{}) + s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) { + close(got) + <-contReqCh + })) + defer s.Close() + + backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1alpha1.SchemeGroupVersion}) + backend.ProcessEvents(events...) + + go func() { + // Assume stopCh was closed. + close(backend.buffer) + backend.sendBatchEvents(backend.collectLastEvents()) + }() + + <-got + + go func() { + close(backend.shutdownCh) + backend.Shutdown() + close(shutdownCh) + }() + + // Wait for some time in case there's a bug that allows for the Shutdown + // method to exit before all requests has been completed. + time.Sleep(1 * time.Second) + select { + case <-shutdownCh: + t.Fatal("Backend shut down before all requests finished") + default: + // Continue. + } + + close(contReqCh) + <-shutdownCh +} + +func TestBatchWebhookEmptyBufferV1Alpha1(t *testing.T) { + events := make([]*auditinternal.Event, 1) // less than max size. + for i := range events { + events[i] = &auditinternal.Event{} + } + + expected := len(events) + got := make(chan int, 2) + s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) { + got <- len(events.(*auditv1alpha1.EventList).Items) + })) + defer s.Close() + + backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1alpha1.SchemeGroupVersion}) + + stopCh := make(chan struct{}) + timer := make(chan time.Time, 1) + + timer <- time.Now() // Timer is done. + + // Buffer is empty, no events have been queued. This should exit but send no events. + backend.sendBatchEvents(backend.collectEvents(stopCh, timer)) + + // Send additional events after the sendBatchEvents has been called. + backend.ProcessEvents(events...) + go func() { + waitForEmptyBuffer(backend) + timer <- time.Now() + }() + + backend.sendBatchEvents(backend.collectEvents(stopCh, timer)) + + // Make sure we didn't get a POST with zero events. + require.Equal(t, expected, <-got, "expected one event") +} + +func TestBatchBufferFullV1Alpha1(t *testing.T) { + events := make([]*auditinternal.Event, defaultBatchBufferSize+1) // More than buffered size + for i := range events { + events[i] = &auditinternal.Event{} + } + s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) { + // Do nothing. + })) + defer s.Close() + + backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1alpha1.SchemeGroupVersion}) + + // Make sure this doesn't block. + backend.ProcessEvents(events...) +} + +func TestBatchRunV1Alpha1(t *testing.T) { + + // Divisable by max batch size so we don't have to wait for a minute for + // the test to finish. + events := make([]*auditinternal.Event, defaultBatchMaxSize*3) + for i := range events { + events[i] = &auditinternal.Event{} + } + + got := new(int64) + want := len(events) + + wg := new(sync.WaitGroup) + wg.Add(want) + done := make(chan struct{}) + + go func() { + wg.Wait() + // When the expected number of events have been received, close the channel. + close(done) + }() + + s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(obj runtime.Object) { + events := obj.(*auditv1alpha1.EventList) + atomic.AddInt64(got, int64(len(events.Items))) + wg.Add(-len(events.Items)) + })) + defer s.Close() + + stopCh := make(chan struct{}) + defer close(stopCh) + + backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1alpha1.SchemeGroupVersion}) + + // Test the Run codepath. E.g. that the spawned goroutines behave correctly. + backend.Run(stopCh) + + backend.ProcessEvents(events...) + + select { + case <-done: + // Received all the events. + case <-time.After(2 * time.Minute): + t.Errorf("expected %d events got %d", want, atomic.LoadInt64(got)) + } +} + +func TestBatchConcurrentRequestsV1Alpha1(t *testing.T) { + events := make([]*auditinternal.Event, defaultBatchBufferSize) // Don't drop events + for i := range events { + events[i] = &auditinternal.Event{} + } + + wg := new(sync.WaitGroup) + wg.Add(len(events)) + + s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) { + wg.Add(-len(events.(*auditv1alpha1.EventList).Items)) + + // Since the webhook makes concurrent requests, blocking on the webhook response + // shouldn't block the webhook from sending more events. + // + // Wait for all responses to be received before sending the response. + wg.Wait() + })) + defer s.Close() + + stopCh := make(chan struct{}) + defer close(stopCh) + + backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1alpha1.SchemeGroupVersion}) + backend.Run(stopCh) + + backend.ProcessEvents(events...) + // Wait for the webhook to receive all events. + wg.Wait() +}