diff --git a/test/e2e/apimachinery/crd_selectable_fields.go b/test/e2e/apimachinery/crd_selectable_fields.go index 7330f2dbb73..771dad81cde 100644 --- a/test/e2e/apimachinery/crd_selectable_fields.go +++ b/test/e2e/apimachinery/crd_selectable_fields.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "github.com/onsi/gomega" + "sync" "time" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" @@ -33,6 +34,8 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage/names" "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/utils/crd" imageutils "k8s.io/kubernetes/test/utils/image" @@ -151,15 +154,44 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fr ginkgo.By("Watching with field selectors") - v2Client, gvr := customResourceClient(crd, "v2") - hostWatch, err := v2Client.Namespace(f.Namespace.Name).Watch(ctx, metav1.ListOptions{FieldSelector: "host=host1"}) + v2Client, v2gvr := customResourceClient(crd, "v2") + v2hostWatch, err := v2Client.Namespace(f.Namespace.Name).Watch(ctx, metav1.ListOptions{FieldSelector: "host=host1"}) framework.ExpectNoError(err, "watching custom resources with field selector") + v2hostWatchAcc := watchAccumulator(v2hostWatch) v2hostPortWatch, err := v2Client.Namespace(f.Namespace.Name).Watch(ctx, metav1.ListOptions{FieldSelector: "host=host1,port=80"}) framework.ExpectNoError(err, "watching custom resources with field selector") + framework.ExpectNoError(err, "watching custom resources with field selector") + v2hostPortWatchAcc := watchAccumulator(v2hostPortWatch) v1Client, _ := customResourceClient(crd, "v1") v1hostPortWatch, err := v1Client.Namespace(f.Namespace.Name).Watch(ctx, metav1.ListOptions{FieldSelector: "hostPort=host1:80"}) framework.ExpectNoError(err, "watching custom resources with field selector") + v1hostPortWatchAcc := watchAccumulator(v1hostPortWatch) + + ginkgo.By("Registering informers with field selectors") + + informerCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + hostInformer := dynamicinformer.NewFilteredDynamicSharedInformerFactory(f.DynamicClient, 0, f.Namespace.Name, func(opts *metav1.ListOptions) { + opts.FieldSelector = "host=host1" + }) + hostInformer.Start(informerCtx.Done()) + + hostPortInformer := dynamicinformer.NewFilteredDynamicSharedInformerFactory(f.DynamicClient, 0, f.Namespace.Name, func(opts *metav1.ListOptions) { + opts.FieldSelector = "host=host1,port=80" + }) + hostPortInformer.Start(informerCtx.Done()) + + v2HostInformer := hostInformer.ForResource(v2gvr).Informer() + go v2HostInformer.Run(informerCtx.Done()) + v2HostPortInformer := hostPortInformer.ForResource(v2gvr).Informer() + go v2HostPortInformer.Run(informerCtx.Done()) + + v2HostInformerAcc := informerAccumulator(v2HostInformer) + v2HostPortInformerAcc := informerAccumulator(v2HostPortInformer) + + framework.ExpectNoError(err, "adding event handler") ginkgo.By("Creating custom resources") toCreate := []map[string]any{ @@ -182,7 +214,7 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fr crNames[i] = name obj := map[string]interface{}{ - "apiVersion": gvr.Group + "/" + gvr.Version, + "apiVersion": v2gvr.Group + "/" + v2gvr.Version, "kind": crd.Spec.Names.Kind, "metadata": map[string]interface{}{ "name": name, @@ -216,15 +248,23 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fr gomega.Expect(listResultToNames(list)).To(gomega.Equal(sets.New(crNames[1]))) ginkgo.By("Waiting for watch events to contain v2 custom resources for field selector host=host1") - gomega.Eventually(ctx, watchAccumulator(hostWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). + gomega.Eventually(ctx, v2hostWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). Should(gomega.Equal(addedEvents(sets.New(crNames[0], crNames[1])))) ginkgo.By("Waiting for watch events to contain v2 custom resources for field selector host=host1,port=80") - gomega.Eventually(ctx, watchAccumulator(v2hostPortWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). + gomega.Eventually(ctx, v2hostPortWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). Should(gomega.Equal(addedEvents(sets.New(crNames[0])))) ginkgo.By("Waiting for watch events to contain v1 custom resources for field selector hostPort=host1:80") - gomega.Eventually(ctx, watchAccumulator(v1hostPortWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). + gomega.Eventually(ctx, v1hostPortWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). + Should(gomega.Equal(addedEvents(sets.New(crNames[0])))) + + ginkgo.By("Waiting for informer events to contain v2 custom resources for field selector host=host1") + gomega.Eventually(ctx, v2HostInformerAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). + Should(gomega.Equal(addedEvents(sets.New(crNames[0], crNames[1])))) + + ginkgo.By("Waiting for informer events to contain v2 custom resources for field selector host=host1,port=80") + gomega.Eventually(ctx, v2HostPortInformerAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). Should(gomega.Equal(addedEvents(sets.New(crNames[0])))) ginkgo.By("Deleting one custom resources to ensure that deletions are observed") @@ -250,18 +290,25 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fr gomega.Expect(listResultToNames(list)).To(gomega.Equal(sets.New[string]())) ginkgo.By("Waiting for v2 watch events after updates and deletes for field selector host=host1") - gomega.Eventually(ctx, watchAccumulator(hostWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). - Should(gomega.Equal(deletedEvents(sets.New(crNames[0], crNames[1])))) + gomega.Eventually(ctx, v2hostWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). + Should(gomega.Equal(addedAndDeletedEvents(sets.New(crNames[0], crNames[1]), sets.New(crNames[0], crNames[1])))) ginkgo.By("Waiting for v2 watch events after updates and deletes for field selector host=host1,port=80") - gomega.Eventually(ctx, watchAccumulator(v2hostPortWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). - Should(gomega.Equal(deletedEvents(sets.New(crNames[0])))) + gomega.Eventually(ctx, v2hostPortWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). + Should(gomega.Equal(addedAndDeletedEvents(sets.New(crNames[0]), sets.New(crNames[0])))) ginkgo.By("Waiting for v1 watch events after updates and deletes for field selector hostPort=host1:80") - gomega.Eventually(ctx, watchAccumulator(v1hostPortWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). - Should(gomega.Equal(deletedEvents(sets.New(crNames[0])))) - }) + gomega.Eventually(ctx, v1hostPortWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). + Should(gomega.Equal(addedAndDeletedEvents(sets.New(crNames[0]), sets.New(crNames[0])))) + ginkgo.By("Waiting for v2 informer events after updates and deletes for field selector host=host1") + gomega.Eventually(ctx, v2HostInformerAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). + Should(gomega.Equal(addedAndDeletedEvents(sets.New(crNames[0], crNames[1]), sets.New(crNames[0], crNames[1])))) + + ginkgo.By("Waiting for v2 informer events after updates and deletes for field selector host=host1,port=80") + gomega.Eventually(ctx, v2HostPortInformerAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). + Should(gomega.Equal(addedAndDeletedEvents(sets.New(crNames[0]), sets.New(crNames[0])))) + }) }) }) @@ -277,8 +324,8 @@ func addedEvents(added sets.Set[string]) *accumulatedEvents { return &accumulatedEvents{added: added, deleted: sets.New[string]()} } -func deletedEvents(deleted sets.Set[string]) *accumulatedEvents { - return &accumulatedEvents{added: sets.New[string](), deleted: deleted} +func addedAndDeletedEvents(added, deleted sets.Set[string]) *accumulatedEvents { + return &accumulatedEvents{added: added, deleted: deleted} } func watchAccumulator(w watch.Interface) func(ctx context.Context) (*accumulatedEvents, error) { @@ -302,6 +349,39 @@ func watchAccumulator(w watch.Interface) func(ctx context.Context) (*accumulated } } +func informerAccumulator(informer cache.SharedIndexInformer) func(ctx context.Context) (*accumulatedEvents, error) { + var lock sync.Mutex + result := emptyEvents() + + _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + defer ginkgo.GinkgoRecover() + lock.Lock() + defer lock.Unlock() + result.added.Insert(obj.(*unstructured.Unstructured).GetName()) + }, + UpdateFunc: func(oldObj, newObj any) { + defer ginkgo.GinkgoRecover() + // ignoring + }, + DeleteFunc: func(obj any) { + defer ginkgo.GinkgoRecover() + lock.Lock() + defer lock.Unlock() + result.deleted.Insert(obj.(*unstructured.Unstructured).GetName()) + }, + }) + + return func(ctx context.Context) (*accumulatedEvents, error) { + if err != nil { + return nil, err + } + lock.Lock() + defer lock.Unlock() + return result, nil + } +} + func listResultToNames(list *unstructured.UnstructuredList) sets.Set[string] { found := sets.New[string]() for _, i := range list.Items {