diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/controller_test.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/controller_test.go new file mode 100644 index 00000000000..ad78dfbabfc --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic/controller_test.go @@ -0,0 +1,516 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package generic_test + +import ( + "context" + "errors" + "fmt" + "reflect" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" + + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + + "k8s.io/apiserver/pkg/admission/plugin/cel/internal/generic" + + clienttesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" +) + +type testInformer struct { + cache.SharedIndexInformer + + lock sync.Mutex + registrations map[interface{}]struct{} +} + +func (t *testInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) { + res, err := t.SharedIndexInformer.AddEventHandler(handler) + if err != nil { + return res, err + } + + func() { + t.lock.Lock() + defer t.lock.Unlock() + if t.registrations == nil { + t.registrations = make(map[interface{}]struct{}) + } + t.registrations[res] = struct{}{} + }() + + return res, err +} + +func (t *testInformer) RemoveEventHandler(registration cache.ResourceEventHandlerRegistration) error { + func() { + t.lock.Lock() + defer t.lock.Unlock() + + if _, ok := t.registrations[registration]; !ok { + panic("removing unknown event handler?") + } + delete(t.registrations, registration) + }() + + return t.SharedIndexInformer.RemoveEventHandler(registration) +} + +var ( + scheme *runtime.Scheme = runtime.NewScheme() + codecs serializer.CodecFactory = serializer.NewCodecFactory(scheme) + fakeGVR schema.GroupVersionResource = schema.GroupVersionResource{ + Group: "fake.example.com", + Version: "v1", + Resource: "fakes", + } + fakeGVK schema.GroupVersionKind = fakeGVR.GroupVersion().WithKind("Fake") + fakeGVKList schema.GroupVersionKind = fakeGVR.GroupVersion().WithKind("FakeList") +) + +func init() { + scheme.AddKnownTypeWithName(fakeGVK, &unstructured.Unstructured{}) + scheme.AddKnownTypeWithName(fakeGVKList, &unstructured.UnstructuredList{}) +} + +func setupTest(ctx context.Context, customReconciler func(string, string, runtime.Object) error) ( + tracker clienttesting.ObjectTracker, + controller generic.Controller[*unstructured.Unstructured], + informer *testInformer, + waitForReconcile func(runtime.Object) error, +) { + tracker = clienttesting.NewObjectTracker(scheme, codecs.UniversalDecoder()) + reconciledObjects := make(chan runtime.Object) + + // Set up fake informers that return instances of mock Policy definitoins + // and mock policy bindings + informer = &testInformer{SharedIndexInformer: cache.NewSharedIndexInformer(&cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return tracker.List(fakeGVR, fakeGVK, "") + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return tracker.Watch(fakeGVR, "") + }, + }, &unstructured.Unstructured{}, 30*time.Second, nil)} + + reconciler := func(namespace, name string, newObj *unstructured.Unstructured) error { + var err error + copied := newObj.DeepCopyObject() + if customReconciler != nil { + err = customReconciler(namespace, name, newObj) + } + reconciledObjects <- copied + return err + } + + waitForReconcile = func(obj runtime.Object) error { + select { + case reconciledObj := <-reconciledObjects: + if reflect.DeepEqual(obj, reconciledObj) { + return nil + } + return fmt.Errorf("expected equal objects: %v", cmp.Diff(obj, reconciledObj)) + case <-ctx.Done(): + return fmt.Errorf("context done before reconcile: %w", ctx.Err()) + } + } + + myController := generic.NewController( + generic.NewInformer[*unstructured.Unstructured](informer), + reconciler, + generic.ControllerOptions{}, + ) + + go func() { + <-ctx.Done() + close(reconciledObjects) + + for leftover := range reconciledObjects { + panic(fmt.Errorf("leftover object which was not anticipated by test: %v", leftover)) + } + }() + + return tracker, myController, informer, waitForReconcile +} + +func TestReconcile(t *testing.T) { + testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer testCancel() + + tracker, myController, informer, waitForReconcile := setupTest(testContext, nil) + + // Add object to informer + initialObject := &unstructured.Unstructured{} + initialObject.SetUnstructuredContent(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "object1", + "resourceVersion": "1", + }, + }) + initialObject.SetGroupVersionKind(fakeGVK) + + require.NoError(t, tracker.Add(initialObject)) + + wg := sync.WaitGroup{} + + // Start informer + wg.Add(1) + go func() { + defer wg.Done() + informer.Run(testContext.Done()) + }() + + // Start controller + wg.Add(1) + go func() { + defer wg.Done() + stopReason := myController.Run(testContext) + require.ErrorIs(t, stopReason, context.Canceled) + }() + + require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) + + // Wait for all enqueued reconciliations + require.NoError(t, waitForReconcile(initialObject)) + + // Updated object + updatedObject := &unstructured.Unstructured{} + updatedObject.SetUnstructuredContent(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "object1", + "resourceVersion": "2", + }, + "newKey": "a key", + }) + updatedObject.SetGroupVersionKind(fakeGVK) + require.NoError(t, tracker.Update(fakeGVR, updatedObject, "")) + + // Wait for all enqueued reconciliations + require.NoError(t, waitForReconcile(updatedObject)) + require.NoError(t, tracker.Delete(fakeGVR, updatedObject.GetNamespace(), updatedObject.GetName())) + require.NoError(t, waitForReconcile(nil)) + + testCancel() + wg.Wait() +} + +func TestShutdown(t *testing.T) { + testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer testCancel() + + _, myController, informer, _ := setupTest(testContext, nil) + + wg := sync.WaitGroup{} + + // Start informer + wg.Add(1) + go func() { + defer wg.Done() + informer.Run(testContext.Done()) + }() + + // Start controller + wg.Add(1) + go func() { + defer wg.Done() + stopReason := myController.Run(testContext) + require.ErrorIs(t, stopReason, context.Canceled) + }() + + // Wait for controller and informer to start up + require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) + + // Stop the controller and informer + testCancel() + + // Wait for controller and informer to stop + wg.Wait() + + // Ensure the event handler was cleaned up + require.Empty(t, informer.registrations) +} + +// Show an error is thrown informer isn't started when the controller runs +func TestInformerNeverStarts(t *testing.T) { + testContext, testCancel := context.WithTimeout(context.Background(), 400*time.Millisecond) + defer testCancel() + + _, myController, informer, _ := setupTest(testContext, nil) + + wg := sync.WaitGroup{} + + // Start controller + wg.Add(1) + go func() { + defer wg.Done() + stopReason := myController.Run(testContext) + require.ErrorIs(t, stopReason, context.DeadlineExceeded) + }() + + // Wait for deadline to pass without syncing the cache + require.False(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) + + // Wait for controller to stop (or context deadline will pass quickly) + wg.Wait() + + // Ensure there are no event handlers + require.Empty(t, informer.registrations) +} + +// Shows that if RV does not change, the reconciler does not get called +func TestIgnoredUpdate(t *testing.T) { + testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer testCancel() + + tracker, myController, informer, waitForReconcile := setupTest(testContext, nil) + + // Add object to informer + initialObject := &unstructured.Unstructured{} + initialObject.SetUnstructuredContent(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "object1", + "resourceVersion": "1", + }, + }) + initialObject.SetGroupVersionKind(fakeGVK) + + require.NoError(t, tracker.Add(initialObject)) + + wg := sync.WaitGroup{} + + // Start informer + wg.Add(1) + go func() { + defer wg.Done() + informer.Run(testContext.Done()) + }() + + // Start controller + wg.Add(1) + go func() { + defer wg.Done() + stopReason := myController.Run(testContext) + require.ErrorIs(t, stopReason, context.Canceled) + }() + + require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) + + // Wait for all enqueued reconciliations + require.NoError(t, waitForReconcile(initialObject)) + + // Send update with the same object + require.NoError(t, tracker.Update(fakeGVR, initialObject, "")) + + // Don't wait for it to be reconciled + + testCancel() + wg.Wait() + + // Test infrastructure has logic to panic if there are any reconciled objects + // that weren't "expected" +} + +// Shows that an object which fails reconciliation will retry +func TestReconcileRetry(t *testing.T) { + testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer testCancel() + + calls := atomic.Uint64{} + success := atomic.Bool{} + tracker, myController, _, waitForReconcile := setupTest(testContext, func(s1, s2 string, o runtime.Object) error { + + if calls.Add(1) > 2 { + // Suddenly start liking the object + success.Store(true) + return nil + } + return errors.New("i dont like this object") + }) + + // Start informer + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + myController.Informer().Run(testContext.Done()) + }() + + // Start controller + wg.Add(1) + go func() { + defer wg.Done() + stopReason := myController.Run(testContext) + require.ErrorIs(t, stopReason, context.Canceled) + }() + + // Add object to informer + initialObject := &unstructured.Unstructured{} + initialObject.SetUnstructuredContent(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "object1", + "resourceVersion": "1", + }, + }) + initialObject.SetGroupVersionKind(fakeGVK) + require.NoError(t, tracker.Add(initialObject)) + + require.NoError(t, waitForReconcile(initialObject), "initial reconcile") + require.NoError(t, waitForReconcile(initialObject), "previous reconcile failed, should retry quickly") + require.NoError(t, waitForReconcile(initialObject), "previous reconcile failed, should retry quickly") + // Will not try again since calls > 2 for last reconcile + require.True(t, success.Load(), "last call to reconcile should return success") + testCancel() + wg.Wait() +} + +func TestInformerList(t *testing.T) { + testContext, testCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer testCancel() + + tracker, myController, _, _ := setupTest(testContext, nil) + + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + myController.Informer().Run(testContext.Done()) + }() + + require.True(t, cache.WaitForCacheSync(testContext.Done(), myController.HasSynced)) + + object1 := &unstructured.Unstructured{} + object1.SetUnstructuredContent(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "object1", + "resourceVersion": "object1", + }, + }) + object1.SetGroupVersionKind(fakeGVK) + + object1v2 := &unstructured.Unstructured{} + object1v2.SetUnstructuredContent(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "object1", + "resourceVersion": "object1v2", + }, + }) + object1v2.SetGroupVersionKind(fakeGVK) + + object2 := &unstructured.Unstructured{} + object2.SetUnstructuredContent(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "object2", + "resourceVersion": "object2", + }, + }) + object2.SetGroupVersionKind(fakeGVK) + + object3 := &unstructured.Unstructured{} + object3.SetUnstructuredContent(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "object3", + "resourceVersion": "object3", + }, + }) + object3.SetGroupVersionKind(fakeGVK) + + namespacedObject1 := &unstructured.Unstructured{} + namespacedObject1.SetUnstructuredContent(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "namespacedObject1", + "namespace": "test", + "resourceVersion": "namespacedObject1", + }, + }) + namespacedObject1.SetGroupVersionKind(fakeGVK) + + namespacedObject2 := &unstructured.Unstructured{} + namespacedObject2.SetUnstructuredContent(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "namespacedObject2", + "namespace": "test", + "resourceVersion": "namespacedObject2", + }, + }) + namespacedObject2.SetGroupVersionKind(fakeGVK) + + require.NoError(t, tracker.Add(object1)) + require.NoError(t, tracker.Add(object2)) + + require.NoError(t, wait.PollWithContext(testContext, 100*time.Millisecond, 500*time.Millisecond, func(ctx context.Context) (done bool, err error) { + return myController.Informer().LastSyncResourceVersion() == object2.GetResourceVersion(), nil + })) + + values, err := myController.Informer().List(labels.Everything()) + require.NoError(t, err) + require.ElementsMatch(t, []*unstructured.Unstructured{object1, object2}, values) + + require.NoError(t, tracker.Update(fakeGVR, object1v2, object1v2.GetNamespace())) + require.NoError(t, tracker.Delete(fakeGVR, object2.GetNamespace(), object2.GetName())) + require.NoError(t, tracker.Add(object3)) + + require.NoError(t, wait.PollWithContext(testContext, 100*time.Millisecond, 500*time.Millisecond, func(ctx context.Context) (done bool, err error) { + return myController.Informer().LastSyncResourceVersion() == object3.GetResourceVersion(), nil + })) + + values, err = myController.Informer().List(labels.Everything()) + require.NoError(t, err) + require.ElementsMatch(t, []*unstructured.Unstructured{object1v2, object3}, values) + + require.NoError(t, tracker.Add(namespacedObject1)) + require.NoError(t, tracker.Add(namespacedObject2)) + + require.NoError(t, wait.PollWithContext(testContext, 100*time.Millisecond, 500*time.Millisecond, func(ctx context.Context) (done bool, err error) { + return myController.Informer().LastSyncResourceVersion() == namespacedObject2.GetResourceVersion(), nil + })) + values, err = myController.Informer().Namespaced(namespacedObject1.GetNamespace()).List(labels.Everything()) + require.NoError(t, err) + require.ElementsMatch(t, []*unstructured.Unstructured{namespacedObject1, namespacedObject2}, values) + + value, err := myController.Informer().Get(object3.GetName()) + require.NoError(t, err) + require.Equal(t, value, object3) + + value, err = myController.Informer().Namespaced(namespacedObject1.GetNamespace()).Get(namespacedObject1.GetName()) + require.NoError(t, err) + require.Equal(t, value, namespacedObject1) + + _, err = myController.Informer().Get("fakeobjectname") + require.True(t, k8serrors.IsNotFound(err)) + + _, err = myController.Informer().Namespaced("test").Get("fakeobjectname") + require.True(t, k8serrors.IsNotFound(err)) + + _, err = myController.Informer().Namespaced("fakenamespace").Get("fakeobjectname") + require.True(t, k8serrors.IsNotFound(err)) +}