audit: disable new v1beta1 types until incompatible changes are done

This commit is contained in:
Dr. Stefan Schimanski 2017-08-16 10:03:52 +02:00 committed by Cao Shufeng
parent 0410221c3f
commit 1dc251a160
6 changed files with 356 additions and 50 deletions

View File

@ -51,6 +51,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library", "//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
"//vendor/k8s.io/apiserver/pkg/admission/initializer:go_default_library", "//vendor/k8s.io/apiserver/pkg/admission/initializer:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library", "//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit/policy:go_default_library", "//vendor/k8s.io/apiserver/pkg/audit/policy:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/authenticatorfactory:go_default_library", "//vendor/k8s.io/apiserver/pkg/authentication/authenticatorfactory:go_default_library",

View File

@ -25,6 +25,8 @@ import (
"github.com/spf13/pflag" "github.com/spf13/pflag"
"gopkg.in/natefinch/lumberjack.v2" "gopkg.in/natefinch/lumberjack.v2"
"k8s.io/apimachinery/pkg/runtime/schema"
auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1"
"k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/audit/policy" "k8s.io/apiserver/pkg/audit/policy"
"k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/features"
@ -237,7 +239,8 @@ func (o *AuditWebhookOptions) applyTo(c *server.Config) error {
return nil return nil
} }
webhook, err := pluginwebhook.NewBackend(o.ConfigFile, o.Mode) // TODO: switch to beta
webhook, err := pluginwebhook.NewBackend(o.ConfigFile, o.Mode, []schema.GroupVersion{auditv1alpha1.SchemeGroupVersion})
if err != nil { if err != nil {
return fmt.Errorf("initializing audit webhook: %v", err) return fmt.Errorf("initializing audit webhook: %v", err)
} }

View File

@ -8,14 +8,19 @@ load(
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = ["webhook_test.go"], srcs = [
"webhook_test.go",
"webhook_v1alpha1_test.go",
],
library = ":go_default_library", library = ":go_default_library",
deps = [ deps = [
"//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer/json:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/serializer/json:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library", "//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library", "//vendor/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library", "//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd/api/v1:go_default_library", "//vendor/k8s.io/client-go/tools/clientcmd/api/v1:go_default_library",
@ -34,6 +39,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library", "//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit/install:go_default_library", "//vendor/k8s.io/apiserver/pkg/apis/audit/install:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library",
"//vendor/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library", "//vendor/k8s.io/apiserver/pkg/apis/audit/v1beta1:go_default_library",
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library", "//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/webhook:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/webhook:go_default_library",

View File

@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/runtime"
auditinternal "k8s.io/apiserver/pkg/apis/audit" auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/apis/audit/install" "k8s.io/apiserver/pkg/apis/audit/install"
auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1"
auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1" auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1"
"k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/util/webhook" "k8s.io/apiserver/pkg/util/webhook"
@ -70,12 +71,12 @@ const pluginName = "webhook"
// NewBackend returns an audit backend that sends events over HTTP to an external service. // NewBackend returns an audit backend that sends events over HTTP to an external service.
// The mode indicates the caching behavior of the webhook. Either blocking (ModeBlocking) // The mode indicates the caching behavior of the webhook. Either blocking (ModeBlocking)
// or buffered with batch POSTs (ModeBatch). // or buffered with batch POSTs (ModeBatch).
func NewBackend(kubeConfigFile string, mode string) (audit.Backend, error) { func NewBackend(kubeConfigFile string, mode string, groupVersions []schema.GroupVersion) (audit.Backend, error) {
switch mode { switch mode {
case ModeBatch: case ModeBatch:
return newBatchWebhook(kubeConfigFile) return newBatchWebhook(kubeConfigFile, groupVersions)
case ModeBlocking: case ModeBlocking:
return newBlockingWebhook(kubeConfigFile) return newBlockingWebhook(kubeConfigFile, groupVersions)
default: default:
return nil, fmt.Errorf("webhook mode %q is not in list of known modes (%s)", return nil, fmt.Errorf("webhook mode %q is not in list of known modes (%s)",
mode, strings.Join(AllowedModes, ",")) mode, strings.Join(AllowedModes, ","))
@ -88,24 +89,24 @@ var (
// Can we make these passable to NewGenericWebhook? // Can we make these passable to NewGenericWebhook?
groupFactoryRegistry = make(announced.APIGroupFactoryRegistry) groupFactoryRegistry = make(announced.APIGroupFactoryRegistry)
// TODO(audit): figure out a general way to let the client choose their preferred version // TODO(audit): figure out a general way to let the client choose their preferred version
groupVersions = []schema.GroupVersion{auditv1beta1.SchemeGroupVersion} registry = registered.NewOrDie("")
registry = registered.NewOrDie("")
) )
func init() { func init() {
registry.RegisterVersions(groupVersions) allGVs := []schema.GroupVersion{auditv1alpha1.SchemeGroupVersion, auditv1beta1.SchemeGroupVersion}
if err := registry.EnableVersions(groupVersions...); err != nil { registry.RegisterVersions(allGVs)
panic(fmt.Sprintf("failed to enable version %v", groupVersions)) if err := registry.EnableVersions(allGVs...); err != nil {
panic(fmt.Sprintf("failed to enable version %v", allGVs))
} }
install.Install(groupFactoryRegistry, registry, audit.Scheme) install.Install(groupFactoryRegistry, registry, audit.Scheme)
} }
func loadWebhook(configFile string) (*webhook.GenericWebhook, error) { func loadWebhook(configFile string, groupVersions []schema.GroupVersion) (*webhook.GenericWebhook, error) {
return webhook.NewGenericWebhook(registry, audit.Codecs, configFile, groupVersions, 0) return webhook.NewGenericWebhook(registry, audit.Codecs, configFile, groupVersions, 0)
} }
func newBlockingWebhook(configFile string) (*blockingBackend, error) { func newBlockingWebhook(configFile string, groupVersions []schema.GroupVersion) (*blockingBackend, error) {
w, err := loadWebhook(configFile) w, err := loadWebhook(configFile, groupVersions)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -140,8 +141,8 @@ func (b *blockingBackend) processEvents(ev ...*auditinternal.Event) error {
return b.w.RestClient.Post().Body(&list).Do().Error() return b.w.RestClient.Post().Body(&list).Do().Error()
} }
func newBatchWebhook(configFile string) (*batchBackend, error) { func newBatchWebhook(configFile string, groupVersions []schema.GroupVersion) (*batchBackend, error) {
w, err := loadWebhook(configFile) w, err := loadWebhook(configFile, groupVersions)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -24,6 +24,7 @@ import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os" "os"
"reflect"
"sync" "sync"
"sync/atomic" "sync/atomic"
"testing" "testing"
@ -33,6 +34,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/json" "k8s.io/apimachinery/pkg/runtime/serializer/json"
auditinternal "k8s.io/apiserver/pkg/apis/audit" auditinternal "k8s.io/apiserver/pkg/apis/audit"
auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1" auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1"
@ -42,10 +44,12 @@ import (
// newWebhookHandler returns a handler which recieves webhook events and decodes the // newWebhookHandler returns a handler which recieves webhook events and decodes the
// request body. The caller passes a callback which is called on each webhook POST. // request body. The caller passes a callback which is called on each webhook POST.
func newWebhookHandler(t *testing.T, cb func(events *auditv1beta1.EventList)) http.Handler { // The object passed to cb is of the same type as list.
func newWebhookHandler(t *testing.T, list runtime.Object, cb func(events runtime.Object)) http.Handler {
s := json.NewSerializer(json.DefaultMetaFactory, audit.Scheme, audit.Scheme, false) s := json.NewSerializer(json.DefaultMetaFactory, audit.Scheme, audit.Scheme, false)
return &testWebhookHandler{ return &testWebhookHandler{
t: t, t: t,
list: list,
onEvents: cb, onEvents: cb,
serializer: s, serializer: s,
} }
@ -54,7 +58,8 @@ func newWebhookHandler(t *testing.T, cb func(events *auditv1beta1.EventList)) ht
type testWebhookHandler struct { type testWebhookHandler struct {
t *testing.T t *testing.T
onEvents func(events *auditv1beta1.EventList) list runtime.Object
onEvents func(events runtime.Object)
serializer runtime.Serializer serializer runtime.Serializer
} }
@ -66,15 +71,14 @@ func (t *testWebhookHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return fmt.Errorf("read webhook request body: %v", err) return fmt.Errorf("read webhook request body: %v", err)
} }
obj, _, err := t.serializer.Decode(body, nil, &auditv1beta1.EventList{}) obj, _, err := t.serializer.Decode(body, nil, t.list.DeepCopyObject())
if err != nil { if err != nil {
return fmt.Errorf("decode request body: %v", err) return fmt.Errorf("decode request body: %v", err)
} }
list, ok := obj.(*auditv1beta1.EventList) if reflect.TypeOf(obj).Elem() != reflect.TypeOf(t.list).Elem() {
if !ok { return fmt.Errorf("expected %T, got %T", t.list, obj)
return fmt.Errorf("expected *v1beta1.EventList got %T", obj)
} }
t.onEvents(list) t.onEvents(obj)
return nil return nil
}() }()
@ -87,15 +91,15 @@ func (t *testWebhookHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
} }
func newTestBlockingWebhook(t *testing.T, endpoint string) *blockingBackend { func newTestBlockingWebhook(t *testing.T, endpoint string, groupVersions []schema.GroupVersion) *blockingBackend {
return newWebhook(t, endpoint, ModeBlocking).(*blockingBackend) return newWebhook(t, endpoint, ModeBlocking, groupVersions).(*blockingBackend)
} }
func newTestBatchWebhook(t *testing.T, endpoint string) *batchBackend { func newTestBatchWebhook(t *testing.T, endpoint string, groupVersions []schema.GroupVersion) *batchBackend {
return newWebhook(t, endpoint, ModeBatch).(*batchBackend) return newWebhook(t, endpoint, ModeBatch, groupVersions).(*batchBackend)
} }
func newWebhook(t *testing.T, endpoint string, mode string) audit.Backend { func newWebhook(t *testing.T, endpoint string, mode string, groupVersions []schema.GroupVersion) audit.Backend {
config := v1.Config{ config := v1.Config{
Clusters: []v1.NamedCluster{ Clusters: []v1.NamedCluster{
{Cluster: v1.Cluster{Server: endpoint, InsecureSkipTLSVerify: true}}, {Cluster: v1.Cluster{Server: endpoint, InsecureSkipTLSVerify: true}},
@ -112,7 +116,7 @@ func newWebhook(t *testing.T, endpoint string, mode string) audit.Backend {
// NOTE(ericchiang): Do we need to use a proper serializer? // NOTE(ericchiang): Do we need to use a proper serializer?
require.NoError(t, stdjson.NewEncoder(f).Encode(config), "writing kubeconfig") require.NoError(t, stdjson.NewEncoder(f).Encode(config), "writing kubeconfig")
backend, err := NewBackend(f.Name(), mode) backend, err := NewBackend(f.Name(), mode, groupVersions)
require.NoError(t, err, "initializing backend") require.NoError(t, err, "initializing backend")
return backend return backend
@ -122,12 +126,12 @@ func TestWebhook(t *testing.T) {
gotEvents := false gotEvents := false
defer func() { require.True(t, gotEvents, "no events received") }() defer func() { require.True(t, gotEvents, "no events received") }()
s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1beta1.EventList) { s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) {
gotEvents = true gotEvents = true
})) }))
defer s.Close() defer s.Close()
backend := newTestBlockingWebhook(t, s.URL) backend := newTestBlockingWebhook(t, s.URL, []schema.GroupVersion{auditv1beta1.SchemeGroupVersion})
// Ensure this doesn't return a serialization error. // Ensure this doesn't return a serialization error.
event := &auditinternal.Event{} event := &auditinternal.Event{}
@ -151,12 +155,12 @@ func TestBatchWebhookMaxEvents(t *testing.T) {
} }
got := make(chan int, 2) got := make(chan int, 2)
s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1beta1.EventList) { s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) {
got <- len(events.Items) got <- len(events.(*auditv1beta1.EventList).Items)
})) }))
defer s.Close() defer s.Close()
backend := newTestBatchWebhook(t, s.URL) backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1beta1.SchemeGroupVersion})
backend.ProcessEvents(events...) backend.ProcessEvents(events...)
@ -183,12 +187,12 @@ func TestBatchWebhookStopCh(t *testing.T) {
expected := len(events) expected := len(events)
got := make(chan int, 2) got := make(chan int, 2)
s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1beta1.EventList) { s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) {
got <- len(events.Items) got <- len(events.(*auditv1beta1.EventList).Items)
})) }))
defer s.Close() defer s.Close()
backend := newTestBatchWebhook(t, s.URL) backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1beta1.SchemeGroupVersion})
backend.ProcessEvents(events...) backend.ProcessEvents(events...)
stopCh := make(chan struct{}) stopCh := make(chan struct{})
@ -209,12 +213,12 @@ func TestBatchWebhookProcessEventsAfterStop(t *testing.T) {
} }
got := make(chan struct{}) got := make(chan struct{})
s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1beta1.EventList) { s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) {
close(got) close(got)
})) }))
defer s.Close() defer s.Close()
backend := newTestBatchWebhook(t, s.URL) backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1beta1.SchemeGroupVersion})
stopCh := make(chan struct{}) stopCh := make(chan struct{})
backend.Run(stopCh) backend.Run(stopCh)
@ -233,13 +237,13 @@ func TestBatchWebhookShutdown(t *testing.T) {
got := make(chan struct{}) got := make(chan struct{})
contReqCh := make(chan struct{}) contReqCh := make(chan struct{})
shutdownCh := make(chan struct{}) shutdownCh := make(chan struct{})
s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1beta1.EventList) { s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) {
close(got) close(got)
<-contReqCh <-contReqCh
})) }))
defer s.Close() defer s.Close()
backend := newTestBatchWebhook(t, s.URL) backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1beta1.SchemeGroupVersion})
backend.ProcessEvents(events...) backend.ProcessEvents(events...)
go func() { go func() {
@ -278,12 +282,12 @@ func TestBatchWebhookEmptyBuffer(t *testing.T) {
expected := len(events) expected := len(events)
got := make(chan int, 2) got := make(chan int, 2)
s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1beta1.EventList) { s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) {
got <- len(events.Items) got <- len(events.(*auditv1beta1.EventList).Items)
})) }))
defer s.Close() defer s.Close()
backend := newTestBatchWebhook(t, s.URL) backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1beta1.SchemeGroupVersion})
stopCh := make(chan struct{}) stopCh := make(chan struct{})
timer := make(chan time.Time, 1) timer := make(chan time.Time, 1)
@ -311,12 +315,12 @@ func TestBatchBufferFull(t *testing.T) {
for i := range events { for i := range events {
events[i] = &auditinternal.Event{} events[i] = &auditinternal.Event{}
} }
s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1beta1.EventList) { s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) {
// Do nothing. // Do nothing.
})) }))
defer s.Close() defer s.Close()
backend := newTestBatchWebhook(t, s.URL) backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1beta1.SchemeGroupVersion})
// Make sure this doesn't block. // Make sure this doesn't block.
backend.ProcessEvents(events...) backend.ProcessEvents(events...)
@ -344,7 +348,8 @@ func TestBatchRun(t *testing.T) {
close(done) close(done)
}() }()
s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1beta1.EventList) { s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(obj runtime.Object) {
events := obj.(*auditv1beta1.EventList)
atomic.AddInt64(got, int64(len(events.Items))) atomic.AddInt64(got, int64(len(events.Items)))
wg.Add(-len(events.Items)) wg.Add(-len(events.Items))
})) }))
@ -353,7 +358,7 @@ func TestBatchRun(t *testing.T) {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
backend := newTestBatchWebhook(t, s.URL) backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1beta1.SchemeGroupVersion})
// Test the Run codepath. E.g. that the spawned goroutines behave correctly. // Test the Run codepath. E.g. that the spawned goroutines behave correctly.
backend.Run(stopCh) backend.Run(stopCh)
@ -377,8 +382,8 @@ func TestBatchConcurrentRequests(t *testing.T) {
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)
wg.Add(len(events)) wg.Add(len(events))
s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1beta1.EventList) { s := httptest.NewServer(newWebhookHandler(t, &auditv1beta1.EventList{}, func(events runtime.Object) {
wg.Add(-len(events.Items)) wg.Add(-len(events.(*auditv1beta1.EventList).Items))
// Since the webhook makes concurrent requests, blocking on the webhook response // Since the webhook makes concurrent requests, blocking on the webhook response
// shouldn't block the webhook from sending more events. // shouldn't block the webhook from sending more events.
@ -391,7 +396,7 @@ func TestBatchConcurrentRequests(t *testing.T) {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
backend := newTestBatchWebhook(t, s.URL) backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1beta1.SchemeGroupVersion})
backend.Run(stopCh) backend.Run(stopCh)
backend.ProcessEvents(events...) backend.ProcessEvents(events...)

View File

@ -0,0 +1,290 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package webhook
import (
"net/http/httptest"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1"
)
func TestBatchWebhookMaxEventsV1Alpha1(t *testing.T) {
nRest := 10
events := make([]*auditinternal.Event, defaultBatchMaxSize+nRest) // greater than max size.
for i := range events {
events[i] = &auditinternal.Event{}
}
got := make(chan int, 2)
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) {
got <- len(events.(*auditv1alpha1.EventList).Items)
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1alpha1.SchemeGroupVersion})
backend.ProcessEvents(events...)
stopCh := make(chan struct{})
timer := make(chan time.Time, 1)
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
require.Equal(t, defaultBatchMaxSize, <-got, "did not get batch max size")
go func() {
waitForEmptyBuffer(backend) // wait for the buffer to empty
timer <- time.Now() // Trigger the wait timeout
}()
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
require.Equal(t, nRest, <-got, "failed to get the rest of the events")
}
func TestBatchWebhookStopChV1Alpha1(t *testing.T) {
events := make([]*auditinternal.Event, 1) // less than max size.
for i := range events {
events[i] = &auditinternal.Event{}
}
expected := len(events)
got := make(chan int, 2)
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) {
got <- len(events.(*auditv1alpha1.EventList).Items)
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1alpha1.SchemeGroupVersion})
backend.ProcessEvents(events...)
stopCh := make(chan struct{})
timer := make(chan time.Time)
go func() {
waitForEmptyBuffer(backend)
close(stopCh) // stop channel has stopped
}()
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
require.Equal(t, expected, <-got, "get queued events after timer expires")
}
func TestBatchWebhookProcessEventsAfterStopV1Alpha1(t *testing.T) {
events := make([]*auditinternal.Event, 1) // less than max size.
for i := range events {
events[i] = &auditinternal.Event{}
}
got := make(chan struct{})
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) {
close(got)
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1alpha1.SchemeGroupVersion})
stopCh := make(chan struct{})
backend.Run(stopCh)
close(stopCh)
<-backend.shutdownCh
backend.ProcessEvents(events...)
assert.Equal(t, 0, len(backend.buffer), "processed events after the backed has been stopped")
}
func TestBatchWebhookShutdownV1Alpha1(t *testing.T) {
events := make([]*auditinternal.Event, 1)
for i := range events {
events[i] = &auditinternal.Event{}
}
got := make(chan struct{})
contReqCh := make(chan struct{})
shutdownCh := make(chan struct{})
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) {
close(got)
<-contReqCh
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1alpha1.SchemeGroupVersion})
backend.ProcessEvents(events...)
go func() {
// Assume stopCh was closed.
close(backend.buffer)
backend.sendBatchEvents(backend.collectLastEvents())
}()
<-got
go func() {
close(backend.shutdownCh)
backend.Shutdown()
close(shutdownCh)
}()
// Wait for some time in case there's a bug that allows for the Shutdown
// method to exit before all requests has been completed.
time.Sleep(1 * time.Second)
select {
case <-shutdownCh:
t.Fatal("Backend shut down before all requests finished")
default:
// Continue.
}
close(contReqCh)
<-shutdownCh
}
func TestBatchWebhookEmptyBufferV1Alpha1(t *testing.T) {
events := make([]*auditinternal.Event, 1) // less than max size.
for i := range events {
events[i] = &auditinternal.Event{}
}
expected := len(events)
got := make(chan int, 2)
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) {
got <- len(events.(*auditv1alpha1.EventList).Items)
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1alpha1.SchemeGroupVersion})
stopCh := make(chan struct{})
timer := make(chan time.Time, 1)
timer <- time.Now() // Timer is done.
// Buffer is empty, no events have been queued. This should exit but send no events.
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
// Send additional events after the sendBatchEvents has been called.
backend.ProcessEvents(events...)
go func() {
waitForEmptyBuffer(backend)
timer <- time.Now()
}()
backend.sendBatchEvents(backend.collectEvents(stopCh, timer))
// Make sure we didn't get a POST with zero events.
require.Equal(t, expected, <-got, "expected one event")
}
func TestBatchBufferFullV1Alpha1(t *testing.T) {
events := make([]*auditinternal.Event, defaultBatchBufferSize+1) // More than buffered size
for i := range events {
events[i] = &auditinternal.Event{}
}
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) {
// Do nothing.
}))
defer s.Close()
backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1alpha1.SchemeGroupVersion})
// Make sure this doesn't block.
backend.ProcessEvents(events...)
}
func TestBatchRunV1Alpha1(t *testing.T) {
// Divisable by max batch size so we don't have to wait for a minute for
// the test to finish.
events := make([]*auditinternal.Event, defaultBatchMaxSize*3)
for i := range events {
events[i] = &auditinternal.Event{}
}
got := new(int64)
want := len(events)
wg := new(sync.WaitGroup)
wg.Add(want)
done := make(chan struct{})
go func() {
wg.Wait()
// When the expected number of events have been received, close the channel.
close(done)
}()
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(obj runtime.Object) {
events := obj.(*auditv1alpha1.EventList)
atomic.AddInt64(got, int64(len(events.Items)))
wg.Add(-len(events.Items))
}))
defer s.Close()
stopCh := make(chan struct{})
defer close(stopCh)
backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1alpha1.SchemeGroupVersion})
// Test the Run codepath. E.g. that the spawned goroutines behave correctly.
backend.Run(stopCh)
backend.ProcessEvents(events...)
select {
case <-done:
// Received all the events.
case <-time.After(2 * time.Minute):
t.Errorf("expected %d events got %d", want, atomic.LoadInt64(got))
}
}
func TestBatchConcurrentRequestsV1Alpha1(t *testing.T) {
events := make([]*auditinternal.Event, defaultBatchBufferSize) // Don't drop events
for i := range events {
events[i] = &auditinternal.Event{}
}
wg := new(sync.WaitGroup)
wg.Add(len(events))
s := httptest.NewServer(newWebhookHandler(t, &auditv1alpha1.EventList{}, func(events runtime.Object) {
wg.Add(-len(events.(*auditv1alpha1.EventList).Items))
// Since the webhook makes concurrent requests, blocking on the webhook response
// shouldn't block the webhook from sending more events.
//
// Wait for all responses to be received before sending the response.
wg.Wait()
}))
defer s.Close()
stopCh := make(chan struct{})
defer close(stopCh)
backend := newTestBatchWebhook(t, s.URL, []schema.GroupVersion{auditv1alpha1.SchemeGroupVersion})
backend.Run(stopCh)
backend.ProcessEvents(events...)
// Wait for the webhook to receive all events.
wg.Wait()
}