From 1994540ffc5495bc78a4aaa428873d04d4dda442 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 16 Sep 2024 11:51:26 +0200 Subject: [PATCH 1/5] client-go/metadata: refactor List method --- staging/src/k8s.io/client-go/metadata/metadata.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/staging/src/k8s.io/client-go/metadata/metadata.go b/staging/src/k8s.io/client-go/metadata/metadata.go index 2cc7e22adf7..e8d263a2793 100644 --- a/staging/src/k8s.io/client-go/metadata/metadata.go +++ b/staging/src/k8s.io/client-go/metadata/metadata.go @@ -218,6 +218,10 @@ func (c *client) Get(ctx context.Context, name string, opts metav1.GetOptions, s // List returns all resources within the specified scope (namespace or cluster). func (c *client) List(ctx context.Context, opts metav1.ListOptions) (*metav1.PartialObjectMetadataList, error) { + return c.list(ctx, opts) +} + +func (c *client) list(ctx context.Context, opts metav1.ListOptions) (*metav1.PartialObjectMetadataList, error) { result := c.client.client.Get().AbsPath(c.makeURLSegments("")...). SetHeader("Accept", "application/vnd.kubernetes.protobuf;as=PartialObjectMetadataList;g=meta.k8s.io;v=v1,application/json;as=PartialObjectMetadataList;g=meta.k8s.io;v=v1,application/json"). SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1). From 3a0aa1093f5e11afb3d729e5799c2de526c20de9 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 16 Sep 2024 11:56:54 +0200 Subject: [PATCH 2/5] client-go/metadata: add watchlist method --- .../src/k8s.io/client-go/metadata/metadata.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/staging/src/k8s.io/client-go/metadata/metadata.go b/staging/src/k8s.io/client-go/metadata/metadata.go index e8d263a2793..7eddc6734e9 100644 --- a/staging/src/k8s.io/client-go/metadata/metadata.go +++ b/staging/src/k8s.io/client-go/metadata/metadata.go @@ -253,6 +253,25 @@ func (c *client) list(ctx context.Context, opts metav1.ListOptions) (*metav1.Par return partial, nil } +// watchList establishes a watch stream with the server and returns PartialObjectMetadataList. +func (c *client) watchList(ctx context.Context, opts metav1.ListOptions) (*metav1.PartialObjectMetadataList, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + + result := &metav1.PartialObjectMetadataList{} + err := c.client.client.Get(). + AbsPath(c.makeURLSegments("")...). + SetHeader("Accept", "application/vnd.kubernetes.protobuf;as=PartialObjectMetadata;g=meta.k8s.io;v=v1,application/json;as=PartialObjectMetadata;g=meta.k8s.io;v=v1,application/json"). + SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1). + Timeout(timeout). + WatchList(ctx). + Into(result) + + return result, err +} + // Watch finds all changes to the resources in the specified scope (namespace or cluster). func (c *client) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { var timeout time.Duration From 0912e400cda2dd6ef6950dea15b18f826777ac41 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 16 Sep 2024 12:00:11 +0200 Subject: [PATCH 3/5] client-go/metadata: use watchlist --- .../src/k8s.io/client-go/metadata/metadata.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/staging/src/k8s.io/client-go/metadata/metadata.go b/staging/src/k8s.io/client-go/metadata/metadata.go index 7eddc6734e9..a19ba13049f 100644 --- a/staging/src/k8s.io/client-go/metadata/metadata.go +++ b/staging/src/k8s.io/client-go/metadata/metadata.go @@ -33,6 +33,8 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/rest" + "k8s.io/client-go/util/consistencydetector" + "k8s.io/client-go/util/watchlist" ) var deleteScheme = runtime.NewScheme() @@ -218,7 +220,21 @@ func (c *client) Get(ctx context.Context, name string, opts metav1.GetOptions, s // List returns all resources within the specified scope (namespace or cluster). func (c *client) List(ctx context.Context, opts metav1.ListOptions) (*metav1.PartialObjectMetadataList, error) { - return c.list(ctx, opts) + if watchListOptions, hasWatchListOptionsPrepared, watchListOptionsErr := watchlist.PrepareWatchListOptionsFromListOptions(opts); watchListOptionsErr != nil { + klog.FromContext(ctx).Error(watchListOptionsErr, "Failed preparing watchlist options, falling back to the standard LIST semantics", "resource", c.resource) + } else if hasWatchListOptionsPrepared { + result, err := c.watchList(ctx, watchListOptions) + if err == nil { + consistencydetector.CheckWatchListFromCacheDataConsistencyIfRequested(ctx, fmt.Sprintf("watchlist request for %v", c.resource), c.list, opts, result) + return result, nil + } + klog.FromContext(ctx).Error(err, "The watchlist request ended with an error, falling back to the standard LIST semantics", "resource", c.resource) + } + result, err := c.list(ctx, opts) + if err == nil { + consistencydetector.CheckListFromCacheDataConsistencyIfRequested(ctx, fmt.Sprintf("list request for %v", c.resource), c.list, opts, result) + } + return result, err } func (c *client) list(ctx context.Context, opts metav1.ListOptions) (*metav1.PartialObjectMetadataList, error) { From 6d258660fd6ab4978638a051f78b7dd8c1be6f3e Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 16 Sep 2024 13:39:18 +0200 Subject: [PATCH 4/5] e2e/apimachinery/watchlist: slighty refactor unstructured tests --- test/e2e/apimachinery/watchlist.go | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/test/e2e/apimachinery/watchlist.go b/test/e2e/apimachinery/watchlist.go index 0e5e131a2cc..041768a64a9 100644 --- a/test/e2e/apimachinery/watchlist.go +++ b/test/e2e/apimachinery/watchlist.go @@ -66,13 +66,7 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fe nil, ) - ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name)) - var expectedSecrets []v1.Secret - for i := 1; i <= 5; i++ { - secret, err := f.ClientSet.CoreV1().Secrets(f.Namespace.Name).Create(ctx, newSecret(fmt.Sprintf("secret-%d", i)), metav1.CreateOptions{}) - framework.ExpectNoError(err) - expectedSecrets = append(expectedSecrets, *secret) - } + expectedSecrets := addWellKnownSecrets(ctx, f) ginkgo.By("Starting the secret informer") go secretInformer.Run(stopCh) @@ -99,13 +93,7 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fe ginkgo.It("should be requested by client-go's List method when WatchListClient is enabled", func(ctx context.Context) { featuregatetesting.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), utilfeature.DefaultFeatureGate, featuregate.Feature(clientfeatures.WatchListClient), true) - ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name)) - var expectedSecrets []v1.Secret - for i := 1; i <= 5; i++ { - secret, err := f.ClientSet.CoreV1().Secrets(f.Namespace.Name).Create(ctx, newSecret(fmt.Sprintf("secret-%d", i)), metav1.CreateOptions{}) - framework.ExpectNoError(err) - expectedSecrets = append(expectedSecrets, *secret) - } + expectedSecrets := addWellKnownSecrets(ctx, f) rt, clientConfig := clientConfigWithRoundTripper(f) wrappedKubeClient, err := kubernetes.NewForConfig(clientConfig) @@ -204,6 +192,17 @@ func getExpectedRequestMadeByClientFor(rv string) []string { return expectedRequestMadeByClient } +func addWellKnownSecrets(ctx context.Context, f *framework.Framework) []v1.Secret { + ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name)) + var secrets []v1.Secret + for i := 1; i <= 5; i++ { + secret, err := f.ClientSet.CoreV1().Secrets(f.Namespace.Name).Create(ctx, newSecret(fmt.Sprintf("secret-%d", i)), metav1.CreateOptions{}) + framework.ExpectNoError(err) + secrets = append(secrets, *secret) + } + return secrets +} + type byName []v1.Secret func (a byName) Len() int { return len(a) } From 0f933a0b14d6f948be6460839e4fce021d0f1e05 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 16 Sep 2024 14:33:48 +0200 Subject: [PATCH 5/5] test/apimachinery/watchlist: add scenario for metadata client's List method --- test/e2e/apimachinery/watchlist.go | 31 ++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/test/e2e/apimachinery/watchlist.go b/test/e2e/apimachinery/watchlist.go index 041768a64a9..46541757fd5 100644 --- a/test/e2e/apimachinery/watchlist.go +++ b/test/e2e/apimachinery/watchlist.go @@ -31,12 +31,14 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/dynamic" clientfeatures "k8s.io/client-go/features" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/metadata" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/consistencydetector" @@ -140,6 +142,35 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fe expectedRequestMadeByDynamicClient := getExpectedRequestMadeByClientFor(secretList.GetResourceVersion()) gomega.Expect(rt.actualRequests).To(gomega.Equal(expectedRequestMadeByDynamicClient)) }) + ginkgo.It("should be requested by metadata client's List method when WatchListClient is enabled", func(ctx context.Context) { + featuregatetesting.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), utilfeature.DefaultFeatureGate, featuregate.Feature(clientfeatures.WatchListClient), true) + + metaClient, err := metadata.NewForConfig(f.ClientConfig()) + framework.ExpectNoError(err) + expectedMetaSecrets := []metav1.PartialObjectMetadata{} + for _, addedSecret := range addWellKnownSecrets(ctx, f) { + addedSecretMeta, err := metaClient.Resource(v1.SchemeGroupVersion.WithResource("secrets")).Namespace(f.Namespace.Name).Get(ctx, addedSecret.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + expectedMetaSecrets = append(expectedMetaSecrets, *addedSecretMeta) + } + + rt, clientConfig := clientConfigWithRoundTripper(f) + wrappedMetaClient, err := metadata.NewForConfig(clientConfig) + framework.ExpectNoError(err) + + ginkgo.By("Streaming secrets metadata from the server") + secretMetaList, err := wrappedMetaClient.Resource(v1.SchemeGroupVersion.WithResource("secrets")).Namespace(f.Namespace.Name).List(ctx, metav1.ListOptions{}) + framework.ExpectNoError(err) + + ginkgo.By("Verifying if the secret meta list was properly streamed") + streamedMetaSecrets := secretMetaList.Items + gomega.Expect(cmp.Equal(expectedMetaSecrets, streamedMetaSecrets)).To(gomega.BeTrueBecause("data received via watchlist must match the added data")) + gomega.Expect(secretMetaList.GetObjectKind().GroupVersionKind()).To(gomega.Equal(schema.GroupVersion{}.WithKind("PartialObjectMetadataList"))) + + ginkgo.By("Verifying if expected requests were sent to the server") + expectedRequestMadeByMetaClient := getExpectedRequestMadeByClientFor(secretMetaList.GetResourceVersion()) + gomega.Expect(rt.actualRequests).To(gomega.Equal(expectedRequestMadeByMetaClient)) + }) }) type roundTripper struct {