From d778356bc6a057ae41bee4577e568293a25fce9b Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Thu, 11 Jul 2024 13:49:29 +0200 Subject: [PATCH 1/3] dynamic client: add support for API streaming --- .../k8s.io/client-go/dynamic/client_test.go | 169 ++++++++++++++++++ .../src/k8s.io/client-go/dynamic/simple.go | 43 ++++- 2 files changed, 207 insertions(+), 5 deletions(-) diff --git a/staging/src/k8s.io/client-go/dynamic/client_test.go b/staging/src/k8s.io/client-go/dynamic/client_test.go index 2f16abb88c6..c812b016469 100644 --- a/staging/src/k8s.io/client-go/dynamic/client_test.go +++ b/staging/src/k8s.io/client-go/dynamic/client_test.go @@ -27,6 +27,8 @@ import ( "strings" "testing" + "github.com/google/go-cmp/cmp" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -34,6 +36,8 @@ import ( "k8s.io/apimachinery/pkg/runtime/serializer/streaming" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" + clientfeatures "k8s.io/client-go/features" + clientfeaturestesting "k8s.io/client-go/features/testing" restclient "k8s.io/client-go/rest" restclientwatch "k8s.io/client-go/rest/watch" ) @@ -148,6 +152,171 @@ func TestList(t *testing.T) { } } +func TestWatchList(t *testing.T) { + clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true) + + type requestParam struct { + Path string + Query string + } + + scenarios := []struct { + name string + namespace string + watchResponse []watch.Event + listResponse []byte + + expectedRequestParams []requestParam + expectedList *unstructured.UnstructuredList + }{ + { + name: "watch-list request for cluster wide resource", + watchResponse: []watch.Event{ + {Type: watch.Added, Object: getObject("gtest/vTest", "rTest", "item1")}, + {Type: watch.Added, Object: getObject("gtest/vTest", "rTest", "item2")}, + {Type: watch.Bookmark, Object: func() runtime.Object { + obj := getObject("gtest/vTest", "rTest", "item2") + obj.SetResourceVersion("10") + obj.SetAnnotations(map[string]string{metav1.InitialEventsAnnotationKey: "true"}) + return obj + }()}, + }, + expectedRequestParams: []requestParam{ + { + Path: "/apis/gtest/vtest/rtest", + Query: "allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true", + }, + }, + expectedList: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "apiVersion": "", + "kind": "UnstructuredList", + "metadata": map[string]interface{}{ + "resourceVersion": "10", + }, + }, + Items: []unstructured.Unstructured{ + *getObject("gtest/vTest", "rTest", "item1"), + *getObject("gtest/vTest", "rTest", "item2"), + }, + }, + }, + { + name: "watch-list request for namespaced watch resource", + namespace: "nstest", + watchResponse: []watch.Event{ + {Type: watch.Added, Object: getObject("gtest/vTest", "rTest", "item1")}, + {Type: watch.Bookmark, Object: func() runtime.Object { + obj := getObject("gtest/vTest", "rTest", "item2") + obj.SetResourceVersion("39") + obj.SetAnnotations(map[string]string{metav1.InitialEventsAnnotationKey: "true"}) + return obj + }()}, + }, + expectedRequestParams: []requestParam{ + { + Path: "/apis/gtest/vtest/namespaces/nstest/rtest", + Query: "allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true", + }, + }, + expectedList: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "apiVersion": "", + "kind": "UnstructuredList", + "metadata": map[string]interface{}{ + "resourceVersion": "39", + }, + }, + Items: []unstructured.Unstructured{ + *getObject("gtest/vTest", "rTest", "item1"), + }, + }, + }, + { + name: "watch-list request falls back to standard list on any error", + namespace: "nstest", + // watchList method in client-go expect only watch.Add and watch.Bookmark events + // receiving watch.Error will cause this method to report an error which will + // trigger the fallback logic + watchResponse: []watch.Event{ + {Type: watch.Error, Object: getObject("gtest/vTest", "rTest", "item1")}, + }, + listResponse: getListJSON("vTest", "UnstructuredList", + getJSON("gtest/vTest", "rTest", "item1"), + getJSON("gtest/vTest", "rTest", "item2")), + expectedRequestParams: []requestParam{ + // a watch-list request first + { + Path: "/apis/gtest/vtest/namespaces/nstest/rtest", + Query: "allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true", + }, + // a standard list request second + { + Path: "/apis/gtest/vtest/namespaces/nstest/rtest", + }, + }, + expectedList: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "apiVersion": "vTest", + "kind": "UnstructuredList", + }, + Items: []unstructured.Unstructured{ + *getObject("gtest/vTest", "rTest", "item1"), + *getObject("gtest/vTest", "rTest", "item2"), + }, + }, + }, + } + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + var actualRequestParams []requestParam + resource := schema.GroupVersionResource{Group: "gtest", Version: "vtest", Resource: "rtest"} + cl, srv, err := getClientServer(func(w http.ResponseWriter, r *http.Request) { + if r.Method != "GET" { + t.Errorf("unexpected HTTP method %s. expected GET", r.Method) + } + actualRequestParams = append(actualRequestParams, requestParam{ + Path: r.URL.Path, + Query: r.URL.RawQuery, + }) + + w.Header().Set("Content-Type", runtime.ContentTypeJSON) + // handle LIST response + if len(scenario.listResponse) > 0 { + if _, err := w.Write(scenario.listResponse); err != nil { + t.Fatal(err) + } + return + } + + // handle WATCH response + enc := restclientwatch.NewEncoder(streaming.NewEncoder(w, unstructured.UnstructuredJSONScheme), unstructured.UnstructuredJSONScheme) + for _, e := range scenario.watchResponse { + if err := enc.Encode(&e); err != nil { + t.Fatal(err) + } + } + }) + if err != nil { + t.Fatalf("unexpected error when creating test client and server: %v", err) + } + defer srv.Close() + + actualList, err := cl.Resource(resource).Namespace(scenario.namespace).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if !cmp.Equal(scenario.expectedRequestParams, actualRequestParams) { + t.Fatalf("unexpected request params: %v", cmp.Diff(scenario.expectedRequestParams, actualRequestParams)) + } + if !cmp.Equal(scenario.expectedList, actualList) { + t.Errorf("received expected list, diff: %s", cmp.Diff(scenario.expectedList, actualList)) + } + }) + } +} + func TestGet(t *testing.T) { tcs := []struct { resource string diff --git a/staging/src/k8s.io/client-go/dynamic/simple.go b/staging/src/k8s.io/client-go/dynamic/simple.go index 9ea8ef36948..326da7cbdf0 100644 --- a/staging/src/k8s.io/client-go/dynamic/simple.go +++ b/staging/src/k8s.io/client-go/dynamic/simple.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net/http" + "time" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -30,6 +31,8 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/rest" "k8s.io/client-go/util/consistencydetector" + "k8s.io/client-go/util/watchlist" + "k8s.io/klog/v2" ) type DynamicClient struct { @@ -293,13 +296,22 @@ func (c *dynamicResourceClient) Get(ctx context.Context, name string, opts metav return uncastObj.(*unstructured.Unstructured), nil } -func (c *dynamicResourceClient) List(ctx context.Context, opts metav1.ListOptions) (result *unstructured.UnstructuredList, err error) { - defer func() { +func (c *dynamicResourceClient) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) { + if watchListOptions, hasWatchListOptionsPrepared, watchListOptionsErr := watchlist.PrepareWatchListOptionsFromListOptions(opts); watchListOptionsErr != nil { + klog.Warningf("Failed preparing watchlist options for %v, falling back to the standard LIST semantics, err = %v", c.resource, watchListOptionsErr) + } else if hasWatchListOptionsPrepared { + result, err := c.watchList(ctx, watchListOptions) if err == nil { - consistencydetector.CheckListFromCacheDataConsistencyIfRequested(ctx, fmt.Sprintf("list request for %v", c.resource), c.list, opts, result) + consistencydetector.CheckWatchListFromCacheDataConsistencyIfRequested(ctx, fmt.Sprintf("watchlist request for %v", c.resource), c.list, opts, result) + return result, nil } - }() - return c.list(ctx, opts) + klog.Warningf("The watchlist request for %v ended with an error, falling back to the standard LIST semantics, err = %v", c.resource, err) + } + result, err := c.list(ctx, opts) + if err == nil { + consistencydetector.CheckListFromCacheDataConsistencyIfRequested(ctx, fmt.Sprintf("list request for %v", c.resource), c.list, opts, result) + } + return result, err } func (c *dynamicResourceClient) list(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) { @@ -329,6 +341,27 @@ func (c *dynamicResourceClient) list(ctx context.Context, opts metav1.ListOption return list, nil } +// watchList establishes a watch stream with the server and returns an unstructured list. +func (c *dynamicResourceClient) watchList(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) { + if err := validateNamespaceWithOptionalName(c.namespace); err != nil { + return nil, err + } + + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + + result := &unstructured.UnstructuredList{} + err := c.client.client.Get().AbsPath(c.makeURLSegments("")...). + SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1). + Timeout(timeout). + WatchList(ctx). + Into(result) + + return result, err +} + func (c *dynamicResourceClient) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.Watch = true if err := validateNamespaceWithOptionalName(c.namespace); err != nil { From 951d325111729c3b7428fa514f42b3e763d24ddb Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 15 Jul 2024 11:21:00 +0200 Subject: [PATCH 2/3] test/apimachinery/watchlist: move common functionality to separate functions --- test/e2e/apimachinery/watchlist.go | 57 +++++++++++++++++++----------- 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/test/e2e/apimachinery/watchlist.go b/test/e2e/apimachinery/watchlist.go index 1d120477900..cbb7c561358 100644 --- a/test/e2e/apimachinery/watchlist.go +++ b/test/e2e/apimachinery/watchlist.go @@ -35,6 +35,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" clientfeatures "k8s.io/client-go/features" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/consistencydetector" "k8s.io/component-base/featuregate" @@ -103,14 +104,7 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fu expectedSecrets = append(expectedSecrets, *secret) } - var actualRequestsMadeByKubeClient []string - clientConfig := f.ClientConfig() - clientConfig.Wrap(func(rt http.RoundTripper) http.RoundTripper { - return roundTripFunc(func(req *http.Request) (*http.Response, error) { - actualRequestsMadeByKubeClient = append(actualRequestsMadeByKubeClient, req.URL.RawQuery) - return rt.RoundTrip(req) - }) - }) + rt, clientConfig := clientConfigWithRoundTripper(f) wrappedKubeClient, err := kubernetes.NewForConfig(clientConfig) framework.ExpectNoError(err) @@ -120,26 +114,35 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fu ginkgo.By("Verifying if the secret list was properly streamed") streamedSecrets := secretList.Items - sort.Sort(byName(expectedSecrets)) gomega.Expect(cmp.Equal(expectedSecrets, streamedSecrets)).To(gomega.BeTrueBecause("data received via watchlist must match the added data")) ginkgo.By("Verifying if expected requests were sent to the server") - expectedRequestMadeByKubeClient := []string{ - // corresponds to a streaming request made by the kube client to stream the secrets - "allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true", - } - if consistencydetector.IsDataConsistencyDetectionForWatchListEnabled() { - // corresponds to a standard list request made by the consistency detector build in into the kube client - expectedRequestMadeByKubeClient = append(expectedRequestMadeByKubeClient, fmt.Sprintf("resourceVersion=%s&resourceVersionMatch=Exact", secretList.ResourceVersion)) - } - gomega.Expect(actualRequestsMadeByKubeClient).To(gomega.Equal(expectedRequestMadeByKubeClient)) + expectedRequestMadeByKubeClient := getExpectedRequestMadeByClientFor(secretList.ResourceVersion) + gomega.Expect(rt.actualRequests).To(gomega.Equal(expectedRequestMadeByKubeClient)) }) }) -type roundTripFunc func(req *http.Request) (*http.Response, error) +type roundTripper struct { + actualRequests []string + delegate http.RoundTripper +} -func (f roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) { - return f(req) +func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + r.actualRequests = append(r.actualRequests, req.URL.RawQuery) + return r.delegate.RoundTrip(req) +} + +func (r *roundTripper) Wrap(delegate http.RoundTripper) http.RoundTripper { + r.delegate = delegate + return r +} + +func clientConfigWithRoundTripper(f *framework.Framework) (*roundTripper, *rest.Config) { + clientConfig := f.ClientConfig() + rt := &roundTripper{} + clientConfig.Wrap(rt.Wrap) + + return rt, clientConfig } func verifyStore(ctx context.Context, expectedSecrets []v1.Secret, store cache.Store) { @@ -157,6 +160,18 @@ func verifyStore(ctx context.Context, expectedSecrets []v1.Secret, store cache.S framework.ExpectNoError(err) } +func getExpectedRequestMadeByClientFor(rv string) []string { + expectedRequestMadeByClient := []string{ + // corresponds to a streaming request made by the client to stream the secrets + "allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true", + } + if consistencydetector.IsDataConsistencyDetectionForWatchListEnabled() { + // corresponds to a standard list request made by the consistency detector build in into the client + expectedRequestMadeByClient = append(expectedRequestMadeByClient, fmt.Sprintf("resourceVersion=%s&resourceVersionMatch=Exact", rv)) + } + return expectedRequestMadeByClient +} + type byName []v1.Secret func (a byName) Len() int { return len(a) } From ba160f6ed3f81f6dbe2c1dd5d0cdce6048812130 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 15 Jul 2024 11:24:30 +0200 Subject: [PATCH 3/3] test/apimachinery/watchlist: add scenario for dynamic client's List method --- test/e2e/apimachinery/watchlist.go | 31 ++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/test/e2e/apimachinery/watchlist.go b/test/e2e/apimachinery/watchlist.go index cbb7c561358..06a43c46ad3 100644 --- a/test/e2e/apimachinery/watchlist.go +++ b/test/e2e/apimachinery/watchlist.go @@ -29,10 +29,12 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/dynamic" clientfeatures "k8s.io/client-go/features" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -120,6 +122,35 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fu expectedRequestMadeByKubeClient := getExpectedRequestMadeByClientFor(secretList.ResourceVersion) gomega.Expect(rt.actualRequests).To(gomega.Equal(expectedRequestMadeByKubeClient)) }) + ginkgo.It("should be requested by dynamic client's List method when WatchListClient is enabled", func(ctx context.Context) { + featuregatetesting.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), utilfeature.DefaultFeatureGate, featuregate.Feature(clientfeatures.WatchListClient), true) + + ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name)) + var expectedSecrets []unstructured.Unstructured + for i := 1; i <= 5; i++ { + unstructuredSecret, err := runtime.DefaultUnstructuredConverter.ToUnstructured(newSecret(fmt.Sprintf("secret-%d", i))) + framework.ExpectNoError(err) + secret, err := f.DynamicClient.Resource(v1.SchemeGroupVersion.WithResource("secrets")).Namespace(f.Namespace.Name).Create(ctx, &unstructured.Unstructured{Object: unstructuredSecret}, metav1.CreateOptions{}) + framework.ExpectNoError(err) + expectedSecrets = append(expectedSecrets, *secret) + } + + rt, clientConfig := clientConfigWithRoundTripper(f) + wrappedDynamicClient, err := dynamic.NewForConfig(clientConfig) + framework.ExpectNoError(err) + + ginkgo.By("Streaming secrets from the server") + secretList, err := wrappedDynamicClient.Resource(v1.SchemeGroupVersion.WithResource("secrets")).Namespace(f.Namespace.Name).List(ctx, metav1.ListOptions{}) + framework.ExpectNoError(err) + + ginkgo.By("Verifying if the secret list was properly streamed") + streamedSecrets := secretList.Items + gomega.Expect(cmp.Equal(expectedSecrets, streamedSecrets)).To(gomega.BeTrueBecause("data received via watchlist must match the added data")) + + ginkgo.By("Verifying if expected requests were sent to the server") + expectedRequestMadeByDynamicClient := getExpectedRequestMadeByClientFor(secretList.GetResourceVersion()) + gomega.Expect(rt.actualRequests).To(gomega.Equal(expectedRequestMadeByDynamicClient)) + }) }) type roundTripper struct {