diff --git a/rest/request.go b/rest/request.go index 9696d1cb..fb572169 100644 --- a/rest/request.go +++ b/rest/request.go @@ -19,7 +19,6 @@ package rest import ( "bytes" "context" - "encoding/base64" "encoding/hex" "fmt" "io" @@ -38,15 +37,12 @@ import ( "golang.org/x/net/http2" "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer/streaming" "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/watch" - clientfeatures "k8s.io/client-go/features" restclientwatch "k8s.io/client-go/rest/watch" "k8s.io/client-go/tools/metrics" "k8s.io/client-go/util/flowcontrol" @@ -760,11 +756,6 @@ func (b *throttledLogger) info(logger klog.Logger, message string, kv ...any) { // Watch attempts to begin watching the requested location. // Returns a watch.Interface, or an error. func (r *Request) Watch(ctx context.Context) (watch.Interface, error) { - w, _, e := r.watchInternal(ctx) - return w, e -} - -func (r *Request) watchInternal(ctx context.Context) (watch.Interface, runtime.Decoder, error) { if r.body == nil { logBody(klog.FromContext(ctx), 2, "Request Body", r.bodyBytes) } @@ -772,7 +763,7 @@ func (r *Request) watchInternal(ctx context.Context) (watch.Interface, runtime.D // We specifically don't want to rate limit watches, so we // don't use r.rateLimiter here. if r.err != nil { - return nil, nil, r.err + return nil, r.err } client := r.c.Client @@ -792,12 +783,12 @@ func (r *Request) watchInternal(ctx context.Context) (watch.Interface, runtime.D url := r.URL().String() for { if err := retry.Before(ctx, r); err != nil { - return nil, nil, retry.WrapPreviousError(err) + return nil, retry.WrapPreviousError(err) } req, err := r.newHTTPRequest(ctx) if err != nil { - return nil, nil, err + return nil, err } resp, err := client.Do(req) @@ -825,178 +816,19 @@ func (r *Request) watchInternal(ctx context.Context) (watch.Interface, runtime.D }() if done { if isErrRetryableFunc(req, err) { - return watch.NewEmptyWatch(), nil, nil + return watch.NewEmptyWatch(), nil } if err == nil { // if the server sent us an HTTP Response object, // we need to return the error object from that. err = transformErr } - return nil, nil, retry.WrapPreviousError(err) + return nil, retry.WrapPreviousError(err) } } } -type WatchListResult struct { - // err holds any errors we might have received - // during streaming. - err error - - // items hold the collected data - items []runtime.Object - - // initialEventsEndBookmarkRV holds the resource version - // extracted from the bookmark event that marks - // the end of the stream. - initialEventsEndBookmarkRV string - - // negotiatedObjectDecoder knows how to decode - // the initialEventsListBlueprint - negotiatedObjectDecoder runtime.Decoder - - // base64EncodedInitialEventsListBlueprint contains an empty, - // versioned list encoded in the requested format - // (e.g., protobuf, JSON, CBOR) and stored as a base64-encoded string - base64EncodedInitialEventsListBlueprint string -} - -// Into stores the result into obj. The passed obj parameter must be a pointer to a list type. -// -// Note: -// -// Special attention should be given to the type *unstructured.Unstructured, -// which represents a list type but does not have an "Items" field. -// Users who directly use RESTClient may store the response in such an object. -// This particular case is not handled by the current implementation of this function, -// but may be considered for future updates. -func (r WatchListResult) Into(obj runtime.Object) error { - if r.err != nil { - return r.err - } - - listItemsPtr, err := meta.GetItemsPtr(obj) - if err != nil { - return err - } - listVal, err := conversion.EnforcePtr(listItemsPtr) - if err != nil { - return err - } - if listVal.Kind() != reflect.Slice { - return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) - } - - encodedInitialEventsListBlueprint, err := base64.StdEncoding.DecodeString(r.base64EncodedInitialEventsListBlueprint) - if err != nil { - return fmt.Errorf("failed to decode the received blueprint list, err %w", err) - } - - err = runtime.DecodeInto(r.negotiatedObjectDecoder, encodedInitialEventsListBlueprint, obj) - if err != nil { - return err - } - - if len(r.items) == 0 { - listVal.Set(reflect.MakeSlice(listVal.Type(), 0, 0)) - } else { - listVal.Set(reflect.MakeSlice(listVal.Type(), len(r.items), len(r.items))) - for i, o := range r.items { - if listVal.Type().Elem() != reflect.TypeOf(o).Elem() { - return fmt.Errorf("received object type = %v at index = %d, doesn't match the list item type = %v", reflect.TypeOf(o).Elem(), i, listVal.Type().Elem()) - } - listVal.Index(i).Set(reflect.ValueOf(o).Elem()) - } - } - - listMeta, err := meta.ListAccessor(obj) - if err != nil { - return err - } - listMeta.SetResourceVersion(r.initialEventsEndBookmarkRV) - return nil -} - -// WatchList establishes a stream to get a consistent snapshot of data -// from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal -// -// Note that the watchlist requires properly setting the ListOptions -// otherwise it just establishes a regular watch with the server. -// Check the documentation https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists -// to see what parameters are currently required. -func (r *Request) WatchList(ctx context.Context) WatchListResult { - if r.body == nil { - logBody(klog.FromContext(ctx), 2, "Request Body", r.bodyBytes) - } - - if !clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient) { - return WatchListResult{err: fmt.Errorf("%q feature gate is not enabled", clientfeatures.WatchListClient)} - } - // TODO(#115478): consider validating request parameters (i.e sendInitialEvents). - // Most users use the generated client, which handles the proper setting of parameters. - // We don't have validation for other methods (e.g., the Watch) - // thus, for symmetry, we haven't added additional checks for the WatchList method. - w, d, err := r.watchInternal(ctx) - if err != nil { - return WatchListResult{err: err} - } - return r.handleWatchList(ctx, w, d) -} - -// handleWatchList holds the actual logic for easier unit testing. -// Note that this function will close the passed watch. -func (r *Request) handleWatchList(ctx context.Context, w watch.Interface, negotiatedObjectDecoder runtime.Decoder) WatchListResult { - defer w.Stop() - var lastKey string - var items []runtime.Object - - for { - select { - case <-ctx.Done(): - return WatchListResult{err: ctx.Err()} - case event, ok := <-w.ResultChan(): - if !ok { - return WatchListResult{err: fmt.Errorf("unexpected watch close")} - } - if event.Type == watch.Error { - return WatchListResult{err: errors.FromObject(event.Object)} - } - meta, err := meta.Accessor(event.Object) - if err != nil { - return WatchListResult{err: fmt.Errorf("failed to parse watch event: %#v", event)} - } - - switch event.Type { - case watch.Added: - // the following check ensures that the response is ordered. - // earlier servers had a bug that caused them to not sort the output. - // in such cases, return an error which can trigger fallback logic. - key := objectKeyFromMeta(meta) - if len(lastKey) > 0 && lastKey > key { - return WatchListResult{err: fmt.Errorf("cannot add the obj (%#v) with the key = %s, as it violates the ordering guarantees provided by the watchlist feature in beta phase, lastInsertedKey was = %s", event.Object, key, lastKey)} - } - items = append(items, event.Object) - lastKey = key - case watch.Bookmark: - if meta.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" { - base64EncodedInitialEventsListBlueprint := meta.GetAnnotations()[metav1.InitialEventsListBlueprintAnnotationKey] - if len(base64EncodedInitialEventsListBlueprint) == 0 { - return WatchListResult{err: fmt.Errorf("%q annotation is missing content", metav1.InitialEventsListBlueprintAnnotationKey)} - } - return WatchListResult{ - items: items, - initialEventsEndBookmarkRV: meta.GetResourceVersion(), - negotiatedObjectDecoder: negotiatedObjectDecoder, - base64EncodedInitialEventsListBlueprint: base64EncodedInitialEventsListBlueprint, - } - } - default: - return WatchListResult{err: fmt.Errorf("unexpected watch event %#v, expected to only receive watch.Added and watch.Bookmark events", event)} - } - } - } -} - -func (r *Request) newStreamWatcher(ctx context.Context, resp *http.Response) (watch.Interface, runtime.Decoder, error) { +func (r *Request) newStreamWatcher(ctx context.Context, resp *http.Response) (watch.Interface, error) { contentType := resp.Header.Get("Content-Type") mediaType, params, err := mime.ParseMediaType(contentType) if err != nil { @@ -1004,7 +836,7 @@ func (r *Request) newStreamWatcher(ctx context.Context, resp *http.Response) (wa } objectDecoder, streamingSerializer, framer, err := r.contentConfig.Negotiator.StreamDecoder(mediaType, params) if err != nil { - return nil, nil, err + return nil, err } handleWarnings(ctx, resp.Header, r.warningHandler) @@ -1018,7 +850,7 @@ func (r *Request) newStreamWatcher(ctx context.Context, resp *http.Response) (wa // use 500 to indicate that the cause of the error is unknown - other error codes // are more specific to HTTP interactions, and set a reason errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"), - ), objectDecoder, nil + ), nil } // updateRequestResultMetric increments the RequestResult metric counter, @@ -1727,10 +1559,3 @@ func ValidatePathSegmentName(name string, prefix bool) []string { } return IsValidPathSegmentName(name) } - -func objectKeyFromMeta(objMeta metav1.Object) string { - if len(objMeta.GetNamespace()) > 0 { - return fmt.Sprintf("%s/%s", objMeta.GetNamespace(), objMeta.GetName()) - } - return objMeta.GetName() -} diff --git a/rest/request_watchlist_test.go b/rest/request_watchlist_test.go deleted file mode 100644 index 8195ece1..00000000 --- a/rest/request_watchlist_test.go +++ /dev/null @@ -1,397 +0,0 @@ -/* -Copyright 2024 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package rest - -import ( - "bytes" - "context" - "encoding/base64" - "fmt" - "regexp" - "testing" - - "github.com/google/go-cmp/cmp" - - v1 "k8s.io/api/core/v1" - apiequality "k8s.io/apimachinery/pkg/api/equality" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - runtimejson "k8s.io/apimachinery/pkg/runtime/serializer/json" - "k8s.io/apimachinery/pkg/watch" - clientfeatures "k8s.io/client-go/features" - clientfeaturestesting "k8s.io/client-go/features/testing" - clientgoscheme "k8s.io/client-go/kubernetes/scheme" -) - -func TestWatchListResult(t *testing.T) { - scenarios := []struct { - name string - target WatchListResult - result runtime.Object - - expectedResult *v1.PodList - expectedErr error - }{ - { - name: "not a pointer", - result: fakeObj{}, - expectedErr: fmt.Errorf("rest.fakeObj is not a list: expected pointer, but got rest.fakeObj type"), - }, - { - name: "nil input won't panic", - result: nil, - expectedErr: fmt.Errorf(" is not a list: expected pointer, but got invalid kind"), - }, - { - name: "not a list", - result: &v1.Pod{}, - expectedErr: fmt.Errorf("*v1.Pod is not a list: no Items field in this object"), - }, - { - name: "invalid base64EncodedInitialEventsListBlueprint", - result: &v1.PodList{}, - target: WatchListResult{ - base64EncodedInitialEventsListBlueprint: "invalid", - negotiatedObjectDecoder: newJSONSerializer(), - }, - expectedErr: fmt.Errorf("failed to decode the received blueprint list, err illegal base64 data at input byte 4"), - }, - { - name: "empty list", - result: &v1.PodList{}, - target: WatchListResult{ - base64EncodedInitialEventsListBlueprint: encodeObjectToBase64String(makeEmptyPodList(), t), - negotiatedObjectDecoder: newJSONSerializer(), - }, - expectedResult: &v1.PodList{ - TypeMeta: metav1.TypeMeta{Kind: "PodList"}, - Items: []v1.Pod{}, - }, - }, - { - name: "rv is applied", - result: &v1.PodList{}, - target: WatchListResult{ - initialEventsEndBookmarkRV: "100", - base64EncodedInitialEventsListBlueprint: encodeObjectToBase64String(makeEmptyPodList(), t), - negotiatedObjectDecoder: newJSONSerializer(), - }, - expectedResult: &v1.PodList{ - TypeMeta: metav1.TypeMeta{Kind: "PodList"}, - ListMeta: metav1.ListMeta{ResourceVersion: "100"}, - Items: []v1.Pod{}, - }, - }, - { - name: "items are applied", - result: &v1.PodList{}, - target: WatchListResult{ - items: []runtime.Object{makePod(1), makePod(2)}, - base64EncodedInitialEventsListBlueprint: encodeObjectToBase64String(makeEmptyPodList(), t), - negotiatedObjectDecoder: newJSONSerializer(), - }, - expectedResult: &v1.PodList{ - TypeMeta: metav1.TypeMeta{Kind: "PodList"}, - Items: []v1.Pod{*makePod(1), *makePod(2)}, - }, - }, - { - name: "list's object type mismatch", - result: &v1.PodList{}, - target: WatchListResult{ - items: []runtime.Object{makeNamespace("1")}, - base64EncodedInitialEventsListBlueprint: encodeObjectToBase64String(makeEmptyPodList(), t), - negotiatedObjectDecoder: newJSONSerializer(), - }, - expectedErr: fmt.Errorf("received object type = v1.Namespace at index = 0, doesn't match the list item type = v1.Pod"), - }, - { - name: "list type mismatch", - result: &v1.SecretList{}, - target: WatchListResult{ - items: []runtime.Object{makePod(1), makePod(2)}, - base64EncodedInitialEventsListBlueprint: encodeObjectToBase64String(makeEmptyPodList(), t), - negotiatedObjectDecoder: newJSONSerializer(), - }, - expectedErr: fmt.Errorf("unable to decode /v1, Kind=PodList into *v1.SecretList"), - }, - } - for _, scenario := range scenarios { - t.Run(scenario.name, func(t *testing.T) { - err := scenario.target.Into(scenario.result) - if scenario.expectedErr != nil && err == nil { - t.Fatalf("expected an error = %v, got nil", scenario.expectedErr) - } - if scenario.expectedErr == nil && err != nil { - t.Fatalf("didn't expect an error, got = %v", err) - } - if err != nil { - if scenario.expectedErr.Error() != err.Error() { - t.Fatalf("unexpected err = %v, expected = %v", err, scenario.expectedErr) - } - return - } - if !apiequality.Semantic.DeepEqual(scenario.expectedResult, scenario.result) { - t.Errorf("diff: %v", cmp.Diff(scenario.expectedResult, scenario.result)) - } - }) - } -} - -func TestWatchListSuccess(t *testing.T) { - scenarios := []struct { - name string - watchEvents []watch.Event - negotiatedObjectDecoder runtime.Serializer - - expectedResult *v1.PodList - }{ - { - name: "happy path", - negotiatedObjectDecoder: newJSONSerializer(), - watchEvents: []watch.Event{ - {Type: watch.Added, Object: makePod(1)}, - {Type: watch.Added, Object: makePod(2)}, - {Type: watch.Bookmark, Object: makeBookmarkEvent(5, t)}, - }, - expectedResult: &v1.PodList{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "", - Kind: "PodList", - }, - ListMeta: metav1.ListMeta{ResourceVersion: "5"}, - Items: []v1.Pod{*makePod(1), *makePod(2)}, - }, - }, - { - name: "only the bookmark", - negotiatedObjectDecoder: newJSONSerializer(), - watchEvents: []watch.Event{ - {Type: watch.Bookmark, Object: makeBookmarkEvent(5, t)}, - }, - expectedResult: &v1.PodList{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "", - Kind: "PodList", - }, - ListMeta: metav1.ListMeta{ResourceVersion: "5"}, - Items: []v1.Pod{}, - }, - }, - } - for _, scenario := range scenarios { - t.Run(scenario.name, func(t *testing.T) { - ctx := context.Background() - fakeWatcher := watch.NewFake() - target := &Request{ - c: &RESTClient{}, - } - - go func(watchEvents []watch.Event) { - for _, watchEvent := range watchEvents { - fakeWatcher.Action(watchEvent.Type, watchEvent.Object) - } - }(scenario.watchEvents) - - res := target.handleWatchList(ctx, fakeWatcher, scenario.negotiatedObjectDecoder) - if res.err != nil { - t.Fatal(res.err) - } - - result := &v1.PodList{} - if err := res.Into(result); err != nil { - t.Fatal(err) - } - if !apiequality.Semantic.DeepEqual(scenario.expectedResult, result) { - t.Errorf("diff: %v", cmp.Diff(scenario.expectedResult, result)) - } - if !fakeWatcher.IsStopped() { - t.Fatalf("the watcher wasn't stopped") - } - }) - } -} - -func TestWatchListFailure(t *testing.T) { - scenarios := []struct { - name string - ctx context.Context - watcher *watch.FakeWatcher - watchEvents []watch.Event - - expectedError error - }{ - { - name: "request stop", - ctx: func() context.Context { - ctx, ctxCancel := context.WithCancel(context.TODO()) - ctxCancel() - return ctx - }(), - watcher: watch.NewFake(), - expectedError: fmt.Errorf("context canceled"), - }, - { - name: "stop watcher", - ctx: context.TODO(), - watcher: func() *watch.FakeWatcher { - w := watch.NewFake() - w.Stop() - return w - }(), - expectedError: fmt.Errorf("unexpected watch close"), - }, - { - name: "stop on watch.Error", - ctx: context.TODO(), - watcher: watch.NewFake(), - watchEvents: []watch.Event{{Type: watch.Error, Object: &apierrors.NewInternalError(fmt.Errorf("dummy errror")).ErrStatus}}, - expectedError: fmt.Errorf("Internal error occurred: dummy errror"), - }, - { - name: "incorrect watch type (Deleted)", - ctx: context.TODO(), - watcher: watch.NewFake(), - watchEvents: []watch.Event{{Type: watch.Deleted, Object: makePod(1)}}, - expectedError: fmt.Errorf("unexpected watch event .*, expected to only receive watch.Added and watch.Bookmark events"), - }, - { - name: "incorrect watch type (Modified)", - ctx: context.TODO(), - watcher: watch.NewFake(), - watchEvents: []watch.Event{{Type: watch.Modified, Object: makePod(1)}}, - expectedError: fmt.Errorf("unexpected watch event .*, expected to only receive watch.Added and watch.Bookmark events"), - }, - { - name: "unordered input returns an error", - ctx: context.TODO(), - watcher: watch.NewFake(), - watchEvents: []watch.Event{{Type: watch.Added, Object: makePod(3)}, {Type: watch.Added, Object: makePod(1)}}, - expectedError: fmt.Errorf("cannot add the obj .* with the key = ns/pod-1, as it violates the ordering guarantees provided by the watchlist feature in beta phase, lastInsertedKey was = ns/pod-3"), - }, - } - - for _, scenario := range scenarios { - t.Run(scenario.name, func(t *testing.T) { - target := &Request{} - go func(w *watch.FakeWatcher, watchEvents []watch.Event) { - for _, event := range watchEvents { - w.Action(event.Type, event.Object) - } - }(scenario.watcher, scenario.watchEvents) - - res := target.handleWatchList(scenario.ctx, scenario.watcher, nil /*TODO*/) - resErr := res.Into(nil) - if resErr == nil { - t.Fatal("expected to get an error, got nil") - } - matched, err := regexp.MatchString(scenario.expectedError.Error(), resErr.Error()) - if err != nil { - t.Fatal(err) - } - if !matched { - t.Fatalf("unexpected err = %v, expected = %v", resErr, scenario.expectedError) - } - if !scenario.watcher.IsStopped() { - t.Fatalf("the watcher wasn't stopped") - } - }) - } -} - -func TestWatchListWhenFeatureGateDisabled(t *testing.T) { - clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, false) - expectedError := fmt.Errorf("%q feature gate is not enabled", clientfeatures.WatchListClient) - target := &Request{} - - res := target.WatchList(context.TODO()) - - resErr := res.Into(nil) - if resErr == nil { - t.Fatal("expected to get an error, got nil") - } - if resErr.Error() != expectedError.Error() { - t.Fatalf("unexpected error: %v, expected: %v", resErr, expectedError) - } -} - -func makeEmptyPodList() *v1.PodList { - return &v1.PodList{ - TypeMeta: metav1.TypeMeta{Kind: "PodList"}, - Items: []v1.Pod{}, - } -} - -func makePod(rv uint64) *v1.Pod { - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("pod-%d", rv), - Namespace: "ns", - ResourceVersion: fmt.Sprintf("%d", rv), - Annotations: map[string]string{}, - }, - } -} - -func makeNamespace(name string) *v1.Namespace { - return &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: name}} -} - -func makeBookmarkEvent(rv uint64, t *testing.T) *v1.Pod { - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: fmt.Sprintf("%d", rv), - Annotations: map[string]string{ - metav1.InitialEventsAnnotationKey: "true", - metav1.InitialEventsListBlueprintAnnotationKey: encodeObjectToBase64String(makeEmptyPodList(), t), - }, - }, - } -} - -type fakeObj struct { -} - -func (f fakeObj) GetObjectKind() schema.ObjectKind { - return schema.EmptyObjectKind -} - -func (f fakeObj) DeepCopyObject() runtime.Object { - return fakeObj{} -} - -func newJSONSerializer() runtime.Serializer { - return runtimejson.NewSerializerWithOptions( - runtimejson.DefaultMetaFactory, - clientgoscheme.Scheme, - clientgoscheme.Scheme, - runtimejson.SerializerOptions{}, - ) -} - -func encodeObjectToBase64String(obj runtime.Object, t *testing.T) string { - e := newJSONSerializer() - - var buf bytes.Buffer - err := e.Encode(obj, &buf) - if err != nil { - t.Fatal(err) - } - return base64.StdEncoding.EncodeToString(buf.Bytes()) -}