Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-06 02:34:03 +00:00)

Merge pull request #127099 from jpbetz/object-selectors-ga

Add filtered informers tests for CRD field selectors

Commit c5f2fc05ad
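
For context, the diff below extends the dynamic informer unit tests and the CustomResourceFieldSelectors e2e tests so that filtered informers, not just raw watches, are exercised against field and label selectors. The sketch here is not part of the commit; it is a minimal illustration of the client-go API the tests drive, and the identifiers client, ns and gvr are placeholders rather than names taken from the diff.

// Sketch only (assumed usage, not the commit's code): wiring a filtered
// dynamic informer with a field selector, the mechanism these tests exercise.
package example

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/tools/cache"
)

func watchFiltered(ctx context.Context, client dynamic.Interface, ns string, gvr schema.GroupVersionResource) error {
	// The tweak function is applied to every list/watch the factory issues,
	// so only objects matching the selector ever reach the event handlers.
	factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(client, 0, ns, func(opts *metav1.ListOptions) {
		opts.FieldSelector = "metadata.name=name-foo" // or a CRD selectable field, e.g. "host=host1"
	})
	informer := factory.ForResource(gvr).Informer()
	_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			fmt.Println("added:", obj.(*unstructured.Unstructured).GetName())
		},
		DeleteFunc: func(obj interface{}) {
			fmt.Println("deleted:", obj.(*unstructured.Unstructured).GetName())
		},
	})
	if err != nil {
		return err
	}
	factory.Start(ctx.Done())
	cache.WaitForCacheSync(ctx.Done(), informer.HasSynced)
	<-ctx.Done()
	return nil
}

Because the selector is applied server-side on the informer's list and watch requests, objects outside the selector never trigger AddFunc or DeleteFunc, which is exactly what the new test scenarios below assert.
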
@@ -22,6 +22,7 @@ import (
     "time"
 
     "github.com/google/go-cmp/cmp"
 
     "k8s.io/apimachinery/pkg/api/equality"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -37,6 +38,7 @@ type triggerFunc func(gvr schema.GroupVersionResource, ns string, fakeClient *fa
 func triggerFactory(t *testing.T) triggerFunc {
     return func(gvr schema.GroupVersionResource, ns string, fakeClient *fake.FakeDynamicClient, _ *unstructured.Unstructured) *unstructured.Unstructured {
         testObject := newUnstructured("apps/v1", "Deployment", "ns-foo", "name-foo")
+        testObject.SetLabels(map[string]string{"environment": "test"})
         createdObj, err := fakeClient.Resource(gvr).Namespace(ns).Create(context.TODO(), testObject, metav1.CreateOptions{})
         if err != nil {
             t.Error(err)
@@ -55,13 +57,14 @@ func handler(rcvCh chan<- *unstructured.Unstructured) *cache.ResourceEventHandle
 
 func TestFilteredDynamicSharedInformerFactory(t *testing.T) {
     scenarios := []struct {
         name string
         existingObj *unstructured.Unstructured
         gvr schema.GroupVersionResource
         informNS string
         ns string
         trigger func(gvr schema.GroupVersionResource, ns string, fakeClient *fake.FakeDynamicClient, testObject *unstructured.Unstructured) *unstructured.Unstructured
         handler func(rcvCh chan<- *unstructured.Unstructured) *cache.ResourceEventHandlerFuncs
+        tweakListOptions dynamicinformer.TweakListOptionsFunc
     }{
         // scenario 1
         {
@@ -81,6 +84,50 @@ func TestFilteredDynamicSharedInformerFactory(t *testing.T) {
             trigger: triggerFactory(t),
             handler: handler,
         },
+        {
+            name: "tweak options: test adding an object not matching field selector should not trigger AddFunc",
+            informNS: "ns-foo",
+            ns: "ns-foo",
+            gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
+            trigger: triggerFactory(t),
+            handler: handler,
+            tweakListOptions: func(opts *metav1.ListOptions) {
+                opts.FieldSelector = "metadata.name=name-bar"
+            },
+        },
+        {
+            name: "tweak options: test adding an object matching field selector should trigger AddFunc",
+            informNS: "ns-foo",
+            ns: "ns-foo",
+            gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
+            trigger: triggerFactory(t),
+            handler: handler,
+            tweakListOptions: func(opts *metav1.ListOptions) {
+                opts.FieldSelector = "metadata.name=name-foo"
+            },
+        },
+        {
+            name: "tweak options: test adding an object not matching label selector should not trigger AddFunc",
+            informNS: "ns-foo",
+            ns: "ns-foo",
+            gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
+            trigger: triggerFactory(t),
+            handler: handler,
+            tweakListOptions: func(opts *metav1.ListOptions) {
+                opts.LabelSelector = "environment=production"
+            },
+        },
+        {
+            name: "tweak options: test adding an object matching label selector should trigger AddFunc",
+            informNS: "ns-foo",
+            ns: "ns-foo",
+            gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
+            trigger: triggerFactory(t),
+            handler: handler,
+            tweakListOptions: func(opts *metav1.ListOptions) {
+                opts.LabelSelector = "environment=test"
+            },
+        },
     }
 
     for _, ts := range scenarios {
@@ -20,6 +20,7 @@ import (
     "context"
     "fmt"
     "github.com/onsi/gomega"
+    "sync"
     "time"
 
     apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
@@ -32,6 +33,8 @@ import (
     "k8s.io/apimachinery/pkg/watch"
     "k8s.io/apiserver/pkg/storage/names"
     "k8s.io/client-go/dynamic"
+    "k8s.io/client-go/dynamic/dynamicinformer"
+    "k8s.io/client-go/tools/cache"
     "k8s.io/kubernetes/test/e2e/framework"
     "k8s.io/kubernetes/test/utils/crd"
     imageutils "k8s.io/kubernetes/test/utils/image"
@@ -150,15 +153,44 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fu
 
             ginkgo.By("Watching with field selectors")
 
-            v2Client, gvr := customResourceClient(crd, "v2")
-            hostWatch, err := v2Client.Namespace(f.Namespace.Name).Watch(ctx, metav1.ListOptions{FieldSelector: "host=host1"})
+            v2Client, v2gvr := customResourceClient(crd, "v2")
+            v2hostWatch, err := v2Client.Namespace(f.Namespace.Name).Watch(ctx, metav1.ListOptions{FieldSelector: "host=host1"})
             framework.ExpectNoError(err, "watching custom resources with field selector")
+            v2hostWatchAcc := watchAccumulator(v2hostWatch)
             v2hostPortWatch, err := v2Client.Namespace(f.Namespace.Name).Watch(ctx, metav1.ListOptions{FieldSelector: "host=host1,port=80"})
             framework.ExpectNoError(err, "watching custom resources with field selector")
+            framework.ExpectNoError(err, "watching custom resources with field selector")
+            v2hostPortWatchAcc := watchAccumulator(v2hostPortWatch)
 
             v1Client, _ := customResourceClient(crd, "v1")
             v1hostPortWatch, err := v1Client.Namespace(f.Namespace.Name).Watch(ctx, metav1.ListOptions{FieldSelector: "hostPort=host1:80"})
             framework.ExpectNoError(err, "watching custom resources with field selector")
+            v1hostPortWatchAcc := watchAccumulator(v1hostPortWatch)
 
+            ginkgo.By("Registering informers with field selectors")
+
+            informerCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
+            defer cancel()
+
+            hostInformer := dynamicinformer.NewFilteredDynamicSharedInformerFactory(f.DynamicClient, 0, f.Namespace.Name, func(opts *metav1.ListOptions) {
+                opts.FieldSelector = "host=host1"
+            })
+            hostInformer.Start(informerCtx.Done())
+
+            hostPortInformer := dynamicinformer.NewFilteredDynamicSharedInformerFactory(f.DynamicClient, 0, f.Namespace.Name, func(opts *metav1.ListOptions) {
+                opts.FieldSelector = "host=host1,port=80"
+            })
+            hostPortInformer.Start(informerCtx.Done())
+
+            v2HostInformer := hostInformer.ForResource(v2gvr).Informer()
+            go v2HostInformer.Run(informerCtx.Done())
+            v2HostPortInformer := hostPortInformer.ForResource(v2gvr).Informer()
+            go v2HostPortInformer.Run(informerCtx.Done())
+
+            v2HostInformerAcc := informerAccumulator(v2HostInformer)
+            v2HostPortInformerAcc := informerAccumulator(v2HostPortInformer)
+
+            framework.ExpectNoError(err, "adding event handler")
+
             ginkgo.By("Creating custom resources")
             toCreate := []map[string]any{
@@ -181,7 +213,7 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fu
                 crNames[i] = name
 
                 obj := map[string]interface{}{
-                    "apiVersion": gvr.Group + "/" + gvr.Version,
+                    "apiVersion": v2gvr.Group + "/" + v2gvr.Version,
                     "kind": crd.Spec.Names.Kind,
                     "metadata": map[string]interface{}{
                         "name": name,
@@ -215,15 +247,23 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fu
             gomega.Expect(listResultToNames(list)).To(gomega.Equal(sets.New(crNames[1])))
 
             ginkgo.By("Waiting for watch events to contain v2 custom resources for field selector host=host1")
-            gomega.Eventually(ctx, watchAccumulator(hostWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
+            gomega.Eventually(ctx, v2hostWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
                 Should(gomega.Equal(addedEvents(sets.New(crNames[0], crNames[1]))))
 
             ginkgo.By("Waiting for watch events to contain v2 custom resources for field selector host=host1,port=80")
-            gomega.Eventually(ctx, watchAccumulator(v2hostPortWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
+            gomega.Eventually(ctx, v2hostPortWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
                 Should(gomega.Equal(addedEvents(sets.New(crNames[0]))))
 
             ginkgo.By("Waiting for watch events to contain v1 custom resources for field selector hostPort=host1:80")
-            gomega.Eventually(ctx, watchAccumulator(v1hostPortWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
+            gomega.Eventually(ctx, v1hostPortWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
+                Should(gomega.Equal(addedEvents(sets.New(crNames[0]))))
+
+            ginkgo.By("Waiting for informer events to contain v2 custom resources for field selector host=host1")
+            gomega.Eventually(ctx, v2HostInformerAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
+                Should(gomega.Equal(addedEvents(sets.New(crNames[0], crNames[1]))))
+
+            ginkgo.By("Waiting for informer events to contain v2 custom resources for field selector host=host1,port=80")
+            gomega.Eventually(ctx, v2HostPortInformerAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
                 Should(gomega.Equal(addedEvents(sets.New(crNames[0]))))
 
             ginkgo.By("Deleting one custom resources to ensure that deletions are observed")
@@ -249,18 +289,25 @@ var _ = SIGDescribe("CustomResourceFieldSelectors [Privileged:ClusterAdmin]", fu
             gomega.Expect(listResultToNames(list)).To(gomega.Equal(sets.New[string]()))
 
             ginkgo.By("Waiting for v2 watch events after updates and deletes for field selector host=host1")
-            gomega.Eventually(ctx, watchAccumulator(hostWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
-                Should(gomega.Equal(deletedEvents(sets.New(crNames[0], crNames[1]))))
+            gomega.Eventually(ctx, v2hostWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
+                Should(gomega.Equal(addedAndDeletedEvents(sets.New(crNames[0], crNames[1]), sets.New(crNames[0], crNames[1]))))
 
             ginkgo.By("Waiting for v2 watch events after updates and deletes for field selector host=host1,port=80")
-            gomega.Eventually(ctx, watchAccumulator(v2hostPortWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
-                Should(gomega.Equal(deletedEvents(sets.New(crNames[0]))))
+            gomega.Eventually(ctx, v2hostPortWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
+                Should(gomega.Equal(addedAndDeletedEvents(sets.New(crNames[0]), sets.New(crNames[0]))))
 
             ginkgo.By("Waiting for v1 watch events after updates and deletes for field selector hostPort=host1:80")
-            gomega.Eventually(ctx, watchAccumulator(v1hostPortWatch)).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
-                Should(gomega.Equal(deletedEvents(sets.New(crNames[0]))))
-        })
+            gomega.Eventually(ctx, v1hostPortWatchAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
+                Should(gomega.Equal(addedAndDeletedEvents(sets.New(crNames[0]), sets.New(crNames[0]))))
+
+            ginkgo.By("Waiting for v2 informer events after updates and deletes for field selector host=host1")
+            gomega.Eventually(ctx, v2HostInformerAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
+                Should(gomega.Equal(addedAndDeletedEvents(sets.New(crNames[0], crNames[1]), sets.New(crNames[0], crNames[1]))))
+
+            ginkgo.By("Waiting for v2 informer events after updates and deletes for field selector host=host1,port=80")
+            gomega.Eventually(ctx, v2HostPortInformerAcc).WithPolling(5 * time.Millisecond).WithTimeout(30 * time.Second).
+                Should(gomega.Equal(addedAndDeletedEvents(sets.New(crNames[0]), sets.New(crNames[0]))))
+        })
     })
 })
 
@@ -276,8 +323,8 @@ func addedEvents(added sets.Set[string]) *accumulatedEvents {
     return &accumulatedEvents{added: added, deleted: sets.New[string]()}
 }
 
-func deletedEvents(deleted sets.Set[string]) *accumulatedEvents {
-    return &accumulatedEvents{added: sets.New[string](), deleted: deleted}
+func addedAndDeletedEvents(added, deleted sets.Set[string]) *accumulatedEvents {
+    return &accumulatedEvents{added: added, deleted: deleted}
 }
 
 func watchAccumulator(w watch.Interface) func(ctx context.Context) (*accumulatedEvents, error) {
@@ -301,6 +348,39 @@ func watchAccumulator(w watch.Interface) func(ctx context.Context) (*accumulated
     }
 }
+
+func informerAccumulator(informer cache.SharedIndexInformer) func(ctx context.Context) (*accumulatedEvents, error) {
+    var lock sync.Mutex
+    result := emptyEvents()
+
+    _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+        AddFunc: func(obj any) {
+            defer ginkgo.GinkgoRecover()
+            lock.Lock()
+            defer lock.Unlock()
+            result.added.Insert(obj.(*unstructured.Unstructured).GetName())
+        },
+        UpdateFunc: func(oldObj, newObj any) {
+            defer ginkgo.GinkgoRecover()
+            // ignoring
+        },
+        DeleteFunc: func(obj any) {
+            defer ginkgo.GinkgoRecover()
+            lock.Lock()
+            defer lock.Unlock()
+            result.deleted.Insert(obj.(*unstructured.Unstructured).GetName())
+        },
+    })
+
+    return func(ctx context.Context) (*accumulatedEvents, error) {
+        if err != nil {
+            return nil, err
+        }
+        lock.Lock()
+        defer lock.Unlock()
+        return result, nil
+    }
+}
 
 func listResultToNames(list *unstructured.UnstructuredList) sets.Set[string] {
     found := sets.New[string]()
     for _, i := range list.Items {