Merge pull request #127812 from p0lyn0mial/upstream-decode-list-blueprint

client-go/rest/request: decodes initialEventsListBlueprint for watchlist requests
This commit is contained in:
Kubernetes Prow Robot 2024-10-14 13:10:21 +01:00 committed by GitHub
commit a454563a8d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 159 additions and 101 deletions

View File

@ -19,6 +19,7 @@ package dynamic
import ( import (
"bytes" "bytes"
"context" "context"
"encoding/base64"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
@ -183,7 +184,10 @@ func TestWatchList(t *testing.T) {
{Type: watch.Bookmark, Object: func() runtime.Object { {Type: watch.Bookmark, Object: func() runtime.Object {
obj := getObject("gtest/vTest", "rTest", "item2") obj := getObject("gtest/vTest", "rTest", "item2")
obj.SetResourceVersion("10") obj.SetResourceVersion("10")
obj.SetAnnotations(map[string]string{metav1.InitialEventsAnnotationKey: "true"}) obj.SetAnnotations(map[string]string{
metav1.InitialEventsAnnotationKey: "true",
metav1.InitialEventsListBlueprintAnnotationKey: base64.StdEncoding.EncodeToString(getJSON("vTest", "rTests", "")),
})
return obj return obj
}()}, }()},
}, },
@ -195,9 +199,10 @@ func TestWatchList(t *testing.T) {
}, },
expectedList: &unstructured.UnstructuredList{ expectedList: &unstructured.UnstructuredList{
Object: map[string]interface{}{ Object: map[string]interface{}{
"apiVersion": "", "apiVersion": "vTest",
"kind": "UnstructuredList", "kind": "rTests",
"metadata": map[string]interface{}{ "metadata": map[string]interface{}{
"name": "",
"resourceVersion": "10", "resourceVersion": "10",
}, },
}, },
@ -215,7 +220,10 @@ func TestWatchList(t *testing.T) {
{Type: watch.Bookmark, Object: func() runtime.Object { {Type: watch.Bookmark, Object: func() runtime.Object {
obj := getObject("gtest/vTest", "rTest", "item2") obj := getObject("gtest/vTest", "rTest", "item2")
obj.SetResourceVersion("39") obj.SetResourceVersion("39")
obj.SetAnnotations(map[string]string{metav1.InitialEventsAnnotationKey: "true"}) obj.SetAnnotations(map[string]string{
metav1.InitialEventsAnnotationKey: "true",
metav1.InitialEventsListBlueprintAnnotationKey: base64.StdEncoding.EncodeToString(getJSON("vTest", "rTests", "")),
})
return obj return obj
}()}, }()},
}, },
@ -227,9 +235,10 @@ func TestWatchList(t *testing.T) {
}, },
expectedList: &unstructured.UnstructuredList{ expectedList: &unstructured.UnstructuredList{
Object: map[string]interface{}{ Object: map[string]interface{}{
"apiVersion": "", "apiVersion": "vTest",
"kind": "UnstructuredList", "kind": "rTests",
"metadata": map[string]interface{}{ "metadata": map[string]interface{}{
"name": "",
"resourceVersion": "39", "resourceVersion": "39",
}, },
}, },

View File

@ -19,6 +19,7 @@ package rest
import ( import (
"bytes" "bytes"
"context" "context"
"encoding/base64"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"io" "io"
@ -701,6 +702,11 @@ func (b *throttledLogger) Infof(message string, args ...interface{}) {
// Watch attempts to begin watching the requested location. // Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error. // Returns a watch.Interface, or an error.
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) { func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
w, _, e := r.watchInternal(ctx)
return w, e
}
func (r *Request) watchInternal(ctx context.Context) (watch.Interface, runtime.Decoder, error) {
if r.body == nil { if r.body == nil {
logBody(ctx, 2, "Request Body", r.bodyBytes) logBody(ctx, 2, "Request Body", r.bodyBytes)
} }
@ -708,7 +714,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
// We specifically don't want to rate limit watches, so we // We specifically don't want to rate limit watches, so we
// don't use r.rateLimiter here. // don't use r.rateLimiter here.
if r.err != nil { if r.err != nil {
return nil, r.err return nil, nil, r.err
} }
client := r.c.Client client := r.c.Client
@ -728,12 +734,12 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
url := r.URL().String() url := r.URL().String()
for { for {
if err := retry.Before(ctx, r); err != nil { if err := retry.Before(ctx, r); err != nil {
return nil, retry.WrapPreviousError(err) return nil, nil, retry.WrapPreviousError(err)
} }
req, err := r.newHTTPRequest(ctx) req, err := r.newHTTPRequest(ctx)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
resp, err := client.Do(req) resp, err := client.Do(req)
@ -761,14 +767,14 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
}() }()
if done { if done {
if isErrRetryableFunc(req, err) { if isErrRetryableFunc(req, err) {
return watch.NewEmptyWatch(), nil return watch.NewEmptyWatch(), nil, nil
} }
if err == nil { if err == nil {
// if the server sent us an HTTP Response object, // if the server sent us an HTTP Response object,
// we need to return the error object from that. // we need to return the error object from that.
err = transformErr err = transformErr
} }
return nil, retry.WrapPreviousError(err) return nil, nil, retry.WrapPreviousError(err)
} }
} }
} }
@ -786,22 +792,35 @@ type WatchListResult struct {
// the end of the stream. // the end of the stream.
initialEventsEndBookmarkRV string initialEventsEndBookmarkRV string
// gv represents the API version // negotiatedObjectDecoder knows how to decode
// it is used to construct the final list response // the initialEventsListBlueprint
// normally this information is filled by the server negotiatedObjectDecoder runtime.Decoder
gv schema.GroupVersion
// base64EncodedInitialEventsListBlueprint contains an empty,
// versioned list encoded in the requested format
// (e.g., protobuf, JSON, CBOR) and stored as a base64-encoded string
base64EncodedInitialEventsListBlueprint string
} }
// Into stores the result into obj. The passed obj parameter must be a pointer to a list type.
//
// Note:
//
// Special attention should be given to the type *unstructured.Unstructured,
// which represents a list type but does not have an "Items" field.
// Users who directly use RESTClient may store the response in such an object.
// This particular case is not handled by the current implementation of this function,
// but may be considered for future updates.
func (r WatchListResult) Into(obj runtime.Object) error { func (r WatchListResult) Into(obj runtime.Object) error {
if r.err != nil { if r.err != nil {
return r.err return r.err
} }
listPtr, err := meta.GetItemsPtr(obj) listItemsPtr, err := meta.GetItemsPtr(obj)
if err != nil { if err != nil {
return err return err
} }
listVal, err := conversion.EnforcePtr(listPtr) listVal, err := conversion.EnforcePtr(listItemsPtr)
if err != nil { if err != nil {
return err return err
} }
@ -809,6 +828,16 @@ func (r WatchListResult) Into(obj runtime.Object) error {
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
} }
encodedInitialEventsListBlueprint, err := base64.StdEncoding.DecodeString(r.base64EncodedInitialEventsListBlueprint)
if err != nil {
return fmt.Errorf("failed to decode the received blueprint list, err %w", err)
}
err = runtime.DecodeInto(r.negotiatedObjectDecoder, encodedInitialEventsListBlueprint, obj)
if err != nil {
return err
}
if len(r.items) == 0 { if len(r.items) == 0 {
listVal.Set(reflect.MakeSlice(listVal.Type(), 0, 0)) listVal.Set(reflect.MakeSlice(listVal.Type(), 0, 0))
} else { } else {
@ -826,15 +855,6 @@ func (r WatchListResult) Into(obj runtime.Object) error {
return err return err
} }
listMeta.SetResourceVersion(r.initialEventsEndBookmarkRV) listMeta.SetResourceVersion(r.initialEventsEndBookmarkRV)
typeMeta, err := meta.TypeAccessor(obj)
if err != nil {
return err
}
version := r.gv.String()
typeMeta.SetAPIVersion(version)
typeMeta.SetKind(reflect.TypeOf(obj).Elem().Name())
return nil return nil
} }
@ -857,16 +877,16 @@ func (r *Request) WatchList(ctx context.Context) WatchListResult {
// Most users use the generated client, which handles the proper setting of parameters. // Most users use the generated client, which handles the proper setting of parameters.
// We don't have validation for other methods (e.g., the Watch) // We don't have validation for other methods (e.g., the Watch)
// thus, for symmetry, we haven't added additional checks for the WatchList method. // thus, for symmetry, we haven't added additional checks for the WatchList method.
w, err := r.Watch(ctx) w, d, err := r.watchInternal(ctx)
if err != nil { if err != nil {
return WatchListResult{err: err} return WatchListResult{err: err}
} }
return r.handleWatchList(ctx, w) return r.handleWatchList(ctx, w, d)
} }
// handleWatchList holds the actual logic for easier unit testing. // handleWatchList holds the actual logic for easier unit testing.
// Note that this function will close the passed watch. // Note that this function will close the passed watch.
func (r *Request) handleWatchList(ctx context.Context, w watch.Interface) WatchListResult { func (r *Request) handleWatchList(ctx context.Context, w watch.Interface, negotiatedObjectDecoder runtime.Decoder) WatchListResult {
defer w.Stop() defer w.Stop()
var lastKey string var lastKey string
var items []runtime.Object var items []runtime.Object
@ -900,10 +920,15 @@ func (r *Request) handleWatchList(ctx context.Context, w watch.Interface) WatchL
lastKey = key lastKey = key
case watch.Bookmark: case watch.Bookmark:
if meta.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" { if meta.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" {
base64EncodedInitialEventsListBlueprint := meta.GetAnnotations()[metav1.InitialEventsListBlueprintAnnotationKey]
if len(base64EncodedInitialEventsListBlueprint) == 0 {
return WatchListResult{err: fmt.Errorf("%q annotation is missing content", metav1.InitialEventsListBlueprintAnnotationKey)}
}
return WatchListResult{ return WatchListResult{
items: items, items: items,
initialEventsEndBookmarkRV: meta.GetResourceVersion(), initialEventsEndBookmarkRV: meta.GetResourceVersion(),
gv: r.c.content.GroupVersion, negotiatedObjectDecoder: negotiatedObjectDecoder,
base64EncodedInitialEventsListBlueprint: base64EncodedInitialEventsListBlueprint,
} }
} }
default: default:
@ -913,7 +938,7 @@ func (r *Request) handleWatchList(ctx context.Context, w watch.Interface) WatchL
} }
} }
func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error) { func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, runtime.Decoder, error) {
contentType := resp.Header.Get("Content-Type") contentType := resp.Header.Get("Content-Type")
mediaType, params, err := mime.ParseMediaType(contentType) mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil { if err != nil {
@ -921,7 +946,7 @@ func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error)
} }
objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params) objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
handleWarnings(resp.Header, r.warningHandler) handleWarnings(resp.Header, r.warningHandler)
@ -934,7 +959,7 @@ func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error)
// use 500 to indicate that the cause of the error is unknown - other error codes // use 500 to indicate that the cause of the error is unknown - other error codes
// are more specific to HTTP interactions, and set a reason // are more specific to HTTP interactions, and set a reason
errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"), errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
), nil ), objectDecoder, nil
} }
// updateRequestResultMetric increments the RequestResult metric counter, // updateRequestResultMetric increments the RequestResult metric counter,

View File

@ -17,7 +17,9 @@ limitations under the License.
package rest package rest
import ( import (
"bytes"
"context" "context"
"encoding/base64"
"fmt" "fmt"
"regexp" "regexp"
"testing" "testing"
@ -30,9 +32,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
runtimejson "k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
clientfeatures "k8s.io/client-go/features" clientfeatures "k8s.io/client-go/features"
clientfeaturestesting "k8s.io/client-go/features/testing" clientfeaturestesting "k8s.io/client-go/features/testing"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
) )
func TestWatchListResult(t *testing.T) { func TestWatchListResult(t *testing.T) {
@ -60,41 +64,34 @@ func TestWatchListResult(t *testing.T) {
expectedErr: fmt.Errorf("*v1.Pod is not a list: no Items field in this object"), expectedErr: fmt.Errorf("*v1.Pod is not a list: no Items field in this object"),
}, },
{ {
name: "an err is always returned", name: "invalid base64EncodedInitialEventsListBlueprint",
result: nil, result: &v1.PodList{},
target: WatchListResult{err: fmt.Errorf("dummy err")}, target: WatchListResult{
expectedErr: fmt.Errorf("dummy err"), base64EncodedInitialEventsListBlueprint: "invalid",
negotiatedObjectDecoder: newJSONSerializer(),
},
expectedErr: fmt.Errorf("failed to decode the received blueprint list, err illegal base64 data at input byte 4"),
}, },
{ {
name: "empty list", name: "empty list",
result: &v1.PodList{}, result: &v1.PodList{},
target: WatchListResult{
base64EncodedInitialEventsListBlueprint: encodeObjectToBase64String(makeEmptyPodList(), t),
negotiatedObjectDecoder: newJSONSerializer(),
},
expectedResult: &v1.PodList{ expectedResult: &v1.PodList{
TypeMeta: metav1.TypeMeta{Kind: "PodList"}, TypeMeta: metav1.TypeMeta{Kind: "PodList"},
Items: []v1.Pod{}, Items: []v1.Pod{},
}, },
}, },
{
name: "gv is applied",
result: &v1.PodList{},
target: WatchListResult{gv: schema.GroupVersion{Group: "g", Version: "v"}},
expectedResult: &v1.PodList{
TypeMeta: metav1.TypeMeta{Kind: "PodList", APIVersion: "g/v"},
Items: []v1.Pod{},
},
},
{
name: "gv is applied, empty group",
result: &v1.PodList{},
target: WatchListResult{gv: schema.GroupVersion{Version: "v"}},
expectedResult: &v1.PodList{
TypeMeta: metav1.TypeMeta{Kind: "PodList", APIVersion: "v"},
Items: []v1.Pod{},
},
},
{ {
name: "rv is applied", name: "rv is applied",
result: &v1.PodList{}, result: &v1.PodList{},
target: WatchListResult{initialEventsEndBookmarkRV: "100"}, target: WatchListResult{
initialEventsEndBookmarkRV: "100",
base64EncodedInitialEventsListBlueprint: encodeObjectToBase64String(makeEmptyPodList(), t),
negotiatedObjectDecoder: newJSONSerializer(),
},
expectedResult: &v1.PodList{ expectedResult: &v1.PodList{
TypeMeta: metav1.TypeMeta{Kind: "PodList"}, TypeMeta: metav1.TypeMeta{Kind: "PodList"},
ListMeta: metav1.ListMeta{ResourceVersion: "100"}, ListMeta: metav1.ListMeta{ResourceVersion: "100"},
@ -104,18 +101,36 @@ func TestWatchListResult(t *testing.T) {
{ {
name: "items are applied", name: "items are applied",
result: &v1.PodList{}, result: &v1.PodList{},
target: WatchListResult{items: []runtime.Object{makePod(1), makePod(2)}}, target: WatchListResult{
items: []runtime.Object{makePod(1), makePod(2)},
base64EncodedInitialEventsListBlueprint: encodeObjectToBase64String(makeEmptyPodList(), t),
negotiatedObjectDecoder: newJSONSerializer(),
},
expectedResult: &v1.PodList{ expectedResult: &v1.PodList{
TypeMeta: metav1.TypeMeta{Kind: "PodList"}, TypeMeta: metav1.TypeMeta{Kind: "PodList"},
Items: []v1.Pod{*makePod(1), *makePod(2)}, Items: []v1.Pod{*makePod(1), *makePod(2)},
}, },
}, },
{ {
name: "type mismatch", name: "list's object type mismatch",
result: &v1.PodList{}, result: &v1.PodList{},
target: WatchListResult{items: []runtime.Object{makeNamespace("1")}}, target: WatchListResult{
items: []runtime.Object{makeNamespace("1")},
base64EncodedInitialEventsListBlueprint: encodeObjectToBase64String(makeEmptyPodList(), t),
negotiatedObjectDecoder: newJSONSerializer(),
},
expectedErr: fmt.Errorf("received object type = v1.Namespace at index = 0, doesn't match the list item type = v1.Pod"), expectedErr: fmt.Errorf("received object type = v1.Namespace at index = 0, doesn't match the list item type = v1.Pod"),
}, },
{
name: "list type mismatch",
result: &v1.SecretList{},
target: WatchListResult{
items: []runtime.Object{makePod(1), makePod(2)},
base64EncodedInitialEventsListBlueprint: encodeObjectToBase64String(makeEmptyPodList(), t),
negotiatedObjectDecoder: newJSONSerializer(),
},
expectedErr: fmt.Errorf("unable to decode /v1, Kind=PodList into *v1.SecretList"),
},
} }
for _, scenario := range scenarios { for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) { t.Run(scenario.name, func(t *testing.T) {
@ -141,25 +156,23 @@ func TestWatchListResult(t *testing.T) {
func TestWatchListSuccess(t *testing.T) { func TestWatchListSuccess(t *testing.T) {
scenarios := []struct { scenarios := []struct {
name string name string
gv schema.GroupVersion watchEvents []watch.Event
watchEvents []watch.Event negotiatedObjectDecoder runtime.Serializer
expectedResult *v1.PodList expectedResult *v1.PodList
}{ }{
{ {
name: "happy path", name: "happy path",
// Note that the APIVersion for the core API group is "v1" (not "core/v1"). negotiatedObjectDecoder: newJSONSerializer(),
// We fake "core/v1" here to test if the Group part is properly
// recognized and set on the resulting object.
gv: schema.GroupVersion{Group: "core", Version: "v1"},
watchEvents: []watch.Event{ watchEvents: []watch.Event{
{Type: watch.Added, Object: makePod(1)}, {Type: watch.Added, Object: makePod(1)},
{Type: watch.Added, Object: makePod(2)}, {Type: watch.Added, Object: makePod(2)},
{Type: watch.Bookmark, Object: makeBookmarkEvent(5)}, {Type: watch.Bookmark, Object: makeBookmarkEvent(5, t)},
}, },
expectedResult: &v1.PodList{ expectedResult: &v1.PodList{
TypeMeta: metav1.TypeMeta{ TypeMeta: metav1.TypeMeta{
APIVersion: "core/v1", APIVersion: "",
Kind: "PodList", Kind: "PodList",
}, },
ListMeta: metav1.ListMeta{ResourceVersion: "5"}, ListMeta: metav1.ListMeta{ResourceVersion: "5"},
@ -167,30 +180,14 @@ func TestWatchListSuccess(t *testing.T) {
}, },
}, },
{ {
name: "APIVersion with only version provided is properly set", name: "only the bookmark",
gv: schema.GroupVersion{Version: "v1"}, negotiatedObjectDecoder: newJSONSerializer(),
watchEvents: []watch.Event{ watchEvents: []watch.Event{
{Type: watch.Added, Object: makePod(1)}, {Type: watch.Bookmark, Object: makeBookmarkEvent(5, t)},
{Type: watch.Bookmark, Object: makeBookmarkEvent(5)},
}, },
expectedResult: &v1.PodList{ expectedResult: &v1.PodList{
TypeMeta: metav1.TypeMeta{ TypeMeta: metav1.TypeMeta{
APIVersion: "v1", APIVersion: "",
Kind: "PodList",
},
ListMeta: metav1.ListMeta{ResourceVersion: "5"},
Items: []v1.Pod{*makePod(1)},
},
},
{
name: "only the bookmark",
gv: schema.GroupVersion{Version: "v1"},
watchEvents: []watch.Event{
{Type: watch.Bookmark, Object: makeBookmarkEvent(5)},
},
expectedResult: &v1.PodList{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "PodList", Kind: "PodList",
}, },
ListMeta: metav1.ListMeta{ResourceVersion: "5"}, ListMeta: metav1.ListMeta{ResourceVersion: "5"},
@ -204,9 +201,7 @@ func TestWatchListSuccess(t *testing.T) {
fakeWatcher := watch.NewFake() fakeWatcher := watch.NewFake()
target := &Request{ target := &Request{
c: &RESTClient{ c: &RESTClient{
content: ClientContentConfig{ content: ClientContentConfig{},
GroupVersion: scenario.gv,
},
}, },
} }
@ -216,7 +211,7 @@ func TestWatchListSuccess(t *testing.T) {
} }
}(scenario.watchEvents) }(scenario.watchEvents)
res := target.handleWatchList(ctx, fakeWatcher) res := target.handleWatchList(ctx, fakeWatcher, scenario.negotiatedObjectDecoder)
if res.err != nil { if res.err != nil {
t.Fatal(res.err) t.Fatal(res.err)
} }
@ -303,7 +298,7 @@ func TestWatchListFailure(t *testing.T) {
} }
}(scenario.watcher, scenario.watchEvents) }(scenario.watcher, scenario.watchEvents)
res := target.handleWatchList(scenario.ctx, scenario.watcher) res := target.handleWatchList(scenario.ctx, scenario.watcher, nil /*TODO*/)
resErr := res.Into(nil) resErr := res.Into(nil)
if resErr == nil { if resErr == nil {
t.Fatal("expected to get an error, got nil") t.Fatal("expected to get an error, got nil")
@ -338,6 +333,13 @@ func TestWatchListWhenFeatureGateDisabled(t *testing.T) {
} }
} }
func makeEmptyPodList() *v1.PodList {
return &v1.PodList{
TypeMeta: metav1.TypeMeta{Kind: "PodList"},
Items: []v1.Pod{},
}
}
func makePod(rv uint64) *v1.Pod { func makePod(rv uint64) *v1.Pod {
return &v1.Pod{ return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
@ -353,11 +355,14 @@ func makeNamespace(name string) *v1.Namespace {
return &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: name}} return &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: name}}
} }
func makeBookmarkEvent(rv uint64) *v1.Pod { func makeBookmarkEvent(rv uint64, t *testing.T) *v1.Pod {
return &v1.Pod{ return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
ResourceVersion: fmt.Sprintf("%d", rv), ResourceVersion: fmt.Sprintf("%d", rv),
Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"}, Annotations: map[string]string{
metav1.InitialEventsAnnotationKey: "true",
metav1.InitialEventsListBlueprintAnnotationKey: encodeObjectToBase64String(makeEmptyPodList(), t),
},
}, },
} }
} }
@ -372,3 +377,23 @@ func (f fakeObj) GetObjectKind() schema.ObjectKind {
func (f fakeObj) DeepCopyObject() runtime.Object { func (f fakeObj) DeepCopyObject() runtime.Object {
return fakeObj{} return fakeObj{}
} }
func newJSONSerializer() runtime.Serializer {
return runtimejson.NewSerializerWithOptions(
runtimejson.DefaultMetaFactory,
clientgoscheme.Scheme,
clientgoscheme.Scheme,
runtimejson.SerializerOptions{},
)
}
func encodeObjectToBase64String(obj runtime.Object, t *testing.T) string {
e := newJSONSerializer()
var buf bytes.Buffer
err := e.Encode(obj, &buf)
if err != nil {
t.Fatal(err)
}
return base64.StdEncoding.EncodeToString(buf.Bytes())
}

View File

@ -32,7 +32,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -131,6 +130,7 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fe
ginkgo.By("Verifying if the secret list was properly streamed") ginkgo.By("Verifying if the secret list was properly streamed")
streamedSecrets := secretList.Items streamedSecrets := secretList.Items
gomega.Expect(cmp.Equal(expectedSecrets, streamedSecrets)).To(gomega.BeTrueBecause("data received via watchlist must match the added data")) gomega.Expect(cmp.Equal(expectedSecrets, streamedSecrets)).To(gomega.BeTrueBecause("data received via watchlist must match the added data"))
gomega.Expect(secretList.GetObjectKind().GroupVersionKind()).To(gomega.Equal(v1.SchemeGroupVersion.WithKind("SecretList")))
ginkgo.By("Verifying if expected requests were sent to the server") ginkgo.By("Verifying if expected requests were sent to the server")
expectedRequestMadeByDynamicClient := getExpectedRequestMadeByClientFor(secretList.GetResourceVersion()) expectedRequestMadeByDynamicClient := getExpectedRequestMadeByClientFor(secretList.GetResourceVersion())
@ -159,7 +159,6 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fe
ginkgo.By("Verifying if the secret meta list was properly streamed") ginkgo.By("Verifying if the secret meta list was properly streamed")
streamedMetaSecrets := secretMetaList.Items streamedMetaSecrets := secretMetaList.Items
gomega.Expect(cmp.Equal(expectedMetaSecrets, streamedMetaSecrets)).To(gomega.BeTrueBecause("data received via watchlist must match the added data")) gomega.Expect(cmp.Equal(expectedMetaSecrets, streamedMetaSecrets)).To(gomega.BeTrueBecause("data received via watchlist must match the added data"))
gomega.Expect(secretMetaList.GetObjectKind().GroupVersionKind()).To(gomega.Equal(schema.GroupVersion{}.WithKind("PartialObjectMetadataList")))
ginkgo.By("Verifying if expected requests were sent to the server") ginkgo.By("Verifying if expected requests were sent to the server")
expectedRequestMadeByMetaClient := getExpectedRequestMadeByClientFor(secretMetaList.GetResourceVersion()) expectedRequestMadeByMetaClient := getExpectedRequestMadeByClientFor(secretMetaList.GetResourceVersion())