diff --git a/dynamic/client_test.go b/dynamic/client_test.go index 35f6e85a..a729cdf6 100644 --- a/dynamic/client_test.go +++ b/dynamic/client_test.go @@ -19,6 +19,7 @@ package dynamic import ( "bytes" "context" + "encoding/base64" "fmt" "io" "net/http" @@ -183,7 +184,10 @@ func TestWatchList(t *testing.T) { {Type: watch.Bookmark, Object: func() runtime.Object { obj := getObject("gtest/vTest", "rTest", "item2") obj.SetResourceVersion("10") - obj.SetAnnotations(map[string]string{metav1.InitialEventsAnnotationKey: "true"}) + obj.SetAnnotations(map[string]string{ + metav1.InitialEventsAnnotationKey: "true", + metav1.InitialEventsListBlueprintAnnotationKey: base64.StdEncoding.EncodeToString(getJSON("vTest", "rTests", "")), + }) return obj }()}, }, @@ -195,9 +199,10 @@ func TestWatchList(t *testing.T) { }, expectedList: &unstructured.UnstructuredList{ Object: map[string]interface{}{ - "apiVersion": "", - "kind": "UnstructuredList", + "apiVersion": "vTest", + "kind": "rTests", "metadata": map[string]interface{}{ + "name": "", "resourceVersion": "10", }, }, @@ -215,7 +220,10 @@ func TestWatchList(t *testing.T) { {Type: watch.Bookmark, Object: func() runtime.Object { obj := getObject("gtest/vTest", "rTest", "item2") obj.SetResourceVersion("39") - obj.SetAnnotations(map[string]string{metav1.InitialEventsAnnotationKey: "true"}) + obj.SetAnnotations(map[string]string{ + metav1.InitialEventsAnnotationKey: "true", + metav1.InitialEventsListBlueprintAnnotationKey: base64.StdEncoding.EncodeToString(getJSON("vTest", "rTests", "")), + }) return obj }()}, }, @@ -227,9 +235,10 @@ func TestWatchList(t *testing.T) { }, expectedList: &unstructured.UnstructuredList{ Object: map[string]interface{}{ - "apiVersion": "", - "kind": "UnstructuredList", + "apiVersion": "vTest", + "kind": "rTests", "metadata": map[string]interface{}{ + "name": "", "resourceVersion": "39", }, }, diff --git a/rest/request.go b/rest/request.go index f0fc7d1e..775f3c1a 100644 --- a/rest/request.go +++ b/rest/request.go @@ -19,6 +19,7 @@ package rest import ( "bytes" "context" + "encoding/base64" "encoding/hex" "fmt" "io" @@ -701,6 +702,11 @@ func (b *throttledLogger) Infof(message string, args ...interface{}) { // Watch attempts to begin watching the requested location. // Returns a watch.Interface, or an error. func (r *Request) Watch(ctx context.Context) (watch.Interface, error) { + w, _, e := r.watchInternal(ctx) + return w, e +} + +func (r *Request) watchInternal(ctx context.Context) (watch.Interface, runtime.Decoder, error) { if r.body == nil { logBody(ctx, 2, "Request Body", r.bodyBytes) } @@ -708,7 +714,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) { // We specifically don't want to rate limit watches, so we // don't use r.rateLimiter here. if r.err != nil { - return nil, r.err + return nil, nil, r.err } client := r.c.Client @@ -728,12 +734,12 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) { url := r.URL().String() for { if err := retry.Before(ctx, r); err != nil { - return nil, retry.WrapPreviousError(err) + return nil, nil, retry.WrapPreviousError(err) } req, err := r.newHTTPRequest(ctx) if err != nil { - return nil, err + return nil, nil, err } resp, err := client.Do(req) @@ -761,14 +767,14 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) { }() if done { if isErrRetryableFunc(req, err) { - return watch.NewEmptyWatch(), nil + return watch.NewEmptyWatch(), nil, nil } if err == nil { // if the server sent us an HTTP Response object, // we need to return the error object from that. err = transformErr } - return nil, retry.WrapPreviousError(err) + return nil, nil, retry.WrapPreviousError(err) } } } @@ -786,22 +792,35 @@ type WatchListResult struct { // the end of the stream. initialEventsEndBookmarkRV string - // gv represents the API version - // it is used to construct the final list response - // normally this information is filled by the server - gv schema.GroupVersion + // negotiatedObjectDecoder knows how to decode + // the initialEventsListBlueprint + negotiatedObjectDecoder runtime.Decoder + + // base64EncodedInitialEventsListBlueprint contains an empty, + // versioned list encoded in the requested format + // (e.g., protobuf, JSON, CBOR) and stored as a base64-encoded string + base64EncodedInitialEventsListBlueprint string } +// Into stores the result into obj. The passed obj parameter must be a pointer to a list type. +// +// Note: +// +// Special attention should be given to the type *unstructured.Unstructured, +// which represents a list type but does not have an "Items" field. +// Users who directly use RESTClient may store the response in such an object. +// This particular case is not handled by the current implementation of this function, +// but may be considered for future updates. func (r WatchListResult) Into(obj runtime.Object) error { if r.err != nil { return r.err } - listPtr, err := meta.GetItemsPtr(obj) + listItemsPtr, err := meta.GetItemsPtr(obj) if err != nil { return err } - listVal, err := conversion.EnforcePtr(listPtr) + listVal, err := conversion.EnforcePtr(listItemsPtr) if err != nil { return err } @@ -809,6 +828,16 @@ func (r WatchListResult) Into(obj runtime.Object) error { return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) } + encodedInitialEventsListBlueprint, err := base64.StdEncoding.DecodeString(r.base64EncodedInitialEventsListBlueprint) + if err != nil { + return fmt.Errorf("failed to decode the received blueprint list, err %w", err) + } + + err = runtime.DecodeInto(r.negotiatedObjectDecoder, encodedInitialEventsListBlueprint, obj) + if err != nil { + return err + } + if len(r.items) == 0 { listVal.Set(reflect.MakeSlice(listVal.Type(), 0, 0)) } else { @@ -826,15 +855,6 @@ func (r WatchListResult) Into(obj runtime.Object) error { return err } listMeta.SetResourceVersion(r.initialEventsEndBookmarkRV) - - typeMeta, err := meta.TypeAccessor(obj) - if err != nil { - return err - } - version := r.gv.String() - typeMeta.SetAPIVersion(version) - typeMeta.SetKind(reflect.TypeOf(obj).Elem().Name()) - return nil } @@ -857,16 +877,16 @@ func (r *Request) WatchList(ctx context.Context) WatchListResult { // Most users use the generated client, which handles the proper setting of parameters. // We don't have validation for other methods (e.g., the Watch) // thus, for symmetry, we haven't added additional checks for the WatchList method. - w, err := r.Watch(ctx) + w, d, err := r.watchInternal(ctx) if err != nil { return WatchListResult{err: err} } - return r.handleWatchList(ctx, w) + return r.handleWatchList(ctx, w, d) } // handleWatchList holds the actual logic for easier unit testing. // Note that this function will close the passed watch. -func (r *Request) handleWatchList(ctx context.Context, w watch.Interface) WatchListResult { +func (r *Request) handleWatchList(ctx context.Context, w watch.Interface, negotiatedObjectDecoder runtime.Decoder) WatchListResult { defer w.Stop() var lastKey string var items []runtime.Object @@ -900,10 +920,15 @@ func (r *Request) handleWatchList(ctx context.Context, w watch.Interface) WatchL lastKey = key case watch.Bookmark: if meta.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" { + base64EncodedInitialEventsListBlueprint := meta.GetAnnotations()[metav1.InitialEventsListBlueprintAnnotationKey] + if len(base64EncodedInitialEventsListBlueprint) == 0 { + return WatchListResult{err: fmt.Errorf("%q annotation is missing content", metav1.InitialEventsListBlueprintAnnotationKey)} + } return WatchListResult{ - items: items, - initialEventsEndBookmarkRV: meta.GetResourceVersion(), - gv: r.c.content.GroupVersion, + items: items, + initialEventsEndBookmarkRV: meta.GetResourceVersion(), + negotiatedObjectDecoder: negotiatedObjectDecoder, + base64EncodedInitialEventsListBlueprint: base64EncodedInitialEventsListBlueprint, } } default: @@ -913,7 +938,7 @@ func (r *Request) handleWatchList(ctx context.Context, w watch.Interface) WatchL } } -func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error) { +func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, runtime.Decoder, error) { contentType := resp.Header.Get("Content-Type") mediaType, params, err := mime.ParseMediaType(contentType) if err != nil { @@ -921,7 +946,7 @@ func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error) } objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params) if err != nil { - return nil, err + return nil, nil, err } handleWarnings(resp.Header, r.warningHandler) @@ -934,7 +959,7 @@ func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error) // use 500 to indicate that the cause of the error is unknown - other error codes // are more specific to HTTP interactions, and set a reason errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"), - ), nil + ), objectDecoder, nil } // updateRequestResultMetric increments the RequestResult metric counter, diff --git a/rest/request_watchlist_test.go b/rest/request_watchlist_test.go index badd871f..2fff568f 100644 --- a/rest/request_watchlist_test.go +++ b/rest/request_watchlist_test.go @@ -17,7 +17,9 @@ limitations under the License. package rest import ( + "bytes" "context" + "encoding/base64" "fmt" "regexp" "testing" @@ -30,9 +32,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + runtimejson "k8s.io/apimachinery/pkg/runtime/serializer/json" "k8s.io/apimachinery/pkg/watch" clientfeatures "k8s.io/client-go/features" clientfeaturestesting "k8s.io/client-go/features/testing" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" ) func TestWatchListResult(t *testing.T) { @@ -60,41 +64,34 @@ func TestWatchListResult(t *testing.T) { expectedErr: fmt.Errorf("*v1.Pod is not a list: no Items field in this object"), }, { - name: "an err is always returned", - result: nil, - target: WatchListResult{err: fmt.Errorf("dummy err")}, - expectedErr: fmt.Errorf("dummy err"), + name: "invalid base64EncodedInitialEventsListBlueprint", + result: &v1.PodList{}, + target: WatchListResult{ + base64EncodedInitialEventsListBlueprint: "invalid", + negotiatedObjectDecoder: newJSONSerializer(), + }, + expectedErr: fmt.Errorf("failed to decode the received blueprint list, err illegal base64 data at input byte 4"), }, { name: "empty list", result: &v1.PodList{}, + target: WatchListResult{ + base64EncodedInitialEventsListBlueprint: encodeObjectToBase64String(makeEmptyPodList(), t), + negotiatedObjectDecoder: newJSONSerializer(), + }, expectedResult: &v1.PodList{ TypeMeta: metav1.TypeMeta{Kind: "PodList"}, Items: []v1.Pod{}, }, }, - { - name: "gv is applied", - result: &v1.PodList{}, - target: WatchListResult{gv: schema.GroupVersion{Group: "g", Version: "v"}}, - expectedResult: &v1.PodList{ - TypeMeta: metav1.TypeMeta{Kind: "PodList", APIVersion: "g/v"}, - Items: []v1.Pod{}, - }, - }, - { - name: "gv is applied, empty group", - result: &v1.PodList{}, - target: WatchListResult{gv: schema.GroupVersion{Version: "v"}}, - expectedResult: &v1.PodList{ - TypeMeta: metav1.TypeMeta{Kind: "PodList", APIVersion: "v"}, - Items: []v1.Pod{}, - }, - }, { name: "rv is applied", result: &v1.PodList{}, - target: WatchListResult{initialEventsEndBookmarkRV: "100"}, + target: WatchListResult{ + initialEventsEndBookmarkRV: "100", + base64EncodedInitialEventsListBlueprint: encodeObjectToBase64String(makeEmptyPodList(), t), + negotiatedObjectDecoder: newJSONSerializer(), + }, expectedResult: &v1.PodList{ TypeMeta: metav1.TypeMeta{Kind: "PodList"}, ListMeta: metav1.ListMeta{ResourceVersion: "100"}, @@ -104,18 +101,36 @@ func TestWatchListResult(t *testing.T) { { name: "items are applied", result: &v1.PodList{}, - target: WatchListResult{items: []runtime.Object{makePod(1), makePod(2)}}, + target: WatchListResult{ + items: []runtime.Object{makePod(1), makePod(2)}, + base64EncodedInitialEventsListBlueprint: encodeObjectToBase64String(makeEmptyPodList(), t), + negotiatedObjectDecoder: newJSONSerializer(), + }, expectedResult: &v1.PodList{ TypeMeta: metav1.TypeMeta{Kind: "PodList"}, Items: []v1.Pod{*makePod(1), *makePod(2)}, }, }, { - name: "type mismatch", - result: &v1.PodList{}, - target: WatchListResult{items: []runtime.Object{makeNamespace("1")}}, + name: "list's object type mismatch", + result: &v1.PodList{}, + target: WatchListResult{ + items: []runtime.Object{makeNamespace("1")}, + base64EncodedInitialEventsListBlueprint: encodeObjectToBase64String(makeEmptyPodList(), t), + negotiatedObjectDecoder: newJSONSerializer(), + }, expectedErr: fmt.Errorf("received object type = v1.Namespace at index = 0, doesn't match the list item type = v1.Pod"), }, + { + name: "list type mismatch", + result: &v1.SecretList{}, + target: WatchListResult{ + items: []runtime.Object{makePod(1), makePod(2)}, + base64EncodedInitialEventsListBlueprint: encodeObjectToBase64String(makeEmptyPodList(), t), + negotiatedObjectDecoder: newJSONSerializer(), + }, + expectedErr: fmt.Errorf("unable to decode /v1, Kind=PodList into *v1.SecretList"), + }, } for _, scenario := range scenarios { t.Run(scenario.name, func(t *testing.T) { @@ -141,25 +156,23 @@ func TestWatchListResult(t *testing.T) { func TestWatchListSuccess(t *testing.T) { scenarios := []struct { - name string - gv schema.GroupVersion - watchEvents []watch.Event + name string + watchEvents []watch.Event + negotiatedObjectDecoder runtime.Serializer + expectedResult *v1.PodList }{ { - name: "happy path", - // Note that the APIVersion for the core API group is "v1" (not "core/v1"). - // We fake "core/v1" here to test if the Group part is properly - // recognized and set on the resulting object. - gv: schema.GroupVersion{Group: "core", Version: "v1"}, + name: "happy path", + negotiatedObjectDecoder: newJSONSerializer(), watchEvents: []watch.Event{ {Type: watch.Added, Object: makePod(1)}, {Type: watch.Added, Object: makePod(2)}, - {Type: watch.Bookmark, Object: makeBookmarkEvent(5)}, + {Type: watch.Bookmark, Object: makeBookmarkEvent(5, t)}, }, expectedResult: &v1.PodList{ TypeMeta: metav1.TypeMeta{ - APIVersion: "core/v1", + APIVersion: "", Kind: "PodList", }, ListMeta: metav1.ListMeta{ResourceVersion: "5"}, @@ -167,30 +180,14 @@ func TestWatchListSuccess(t *testing.T) { }, }, { - name: "APIVersion with only version provided is properly set", - gv: schema.GroupVersion{Version: "v1"}, + name: "only the bookmark", + negotiatedObjectDecoder: newJSONSerializer(), watchEvents: []watch.Event{ - {Type: watch.Added, Object: makePod(1)}, - {Type: watch.Bookmark, Object: makeBookmarkEvent(5)}, + {Type: watch.Bookmark, Object: makeBookmarkEvent(5, t)}, }, expectedResult: &v1.PodList{ TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "PodList", - }, - ListMeta: metav1.ListMeta{ResourceVersion: "5"}, - Items: []v1.Pod{*makePod(1)}, - }, - }, - { - name: "only the bookmark", - gv: schema.GroupVersion{Version: "v1"}, - watchEvents: []watch.Event{ - {Type: watch.Bookmark, Object: makeBookmarkEvent(5)}, - }, - expectedResult: &v1.PodList{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", + APIVersion: "", Kind: "PodList", }, ListMeta: metav1.ListMeta{ResourceVersion: "5"}, @@ -204,9 +201,7 @@ func TestWatchListSuccess(t *testing.T) { fakeWatcher := watch.NewFake() target := &Request{ c: &RESTClient{ - content: ClientContentConfig{ - GroupVersion: scenario.gv, - }, + content: ClientContentConfig{}, }, } @@ -216,7 +211,7 @@ func TestWatchListSuccess(t *testing.T) { } }(scenario.watchEvents) - res := target.handleWatchList(ctx, fakeWatcher) + res := target.handleWatchList(ctx, fakeWatcher, scenario.negotiatedObjectDecoder) if res.err != nil { t.Fatal(res.err) } @@ -303,7 +298,7 @@ func TestWatchListFailure(t *testing.T) { } }(scenario.watcher, scenario.watchEvents) - res := target.handleWatchList(scenario.ctx, scenario.watcher) + res := target.handleWatchList(scenario.ctx, scenario.watcher, nil /*TODO*/) resErr := res.Into(nil) if resErr == nil { t.Fatal("expected to get an error, got nil") @@ -338,6 +333,13 @@ func TestWatchListWhenFeatureGateDisabled(t *testing.T) { } } +func makeEmptyPodList() *v1.PodList { + return &v1.PodList{ + TypeMeta: metav1.TypeMeta{Kind: "PodList"}, + Items: []v1.Pod{}, + } +} + func makePod(rv uint64) *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -353,11 +355,14 @@ func makeNamespace(name string) *v1.Namespace { return &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: name}} } -func makeBookmarkEvent(rv uint64) *v1.Pod { +func makeBookmarkEvent(rv uint64, t *testing.T) *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ ResourceVersion: fmt.Sprintf("%d", rv), - Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"}, + Annotations: map[string]string{ + metav1.InitialEventsAnnotationKey: "true", + metav1.InitialEventsListBlueprintAnnotationKey: encodeObjectToBase64String(makeEmptyPodList(), t), + }, }, } } @@ -372,3 +377,23 @@ func (f fakeObj) GetObjectKind() schema.ObjectKind { func (f fakeObj) DeepCopyObject() runtime.Object { return fakeObj{} } + +func newJSONSerializer() runtime.Serializer { + return runtimejson.NewSerializerWithOptions( + runtimejson.DefaultMetaFactory, + clientgoscheme.Scheme, + clientgoscheme.Scheme, + runtimejson.SerializerOptions{}, + ) +} + +func encodeObjectToBase64String(obj runtime.Object, t *testing.T) string { + e := newJSONSerializer() + + var buf bytes.Buffer + err := e.Encode(obj, &buf) + if err != nil { + t.Fatal(err) + } + return base64.StdEncoding.EncodeToString(buf.Bytes()) +}