Add field selector filtered informer e2e tests

This commit is contained in:
Joe Betz 2024-09-03 12:55:56 -04:00
parent 6ba426c0c0
commit e9744c1ea2

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"github.com/onsi/gomega"
"sync"
"time"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
@ -33,6 +34,8 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/utils/crd"
imageutils "k8s.io/kubernetes/test/utils/image"
@ -151,15 +154,44 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fr
ginkgo.By("Watching with field selectors")
v2Client, gvr := customResourceClient(crd, "v2")
hostWatch, err := v2Client.Namespace(f.Namespace.Name).Watch(ctx, metav1.ListOptions{FieldSelector: "host=host1"})
v2Client, v2gvr := customResourceClient(crd, "v2")
v2hostWatch, err := v2Client.Namespace(f.Namespace.Name).Watch(ctx, metav1.ListOptions{FieldSelector: "host=host1"})
framework.ExpectNoError(err, "watching custom resources with field selector")
v2hostWatchAcc := watchAccumulator(v2hostWatch)
v2hostPortWatch, err := v2Client.Namespace(f.Namespace.Name).Watch(ctx, metav1.ListOptions{FieldSelector: "host=host1,port=80"})
framework.ExpectNoError(err, "watching custom resources with field selector")
framework.ExpectNoError(err, "watching custom resources with field selector")
v2hostPortWatchAcc := watchAccumulator(v2hostPortWatch)
v1Client, _ := customResourceClient(crd, "v1")
v1hostPortWatch, err := v1Client.Namespace(f.Namespace.Name).Watch(ctx, metav1.ListOptions{FieldSelector: "hostPort=host1:80"})
framework.ExpectNoError(err, "watching custom resources with field selector")
v1hostPortWatchAcc := watchAccumulator(v1hostPortWatch)
ginkgo.By("Registering informers with field selectors")
informerCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
hostInformer := dynamicinformer.NewFilteredDynamicSharedInformerFactory(f.DynamicClient, 0, f.Namespace.Name, func(opts *metav1.ListOptions) {
opts.FieldSelector = "host=host1"
})
hostInformer.Start(informerCtx.Done())
hostPortInformer := dynamicinformer.NewFilteredDynamicSharedInformerFactory(f.DynamicClient, 0, f.Namespace.Name, func(opts *metav1.ListOptions) {
opts.FieldSelector = "host=host1,port=80"
})
hostPortInformer.Start(informerCtx.Done())
v2HostInformer := hostInformer.ForResource(v2gvr).Informer()
go v2HostInformer.Run(informerCtx.Done())
v2HostPortInformer := hostPortInformer.ForResource(v2gvr).Informer()
go v2HostPortInformer.Run(informerCtx.Done())
v2HostInformerAcc := informerAccumulator(v2HostInformer)
v2HostPortInformerAcc := informerAccumulator(v2HostPortInformer)
framework.ExpectNoError(err, "adding event handler")
ginkgo.By("Creating custom resources")
toCreate := []map[string]any{
@ -182,7 +214,7 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fr
crNames[i] = name
obj := map[string]interface{}{
"apiVersion": gvr.Group + "/" + gvr.Version,
"apiVersion": v2gvr.Group + "/" + v2gvr.Version,
"kind": crd.Spec.Names.Kind,
"metadata": map[string]interface{}{
"name": name,
@ -216,15 +248,23 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fr
gomega.Expect(listResultToNames(list)).To(gomega.Equal(sets.New(crNames[1])))
ginkgo.By("Waiting for watch events to contain v2 custom resources for field selector host=host1")
gomega.Eventually(ctx, watchAccumulator(hostWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
gomega.Eventually(ctx, v2hostWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
Should(gomega.Equal(addedEvents(sets.New(crNames[0], crNames[1]))))
ginkgo.By("Waiting for watch events to contain v2 custom resources for field selector host=host1,port=80")
gomega.Eventually(ctx, watchAccumulator(v2hostPortWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
gomega.Eventually(ctx, v2hostPortWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
Should(gomega.Equal(addedEvents(sets.New(crNames[0]))))
ginkgo.By("Waiting for watch events to contain v1 custom resources for field selector hostPort=host1:80")
gomega.Eventually(ctx, watchAccumulator(v1hostPortWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
gomega.Eventually(ctx, v1hostPortWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
Should(gomega.Equal(addedEvents(sets.New(crNames[0]))))
ginkgo.By("Waiting for informer events to contain v2 custom resources for field selector host=host1")
gomega.Eventually(ctx, v2HostInformerAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
Should(gomega.Equal(addedEvents(sets.New(crNames[0], crNames[1]))))
ginkgo.By("Waiting for informer events to contain v2 custom resources for field selector host=host1,port=80")
gomega.Eventually(ctx, v2HostPortInformerAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
Should(gomega.Equal(addedEvents(sets.New(crNames[0]))))
ginkgo.By("Deleting one custom resources to ensure that deletions are observed")
@ -250,18 +290,25 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fr
gomega.Expect(listResultToNames(list)).To(gomega.Equal(sets.New[string]()))
ginkgo.By("Waiting for v2 watch events after updates and deletes for field selector host=host1")
gomega.Eventually(ctx, watchAccumulator(hostWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
Should(gomega.Equal(deletedEvents(sets.New(crNames[0], crNames[1]))))
gomega.Eventually(ctx, v2hostWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
Should(gomega.Equal(addedAndDeletedEvents(sets.New(crNames[0], crNames[1]), sets.New(crNames[0], crNames[1]))))
ginkgo.By("Waiting for v2 watch events after updates and deletes for field selector host=host1,port=80")
gomega.Eventually(ctx, watchAccumulator(v2hostPortWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
Should(gomega.Equal(deletedEvents(sets.New(crNames[0]))))
gomega.Eventually(ctx, v2hostPortWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
Should(gomega.Equal(addedAndDeletedEvents(sets.New(crNames[0]), sets.New(crNames[0]))))
ginkgo.By("Waiting for v1 watch events after updates and deletes for field selector hostPort=host1:80")
gomega.Eventually(ctx, watchAccumulator(v1hostPortWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
Should(gomega.Equal(deletedEvents(sets.New(crNames[0]))))
})
gomega.Eventually(ctx, v1hostPortWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
Should(gomega.Equal(addedAndDeletedEvents(sets.New(crNames[0]), sets.New(crNames[0]))))
ginkgo.By("Waiting for v2 informer events after updates and deletes for field selector host=host1")
gomega.Eventually(ctx, v2HostInformerAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
Should(gomega.Equal(addedAndDeletedEvents(sets.New(crNames[0], crNames[1]), sets.New(crNames[0], crNames[1]))))
ginkgo.By("Waiting for v2 informer events after updates and deletes for field selector host=host1,port=80")
gomega.Eventually(ctx, v2HostPortInformerAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
Should(gomega.Equal(addedAndDeletedEvents(sets.New(crNames[0]), sets.New(crNames[0]))))
})
})
})
@ -277,8 +324,8 @@ func addedEvents(added sets.Set[string]) *accumulatedEvents {
return &accumulatedEvents{added: added, deleted: sets.New[string]()}
}
func deletedEvents(deleted sets.Set[string]) *accumulatedEvents {
return &accumulatedEvents{added: sets.New[string](), deleted: deleted}
func addedAndDeletedEvents(added, deleted sets.Set[string]) *accumulatedEvents {
return &accumulatedEvents{added: added, deleted: deleted}
}
func watchAccumulator(w watch.Interface) func(ctx context.Context) (*accumulatedEvents, error) {
@ -302,6 +349,39 @@ func watchAccumulator(w watch.Interface) func(ctx context.Context) (*accumulated
}
}
func informerAccumulator(informer cache.SharedIndexInformer) func(ctx context.Context) (*accumulatedEvents, error) {
var lock sync.Mutex
result := emptyEvents()
_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
defer ginkgo.GinkgoRecover()
lock.Lock()
defer lock.Unlock()
result.added.Insert(obj.(*unstructured.Unstructured).GetName())
},
UpdateFunc: func(oldObj, newObj any) {
defer ginkgo.GinkgoRecover()
// ignoring
},
DeleteFunc: func(obj any) {
defer ginkgo.GinkgoRecover()
lock.Lock()
defer lock.Unlock()
result.deleted.Insert(obj.(*unstructured.Unstructured).GetName())
},
})
return func(ctx context.Context) (*accumulatedEvents, error) {
if err != nil {
return nil, err
}
lock.Lock()
defer lock.Unlock()
return result, nil
}
}
func listResultToNames(list *unstructured.UnstructuredList) sets.Set[string] {
found := sets.New[string]()
for _, i := range list.Items {