diff --git a/staging/src/k8s.io/client-go/dynamic/dynamicinformer/informer_test.go b/staging/src/k8s.io/client-go/dynamic/dynamicinformer/informer_test.go index dac032ed699..3ea31b6249b 100644 --- a/staging/src/k8s.io/client-go/dynamic/dynamicinformer/informer_test.go +++ b/staging/src/k8s.io/client-go/dynamic/dynamicinformer/informer_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -37,6 +38,7 @@ type triggerFunc func(gvr schema.GroupVersionResource, ns string, fakeClient *fa func triggerFactory(t *testing.T) triggerFunc { return func(gvr schema.GroupVersionResource, ns string, fakeClient *fake.FakeDynamicClient, _ *unstructured.Unstructured) *unstructured.Unstructured { testObject := newUnstructured("apps/v1", "Deployment", "ns-foo", "name-foo") + testObject.SetLabels(map[string]string{"environment": "test"}) createdObj, err := fakeClient.Resource(gvr).Namespace(ns).Create(context.TODO(), testObject, metav1.CreateOptions{}) if err != nil { t.Error(err) @@ -55,13 +57,14 @@ func handler(rcvCh chan<- *unstructured.Unstructured) *cache.ResourceEventHandle func TestFilteredDynamicSharedInformerFactory(t *testing.T) { scenarios := []struct { - name string - existingObj *unstructured.Unstructured - gvr schema.GroupVersionResource - informNS string - ns string - trigger func(gvr schema.GroupVersionResource, ns string, fakeClient *fake.FakeDynamicClient, testObject *unstructured.Unstructured) *unstructured.Unstructured - handler func(rcvCh chan<- *unstructured.Unstructured) *cache.ResourceEventHandlerFuncs + name string + existingObj *unstructured.Unstructured + gvr schema.GroupVersionResource + informNS string + ns string + trigger func(gvr schema.GroupVersionResource, ns string, fakeClient *fake.FakeDynamicClient, testObject *unstructured.Unstructured) *unstructured.Unstructured + handler func(rcvCh chan<- *unstructured.Unstructured) *cache.ResourceEventHandlerFuncs + tweakListOptions dynamicinformer.TweakListOptionsFunc }{ // scenario 1 { @@ -81,6 +84,50 @@ func TestFilteredDynamicSharedInformerFactory(t *testing.T) { trigger: triggerFactory(t), handler: handler, }, + { + name: "tweak options: test adding an object not matching field selector should not trigger AddFunc", + informNS: "ns-foo", + ns: "ns-foo", + gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}, + trigger: triggerFactory(t), + handler: handler, + tweakListOptions: func(opts *metav1.ListOptions) { + opts.FieldSelector = "metadata.name=name-bar" + }, + }, + { + name: "tweak options: test adding an object matching field selector should trigger AddFunc", + informNS: "ns-foo", + ns: "ns-foo", + gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}, + trigger: triggerFactory(t), + handler: handler, + tweakListOptions: func(opts *metav1.ListOptions) { + opts.FieldSelector = "metadata.name=name-foo" + }, + }, + { + name: "tweak options: test adding an object not matching label selector should not trigger AddFunc", + informNS: "ns-foo", + ns: "ns-foo", + gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}, + trigger: triggerFactory(t), + handler: handler, + tweakListOptions: func(opts *metav1.ListOptions) { + opts.LabelSelector = "environment=production" + }, + }, + { + name: "tweak options: test adding an object matching label selector should trigger AddFunc", + informNS: "ns-foo", + ns: "ns-foo", + gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}, + trigger: triggerFactory(t), + handler: handler, + tweakListOptions: func(opts *metav1.ListOptions) { + opts.LabelSelector = "environment=test" + }, + }, } for _, ts := range scenarios { diff --git a/test/e2e/apimachinery/crd_selectable_fields.go b/test/e2e/apimachinery/crd_selectable_fields.go index 346ff9dc807..da02331be32 100644 --- a/test/e2e/apimachinery/crd_selectable_fields.go +++ b/test/e2e/apimachinery/crd_selectable_fields.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "github.com/onsi/gomega" + "sync" "time" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" @@ -32,6 +33,8 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage/names" "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/utils/crd" imageutils "k8s.io/kubernetes/test/utils/image" @@ -150,15 +153,44 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fu ginkgo.By("Watching with field selectors") - v2Client, gvr := customResourceClient(crd, "v2") - hostWatch, err := v2Client.Namespace(f.Namespace.Name).Watch(ctx, metav1.ListOptions{FieldSelector: "host=host1"}) + v2Client, v2gvr := customResourceClient(crd, "v2") + v2hostWatch, err := v2Client.Namespace(f.Namespace.Name).Watch(ctx, metav1.ListOptions{FieldSelector: "host=host1"}) framework.ExpectNoError(err, "watching custom resources with field selector") + v2hostWatchAcc := watchAccumulator(v2hostWatch) v2hostPortWatch, err := v2Client.Namespace(f.Namespace.Name).Watch(ctx, metav1.ListOptions{FieldSelector: "host=host1,port=80"}) framework.ExpectNoError(err, "watching custom resources with field selector") + framework.ExpectNoError(err, "watching custom resources with field selector") + v2hostPortWatchAcc := watchAccumulator(v2hostPortWatch) v1Client, _ := customResourceClient(crd, "v1") v1hostPortWatch, err := v1Client.Namespace(f.Namespace.Name).Watch(ctx, metav1.ListOptions{FieldSelector: "hostPort=host1:80"}) framework.ExpectNoError(err, "watching custom resources with field selector") + v1hostPortWatchAcc := watchAccumulator(v1hostPortWatch) + + ginkgo.By("Registering informers with field selectors") + + informerCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + hostInformer := dynamicinformer.NewFilteredDynamicSharedInformerFactory(f.DynamicClient, 0, f.Namespace.Name, func(opts *metav1.ListOptions) { + opts.FieldSelector = "host=host1" + }) + hostInformer.Start(informerCtx.Done()) + + hostPortInformer := dynamicinformer.NewFilteredDynamicSharedInformerFactory(f.DynamicClient, 0, f.Namespace.Name, func(opts *metav1.ListOptions) { + opts.FieldSelector = "host=host1,port=80" + }) + hostPortInformer.Start(informerCtx.Done()) + + v2HostInformer := hostInformer.ForResource(v2gvr).Informer() + go v2HostInformer.Run(informerCtx.Done()) + v2HostPortInformer := hostPortInformer.ForResource(v2gvr).Informer() + go v2HostPortInformer.Run(informerCtx.Done()) + + v2HostInformerAcc := informerAccumulator(v2HostInformer) + v2HostPortInformerAcc := informerAccumulator(v2HostPortInformer) + + framework.ExpectNoError(err, "adding event handler") ginkgo.By("Creating custom resources") toCreate := []map[string]any{ @@ -181,7 +213,7 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fu crNames[i] = name obj := map[string]interface{}{ - "apiVersion": gvr.Group + "/" + gvr.Version, + "apiVersion": v2gvr.Group + "/" + v2gvr.Version, "kind": crd.Spec.Names.Kind, "metadata": map[string]interface{}{ "name": name, @@ -215,15 +247,23 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fu gomega.Expect(listResultToNames(list)).To(gomega.Equal(sets.New(crNames[1]))) ginkgo.By("Waiting for watch events to contain v2 custom resources for field selector host=host1") - gomega.Eventually(ctx, watchAccumulator(hostWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). + gomega.Eventually(ctx, v2hostWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). Should(gomega.Equal(addedEvents(sets.New(crNames[0], crNames[1])))) ginkgo.By("Waiting for watch events to contain v2 custom resources for field selector host=host1,port=80") - gomega.Eventually(ctx, watchAccumulator(v2hostPortWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). + gomega.Eventually(ctx, v2hostPortWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). Should(gomega.Equal(addedEvents(sets.New(crNames[0])))) ginkgo.By("Waiting for watch events to contain v1 custom resources for field selector hostPort=host1:80") - gomega.Eventually(ctx, watchAccumulator(v1hostPortWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). + gomega.Eventually(ctx, v1hostPortWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). + Should(gomega.Equal(addedEvents(sets.New(crNames[0])))) + + ginkgo.By("Waiting for informer events to contain v2 custom resources for field selector host=host1") + gomega.Eventually(ctx, v2HostInformerAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). + Should(gomega.Equal(addedEvents(sets.New(crNames[0], crNames[1])))) + + ginkgo.By("Waiting for informer events to contain v2 custom resources for field selector host=host1,port=80") + gomega.Eventually(ctx, v2HostPortInformerAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). Should(gomega.Equal(addedEvents(sets.New(crNames[0])))) ginkgo.By("Deleting one custom resources to ensure that deletions are observed") @@ -249,18 +289,25 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fu gomega.Expect(listResultToNames(list)).To(gomega.Equal(sets.New[string]())) ginkgo.By("Waiting for v2 watch events after updates and deletes for field selector host=host1") - gomega.Eventually(ctx, watchAccumulator(hostWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). - Should(gomega.Equal(deletedEvents(sets.New(crNames[0], crNames[1])))) + gomega.Eventually(ctx, v2hostWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). + Should(gomega.Equal(addedAndDeletedEvents(sets.New(crNames[0], crNames[1]), sets.New(crNames[0], crNames[1])))) ginkgo.By("Waiting for v2 watch events after updates and deletes for field selector host=host1,port=80") - gomega.Eventually(ctx, watchAccumulator(v2hostPortWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). - Should(gomega.Equal(deletedEvents(sets.New(crNames[0])))) + gomega.Eventually(ctx, v2hostPortWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). + Should(gomega.Equal(addedAndDeletedEvents(sets.New(crNames[0]), sets.New(crNames[0])))) ginkgo.By("Waiting for v1 watch events after updates and deletes for field selector hostPort=host1:80") - gomega.Eventually(ctx, watchAccumulator(v1hostPortWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). - Should(gomega.Equal(deletedEvents(sets.New(crNames[0])))) - }) + gomega.Eventually(ctx, v1hostPortWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). + Should(gomega.Equal(addedAndDeletedEvents(sets.New(crNames[0]), sets.New(crNames[0])))) + ginkgo.By("Waiting for v2 informer events after updates and deletes for field selector host=host1") + gomega.Eventually(ctx, v2HostInformerAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). + Should(gomega.Equal(addedAndDeletedEvents(sets.New(crNames[0], crNames[1]), sets.New(crNames[0], crNames[1])))) + + ginkgo.By("Waiting for v2 informer events after updates and deletes for field selector host=host1,port=80") + gomega.Eventually(ctx, v2HostPortInformerAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second). + Should(gomega.Equal(addedAndDeletedEvents(sets.New(crNames[0]), sets.New(crNames[0])))) + }) }) }) @@ -276,8 +323,8 @@ func addedEvents(added sets.Set[string]) *accumulatedEvents { return &accumulatedEvents{added: added, deleted: sets.New[string]()} } -func deletedEvents(deleted sets.Set[string]) *accumulatedEvents { - return &accumulatedEvents{added: sets.New[string](), deleted: deleted} +func addedAndDeletedEvents(added, deleted sets.Set[string]) *accumulatedEvents { + return &accumulatedEvents{added: added, deleted: deleted} } func watchAccumulator(w watch.Interface) func(ctx context.Context) (*accumulatedEvents, error) { @@ -301,6 +348,39 @@ func watchAccumulator(w watch.Interface) func(ctx context.Context) (*accumulated } } +func informerAccumulator(informer cache.SharedIndexInformer) func(ctx context.Context) (*accumulatedEvents, error) { + var lock sync.Mutex + result := emptyEvents() + + _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + defer ginkgo.GinkgoRecover() + lock.Lock() + defer lock.Unlock() + result.added.Insert(obj.(*unstructured.Unstructured).GetName()) + }, + UpdateFunc: func(oldObj, newObj any) { + defer ginkgo.GinkgoRecover() + // ignoring + }, + DeleteFunc: func(obj any) { + defer ginkgo.GinkgoRecover() + lock.Lock() + defer lock.Unlock() + result.deleted.Insert(obj.(*unstructured.Unstructured).GetName()) + }, + }) + + return func(ctx context.Context) (*accumulatedEvents, error) { + if err != nil { + return nil, err + } + lock.Lock() + defer lock.Unlock() + return result, nil + } +} + func listResultToNames(list *unstructured.UnstructuredList) sets.Set[string] { found := sets.New[string]() for _, i := range list.Items {