From 4e217159cfc1441f3c3234059fc6fca0eb13a66d Mon Sep 17 00:00:00 2001 From: Alexander Zielenski Date: Mon, 7 Nov 2022 12:01:44 -0800 Subject: [PATCH] fix possible race in admission test of listwatch --- .../admission/plugin/cel/admission_test.go | 86 +++++++++++++++++-- 1 file changed, 79 insertions(+), 7 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/admission_test.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/admission_test.go index c20ff00e192..75c79cc97d4 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/admission_test.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/cel/admission_test.go @@ -40,7 +40,9 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" clienttesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" "k8s.io/component-base/featuregate" + "k8s.io/klog/v2" ) var ( @@ -267,7 +269,8 @@ func setupTestCommon(t *testing.T, compiler ValidatorCompiler) (plugin admission require.True(t, handler.enabled) // Override compiler used by controller for tests - handler.evaluator.(*celAdmissionController).validatorCompiler = compiler + controller = handler.evaluator.(*celAdmissionController) + controller.validatorCompiler = compiler // Make sure to start the fake informers fakeInformerFactory.Start(testContext.Done()) @@ -276,7 +279,73 @@ func setupTestCommon(t *testing.T, compiler ValidatorCompiler) (plugin admission // wait for informer factory to shutdown fakeInformerFactory.Shutdown() }) - return handler, dynamicClient.Tracker(), fakeClient.Tracker(), handler.evaluator.(*celAdmissionController) + + // Wait for admission controller to begin its object watches + // This is because there is a very rare (0.05% on my machine) race doing the + // initial List+Watch if an object is added after the list, but before the + // watch it could be missed. + // + // This is only due to the fact that NewSimpleClientset above ignores + // LastSyncResourceVersion on watch calls, so do it does not provide "catch up" + // which may have been added since the call to list. + if !cache.WaitForNamedCacheSync("initial sync", testContext.Done(), handler.evaluator.HasSynced) { + t.Fatal("failed to do perform initial cache sync") + } + + // WaitForCacheSync only tells us the list was performed. + // Keep changing an object until it is observable, then remove it + + i := 0 + + dummyPolicy := &v1alpha1.ValidatingAdmissionPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "dummypolicy.example.com", + Annotations: map[string]string{ + "myValue": fmt.Sprint(i), + }, + }, + } + + dummyBinding := &v1alpha1.ValidatingAdmissionPolicyBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "dummybinding.example.com", + Annotations: map[string]string{ + "myValue": fmt.Sprint(i), + }, + }, + } + + require.NoError(t, fakeClient.Tracker().Create(definitionsGVR, dummyPolicy, dummyPolicy.Namespace)) + require.NoError(t, fakeClient.Tracker().Create(bindingsGVR, dummyBinding, dummyBinding.Namespace)) + + wait.PollWithContext(testContext, 100*time.Millisecond, 300*time.Millisecond, func(ctx context.Context) (done bool, err error) { + defer func() { + i += 1 + }() + + dummyPolicy.Annotations = map[string]string{ + "myValue": fmt.Sprint(i), + } + dummyBinding.Annotations = dummyPolicy.Annotations + + require.NoError(t, fakeClient.Tracker().Update(definitionsGVR, dummyPolicy, dummyPolicy.Namespace)) + require.NoError(t, fakeClient.Tracker().Update(bindingsGVR, dummyBinding, dummyBinding.Namespace)) + + if obj, err := controller.getCurrentObject(dummyPolicy); obj == nil || err != nil { + return false, nil + } + + if obj, err := controller.getCurrentObject(dummyBinding); obj == nil || err != nil { + return false, nil + } + + return true, nil + }) + + require.NoError(t, fakeClient.Tracker().Delete(definitionsGVR, dummyPolicy.Namespace, dummyPolicy.Name)) + require.NoError(t, fakeClient.Tracker().Delete(bindingsGVR, dummyBinding.Namespace, dummyBinding.Name)) + + return handler, dynamicClient.Tracker(), fakeClient.Tracker(), controller } // Gets the last reconciled value in the controller of an object with the same @@ -341,20 +410,22 @@ func (c *celAdmissionController) getCurrentObject(obj runtime.Object) (runtime.O // Waits for the given objects to have been the latest reconciled values of // their gvk/name in the controller func waitForReconcile(ctx context.Context, controller *celAdmissionController, objects ...runtime.Object) error { - return wait.PollWithContext(ctx, 200*time.Millisecond, 5*time.Second, func(ctx context.Context) (done bool, err error) { + return wait.PollWithContext(ctx, 100*time.Millisecond, 1*time.Second, func(ctx context.Context) (done bool, err error) { for _, obj := range objects { + objMeta, err := meta.Accessor(obj) + if err != nil { + return false, fmt.Errorf("error getting meta accessor for original %T object (%v): %w", obj, obj, err) + } + currentValue, err := controller.getCurrentObject(obj) if err != nil { return false, fmt.Errorf("error getting current object: %w", err) } else if currentValue == nil { // Object not found, but not an error. Keep waiting. + klog.Infof("%v not found. keep waiting", objMeta.GetName()) return false, nil } - objMeta, err := meta.Accessor(obj) - if err != nil { - return false, fmt.Errorf("error getting meta accessor for original %T object (%v): %w", obj, obj, err) - } valueMeta, err := meta.Accessor(currentValue) if err != nil { return false, fmt.Errorf("error getting meta accessor for current %T object (%v): %w", currentValue, currentValue, err) @@ -367,6 +438,7 @@ func waitForReconcile(ctx context.Context, controller *celAdmissionController, o return false, fmt.Errorf("%s named %s has no resource version. please ensure your test objects have an RV", currentValue.GetObjectKind().GroupVersionKind().String(), valueMeta.GetName()) } else if objMeta.GetResourceVersion() != valueMeta.GetResourceVersion() { + klog.Infof("%v has RV %v. want RV %v", objMeta.GetName(), objMeta.GetResourceVersion(), objMeta.GetResourceVersion()) return false, nil } }