Merge pull request #125305 from p0lyn0mial/upstream-dynamic-client-uses-watch-list

dynamic client: add support for API streaming
This commit is contained in:
Kubernetes Prow Robot 2024-07-15 10:05:12 -07:00 committed by GitHub
commit 06ec6ba05d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 273 additions and 25 deletions

View File

@ -27,6 +27,8 @@ import (
"strings"
"testing"
"github.com/google/go-cmp/cmp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
@ -34,6 +36,8 @@ import (
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
clientfeatures "k8s.io/client-go/features"
clientfeaturestesting "k8s.io/client-go/features/testing"
restclient "k8s.io/client-go/rest"
restclientwatch "k8s.io/client-go/rest/watch"
)
@ -148,6 +152,171 @@ func TestList(t *testing.T) {
}
}
func TestWatchList(t *testing.T) {
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
type requestParam struct {
Path string
Query string
}
scenarios := []struct {
name string
namespace string
watchResponse []watch.Event
listResponse []byte
expectedRequestParams []requestParam
expectedList *unstructured.UnstructuredList
}{
{
name: "watch-list request for cluster wide resource",
watchResponse: []watch.Event{
{Type: watch.Added, Object: getObject("gtest/vTest", "rTest", "item1")},
{Type: watch.Added, Object: getObject("gtest/vTest", "rTest", "item2")},
{Type: watch.Bookmark, Object: func() runtime.Object {
obj := getObject("gtest/vTest", "rTest", "item2")
obj.SetResourceVersion("10")
obj.SetAnnotations(map[string]string{metav1.InitialEventsAnnotationKey: "true"})
return obj
}()},
},
expectedRequestParams: []requestParam{
{
Path: "/apis/gtest/vtest/rtest",
Query: "allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true",
},
},
expectedList: &unstructured.UnstructuredList{
Object: map[string]interface{}{
"apiVersion": "",
"kind": "UnstructuredList",
"metadata": map[string]interface{}{
"resourceVersion": "10",
},
},
Items: []unstructured.Unstructured{
*getObject("gtest/vTest", "rTest", "item1"),
*getObject("gtest/vTest", "rTest", "item2"),
},
},
},
{
name: "watch-list request for namespaced watch resource",
namespace: "nstest",
watchResponse: []watch.Event{
{Type: watch.Added, Object: getObject("gtest/vTest", "rTest", "item1")},
{Type: watch.Bookmark, Object: func() runtime.Object {
obj := getObject("gtest/vTest", "rTest", "item2")
obj.SetResourceVersion("39")
obj.SetAnnotations(map[string]string{metav1.InitialEventsAnnotationKey: "true"})
return obj
}()},
},
expectedRequestParams: []requestParam{
{
Path: "/apis/gtest/vtest/namespaces/nstest/rtest",
Query: "allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true",
},
},
expectedList: &unstructured.UnstructuredList{
Object: map[string]interface{}{
"apiVersion": "",
"kind": "UnstructuredList",
"metadata": map[string]interface{}{
"resourceVersion": "39",
},
},
Items: []unstructured.Unstructured{
*getObject("gtest/vTest", "rTest", "item1"),
},
},
},
{
name: "watch-list request falls back to standard list on any error",
namespace: "nstest",
// watchList method in client-go expect only watch.Add and watch.Bookmark events
// receiving watch.Error will cause this method to report an error which will
// trigger the fallback logic
watchResponse: []watch.Event{
{Type: watch.Error, Object: getObject("gtest/vTest", "rTest", "item1")},
},
listResponse: getListJSON("vTest", "UnstructuredList",
getJSON("gtest/vTest", "rTest", "item1"),
getJSON("gtest/vTest", "rTest", "item2")),
expectedRequestParams: []requestParam{
// a watch-list request first
{
Path: "/apis/gtest/vtest/namespaces/nstest/rtest",
Query: "allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true",
},
// a standard list request second
{
Path: "/apis/gtest/vtest/namespaces/nstest/rtest",
},
},
expectedList: &unstructured.UnstructuredList{
Object: map[string]interface{}{
"apiVersion": "vTest",
"kind": "UnstructuredList",
},
Items: []unstructured.Unstructured{
*getObject("gtest/vTest", "rTest", "item1"),
*getObject("gtest/vTest", "rTest", "item2"),
},
},
},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
var actualRequestParams []requestParam
resource := schema.GroupVersionResource{Group: "gtest", Version: "vtest", Resource: "rtest"}
cl, srv, err := getClientServer(func(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
t.Errorf("unexpected HTTP method %s. expected GET", r.Method)
}
actualRequestParams = append(actualRequestParams, requestParam{
Path: r.URL.Path,
Query: r.URL.RawQuery,
})
w.Header().Set("Content-Type", runtime.ContentTypeJSON)
// handle LIST response
if len(scenario.listResponse) > 0 {
if _, err := w.Write(scenario.listResponse); err != nil {
t.Fatal(err)
}
return
}
// handle WATCH response
enc := restclientwatch.NewEncoder(streaming.NewEncoder(w, unstructured.UnstructuredJSONScheme), unstructured.UnstructuredJSONScheme)
for _, e := range scenario.watchResponse {
if err := enc.Encode(&e); err != nil {
t.Fatal(err)
}
}
})
if err != nil {
t.Fatalf("unexpected error when creating test client and server: %v", err)
}
defer srv.Close()
actualList, err := cl.Resource(resource).Namespace(scenario.namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !cmp.Equal(scenario.expectedRequestParams, actualRequestParams) {
t.Fatalf("unexpected request params: %v", cmp.Diff(scenario.expectedRequestParams, actualRequestParams))
}
if !cmp.Equal(scenario.expectedList, actualList) {
t.Errorf("received expected list, diff: %s", cmp.Diff(scenario.expectedList, actualList))
}
})
}
}
func TestGet(t *testing.T) {
tcs := []struct {
resource string

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"net/http"
"time"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -30,6 +31,8 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/consistencydetector"
"k8s.io/client-go/util/watchlist"
"k8s.io/klog/v2"
)
type DynamicClient struct {
@ -293,13 +296,22 @@ func (c *dynamicResourceClient) Get(ctx context.Context, name string, opts metav
return uncastObj.(*unstructured.Unstructured), nil
}
func (c *dynamicResourceClient) List(ctx context.Context, opts metav1.ListOptions) (result *unstructured.UnstructuredList, err error) {
defer func() {
func (c *dynamicResourceClient) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
if watchListOptions, hasWatchListOptionsPrepared, watchListOptionsErr := watchlist.PrepareWatchListOptionsFromListOptions(opts); watchListOptionsErr != nil {
klog.Warningf("Failed preparing watchlist options for %v, falling back to the standard LIST semantics, err = %v", c.resource, watchListOptionsErr)
} else if hasWatchListOptionsPrepared {
result, err := c.watchList(ctx, watchListOptions)
if err == nil {
consistencydetector.CheckListFromCacheDataConsistencyIfRequested(ctx, fmt.Sprintf("list request for %v", c.resource), c.list, opts, result)
consistencydetector.CheckWatchListFromCacheDataConsistencyIfRequested(ctx, fmt.Sprintf("watchlist request for %v", c.resource), c.list, opts, result)
return result, nil
}
}()
return c.list(ctx, opts)
klog.Warningf("The watchlist request for %v ended with an error, falling back to the standard LIST semantics, err = %v", c.resource, err)
}
result, err := c.list(ctx, opts)
if err == nil {
consistencydetector.CheckListFromCacheDataConsistencyIfRequested(ctx, fmt.Sprintf("list request for %v", c.resource), c.list, opts, result)
}
return result, err
}
func (c *dynamicResourceClient) list(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
@ -329,6 +341,27 @@ func (c *dynamicResourceClient) list(ctx context.Context, opts metav1.ListOption
return list, nil
}
// watchList establishes a watch stream with the server and returns an unstructured list.
func (c *dynamicResourceClient) watchList(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
if err := validateNamespaceWithOptionalName(c.namespace); err != nil {
return nil, err
}
var timeout time.Duration
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
result := &unstructured.UnstructuredList{}
err := c.client.client.Get().AbsPath(c.makeURLSegments("")...).
SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).
Timeout(timeout).
WatchList(ctx).
Into(result)
return result, err
}
func (c *dynamicResourceClient) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
opts.Watch = true
if err := validateNamespaceWithOptionalName(c.namespace); err != nil {

View File

@ -29,12 +29,15 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/dynamic"
clientfeatures "k8s.io/client-go/features"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/consistencydetector"
"k8s.io/component-base/featuregate"
@ -103,14 +106,7 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fu
expectedSecrets = append(expectedSecrets, *secret)
}
var actualRequestsMadeByKubeClient []string
clientConfig := f.ClientConfig()
clientConfig.Wrap(func(rt http.RoundTripper) http.RoundTripper {
return roundTripFunc(func(req *http.Request) (*http.Response, error) {
actualRequestsMadeByKubeClient = append(actualRequestsMadeByKubeClient, req.URL.RawQuery)
return rt.RoundTrip(req)
})
})
rt, clientConfig := clientConfigWithRoundTripper(f)
wrappedKubeClient, err := kubernetes.NewForConfig(clientConfig)
framework.ExpectNoError(err)
@ -120,26 +116,64 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fu
ginkgo.By("Verifying if the secret list was properly streamed")
streamedSecrets := secretList.Items
sort.Sort(byName(expectedSecrets))
gomega.Expect(cmp.Equal(expectedSecrets, streamedSecrets)).To(gomega.BeTrueBecause("data received via watchlist must match the added data"))
ginkgo.By("Verifying if expected requests were sent to the server")
expectedRequestMadeByKubeClient := []string{
// corresponds to a streaming request made by the kube client to stream the secrets
"allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true",
expectedRequestMadeByKubeClient := getExpectedRequestMadeByClientFor(secretList.ResourceVersion)
gomega.Expect(rt.actualRequests).To(gomega.Equal(expectedRequestMadeByKubeClient))
})
ginkgo.It("should be requested by dynamic client's List method when WatchListClient is enabled", func(ctx context.Context) {
featuregatetesting.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), utilfeature.DefaultFeatureGate, featuregate.Feature(clientfeatures.WatchListClient), true)
ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name))
var expectedSecrets []unstructured.Unstructured
for i := 1; i <= 5; i++ {
unstructuredSecret, err := runtime.DefaultUnstructuredConverter.ToUnstructured(newSecret(fmt.Sprintf("secret-%d", i)))
framework.ExpectNoError(err)
secret, err := f.DynamicClient.Resource(v1.SchemeGroupVersion.WithResource("secrets")).Namespace(f.Namespace.Name).Create(ctx, &unstructured.Unstructured{Object: unstructuredSecret}, metav1.CreateOptions{})
framework.ExpectNoError(err)
expectedSecrets = append(expectedSecrets, *secret)
}
if consistencydetector.IsDataConsistencyDetectionForWatchListEnabled() {
// corresponds to a standard list request made by the consistency detector build in into the kube client
expectedRequestMadeByKubeClient = append(expectedRequestMadeByKubeClient, fmt.Sprintf("resourceVersion=%s&resourceVersionMatch=Exact", secretList.ResourceVersion))
}
gomega.Expect(actualRequestsMadeByKubeClient).To(gomega.Equal(expectedRequestMadeByKubeClient))
rt, clientConfig := clientConfigWithRoundTripper(f)
wrappedDynamicClient, err := dynamic.NewForConfig(clientConfig)
framework.ExpectNoError(err)
ginkgo.By("Streaming secrets from the server")
secretList, err := wrappedDynamicClient.Resource(v1.SchemeGroupVersion.WithResource("secrets")).Namespace(f.Namespace.Name).List(ctx, metav1.ListOptions{})
framework.ExpectNoError(err)
ginkgo.By("Verifying if the secret list was properly streamed")
streamedSecrets := secretList.Items
gomega.Expect(cmp.Equal(expectedSecrets, streamedSecrets)).To(gomega.BeTrueBecause("data received via watchlist must match the added data"))
ginkgo.By("Verifying if expected requests were sent to the server")
expectedRequestMadeByDynamicClient := getExpectedRequestMadeByClientFor(secretList.GetResourceVersion())
gomega.Expect(rt.actualRequests).To(gomega.Equal(expectedRequestMadeByDynamicClient))
})
})
type roundTripFunc func(req *http.Request) (*http.Response, error)
type roundTripper struct {
actualRequests []string
delegate http.RoundTripper
}
func (f roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
return f(req)
func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
r.actualRequests = append(r.actualRequests, req.URL.RawQuery)
return r.delegate.RoundTrip(req)
}
func (r *roundTripper) Wrap(delegate http.RoundTripper) http.RoundTripper {
r.delegate = delegate
return r
}
func clientConfigWithRoundTripper(f *framework.Framework) (*roundTripper, *rest.Config) {
clientConfig := f.ClientConfig()
rt := &roundTripper{}
clientConfig.Wrap(rt.Wrap)
return rt, clientConfig
}
func verifyStore(ctx context.Context, expectedSecrets []v1.Secret, store cache.Store) {
@ -157,6 +191,18 @@ func verifyStore(ctx context.Context, expectedSecrets []v1.Secret, store cache.S
framework.ExpectNoError(err)
}
func getExpectedRequestMadeByClientFor(rv string) []string {
expectedRequestMadeByClient := []string{
// corresponds to a streaming request made by the client to stream the secrets
"allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true",
}
if consistencydetector.IsDataConsistencyDetectionForWatchListEnabled() {
// corresponds to a standard list request made by the consistency detector build in into the client
expectedRequestMadeByClient = append(expectedRequestMadeByClient, fmt.Sprintf("resourceVersion=%s&resourceVersionMatch=Exact", rv))
}
return expectedRequestMadeByClient
}
type byName []v1.Secret
func (a byName) Len() int { return len(a) }