diff --git a/rest/request.go b/rest/request.go
index 850e57da..f5a9f68c 100644
--- a/rest/request.go
+++ b/rest/request.go
@@ -37,12 +37,15 @@ import (
 	"golang.org/x/net/http2"
 
 	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/conversion"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
 	"k8s.io/apimachinery/pkg/util/net"
 	"k8s.io/apimachinery/pkg/watch"
+	clientfeatures "k8s.io/client-go/features"
 	restclientwatch "k8s.io/client-go/rest/watch"
 	"k8s.io/client-go/tools/metrics"
 	"k8s.io/client-go/util/flowcontrol"
@@ -768,6 +771,142 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
 	}
 }
 
+// WatchListResult holds the outcome of a WatchList call.
+type WatchListResult struct {
+	// err holds any error we might have received during streaming.
+	err error
+
+	// items holds the collected data.
+	items []runtime.Object
+
+	// initialEventsEndBookmarkRV holds the resource version extracted
+	// from the bookmark event that marks the end of the stream.
+	initialEventsEndBookmarkRV string
+
+	// gv represents the API version. It is used to construct the final list
+	// response; normally this information is filled in by the server.
+	gv schema.GroupVersion
+}
+
+// Into returns the streaming error, if any. Otherwise it copies the collected
+// items into the list obj and sets its resourceVersion, apiVersion and kind.
+func (r WatchListResult) Into(obj runtime.Object) error {
+	if r.err != nil {
+		return r.err
+	}
+
+	listPtr, err := meta.GetItemsPtr(obj)
+	if err != nil {
+		return err
+	}
+	listVal, err := conversion.EnforcePtr(listPtr)
+	if err != nil {
+		return err
+	}
+	if listVal.Kind() != reflect.Slice {
+		return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
+	}
+
+	// Assemble the items on the side so that obj is only modified on success.
+	itemType := listVal.Type().Elem()
+	newItems := reflect.MakeSlice(listVal.Type(), len(r.items), len(r.items))
+	for i, o := range r.items {
+		v := reflect.ValueOf(o)
+		if v.Kind() != reflect.Ptr || v.IsNil() {
+			return fmt.Errorf("received object of type = %T at index = %d, expected a non-nil pointer", o, i)
+		}
+		if itemType != v.Type().Elem() {
+			return fmt.Errorf("received object type = %v at index = %d, doesn't match the list item type = %v", v.Type().Elem(), i, itemType)
+		}
+		newItems.Index(i).Set(v.Elem())
+	}
+
+	listMeta, err := meta.ListAccessor(obj)
+	if err != nil {
+		return err
+	}
+	typeMeta, err := meta.TypeAccessor(obj)
+	if err != nil {
+		return err
+	}
+	listVal.Set(newItems)
+	listMeta.SetResourceVersion(r.initialEventsEndBookmarkRV)
+	typeMeta.SetAPIVersion(r.gv.String())
+	typeMeta.SetKind(reflect.TypeOf(obj).Elem().Name())
+	return nil
+}
+
+// WatchList establishes a stream to get a consistent snapshot of data
+// from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal
+//
+// Note that the watchlist requires properly setting the ListOptions,
+// otherwise it just establishes a regular watch with the server.
+// Check the documentation https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists
+// to see what parameters are currently required.
+func (r *Request) WatchList(ctx context.Context) WatchListResult {
+	if !clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient) {
+		return WatchListResult{err: fmt.Errorf("%q feature gate is not enabled", clientfeatures.WatchListClient)}
+	}
+	// TODO(#115478): consider validating request parameters (e.g., sendInitialEvents).
+	// Most users use the generated client, which handles the proper setting of parameters.
+	// We don't have validation for other methods (e.g., the Watch)
+	// thus, for symmetry, we haven't added additional checks for the WatchList method.
+	w, err := r.Watch(ctx)
+	if err != nil {
+		return WatchListResult{err: err}
+	}
+	return r.handleWatchList(ctx, w)
+}
+
+// handleWatchList collects watch.Added events until the initial-events-end
+// bookmark arrives. It is split out for unit testing and always stops w.
+func (r *Request) handleWatchList(ctx context.Context, w watch.Interface) WatchListResult {
+	defer w.Stop()
+	var lastKey string
+	var items []runtime.Object
+
+	for {
+		select {
+		case <-ctx.Done():
+			return WatchListResult{err: ctx.Err()}
+		case event, ok := <-w.ResultChan():
+			if !ok {
+				return WatchListResult{err: fmt.Errorf("unexpected watch close")}
+			}
+			if event.Type == watch.Error {
+				return WatchListResult{err: errors.FromObject(event.Object)}
+			}
+			objMeta, err := meta.Accessor(event.Object)
+			if err != nil {
+				return WatchListResult{err: fmt.Errorf("failed to parse watch event: %#v: %w", event, err)}
+			}
+
+			switch event.Type {
+			case watch.Added:
+				// the following check ensures that the response is ordered.
+				// earlier servers had a bug that caused them to not sort the output.
+				// in such cases, return an error which can trigger fallback logic.
+				key := objectKeyFromMeta(objMeta)
+				if len(lastKey) > 0 && lastKey > key {
+					return WatchListResult{err: fmt.Errorf("cannot add the obj (%#v) with the key = %s, as it violates the ordering guarantees provided by the watchlist feature in beta phase, lastInsertedKey was = %s", event.Object, key, lastKey)}
+				}
+				items = append(items, event.Object)
+				lastKey = key
+			case watch.Bookmark:
+				if objMeta.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" {
+					return WatchListResult{
+						items:                      items,
+						initialEventsEndBookmarkRV: objMeta.GetResourceVersion(),
+						gv:                         r.c.content.GroupVersion,
+					}
+				}
+			default:
+				return WatchListResult{err: fmt.Errorf("unexpected watch event %#v, expected to only receive watch.Added and watch.Bookmark events", event)}
+			}
+		}
+	}
+}
+
 func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error) {
 	contentType := resp.Header.Get("Content-Type")
 	mediaType, params, err := mime.ParseMediaType(contentType)
@@ -1470,3 +1609,12 @@ func ValidatePathSegmentName(name string, prefix bool) []string {
 	}
 	return IsValidPathSegmentName(name)
 }
+
+// objectKeyFromMeta returns the "namespace/name" key of the object,
+// or just the name when the namespace is empty.
+func objectKeyFromMeta(objMeta metav1.Object) string {
+	if ns := objMeta.GetNamespace(); len(ns) > 0 {
+		return ns + "/" + objMeta.GetName()
+	}
+	return objMeta.GetName()
+}
diff --git a/rest/request_watchlist_test.go b/rest/request_watchlist_test.go
new file mode 100644
index 00000000..4ebfe81b
--- /dev/null
+++ b/rest/request_watchlist_test.go
@@ -0,0 +1,373 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package rest
+
+import (
+	"context"
+	"fmt"
+	"regexp"
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+
+	v1 "k8s.io/api/core/v1"
+	apiequality "k8s.io/apimachinery/pkg/api/equality"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/watch"
+)
+
+// TestWatchListResult checks WatchListResult.Into: rejected targets, a stored
+// error, and the items, resourceVersion, apiVersion and kind set on success.
+func TestWatchListResult(t *testing.T) {
+	scenarios := []struct {
+		name   string
+		target WatchListResult
+		result runtime.Object
+
+		expectedResult *v1.PodList
+		expectedErr    error
+	}{
+		{
+			name:        "not a pointer",
+			result:      fakeObj{},
+			expectedErr: fmt.Errorf("rest.fakeObj is not a list: expected pointer, but got rest.fakeObj type"),
+		},
+		{
+			name:        "nil input won't panic",
+			result:      nil,
+			expectedErr: fmt.Errorf(" is not a list: expected pointer, but got invalid kind"),
+		},
+		{
+			name:        "not a list",
+			result:      &v1.Pod{},
+			expectedErr: fmt.Errorf("*v1.Pod is not a list: no Items field in this object"),
+		},
+		{
+			name:        "an err is always returned",
+			result:      nil,
+			target:      WatchListResult{err: fmt.Errorf("dummy err")},
+			expectedErr: fmt.Errorf("dummy err"),
+		},
+		{
+			name:   "empty list",
+			result: &v1.PodList{},
+			expectedResult: &v1.PodList{
+				TypeMeta: metav1.TypeMeta{Kind: "PodList"},
+				Items:    []v1.Pod{},
+			},
+		},
+		{
+			name:   "gv is applied",
+			result: &v1.PodList{},
+			target: WatchListResult{gv: schema.GroupVersion{Group: "g", Version: "v"}},
+			expectedResult: &v1.PodList{
+				TypeMeta: metav1.TypeMeta{Kind: "PodList", APIVersion: "g/v"},
+				Items:    []v1.Pod{},
+			},
+		},
+		{
+			name:   "gv is applied, empty group",
+			result: &v1.PodList{},
+			target: WatchListResult{gv: schema.GroupVersion{Version: "v"}},
+			expectedResult: &v1.PodList{
+				TypeMeta: metav1.TypeMeta{Kind: "PodList", APIVersion: "v"},
+				Items:    []v1.Pod{},
+			},
+		},
+		{
+			name:   "rv is applied",
+			result: &v1.PodList{},
+			target: WatchListResult{initialEventsEndBookmarkRV: "100"},
+			expectedResult: &v1.PodList{
+				TypeMeta: metav1.TypeMeta{Kind: "PodList"},
+				ListMeta: metav1.ListMeta{ResourceVersion: "100"},
+				Items:    []v1.Pod{},
+			},
+		},
+		{
+			name:   "items are applied",
+			result: &v1.PodList{},
+			target: WatchListResult{items: []runtime.Object{makePod(1), makePod(2)}},
+			expectedResult: &v1.PodList{
+				TypeMeta: metav1.TypeMeta{Kind: "PodList"},
+				Items:    []v1.Pod{*makePod(1), *makePod(2)},
+			},
+		},
+		{
+			name:        "type mismatch",
+			result:      &v1.PodList{},
+			target:      WatchListResult{items: []runtime.Object{makeNamespace("1")}},
+			expectedErr: fmt.Errorf("received object type = v1.Namespace at index = 0, doesn't match the list item type = v1.Pod"),
+		},
+	}
+	for _, scenario := range scenarios {
+		t.Run(scenario.name, func(t *testing.T) {
+			err := scenario.target.Into(scenario.result)
+			if scenario.expectedErr != nil && err == nil {
+				t.Fatalf("expected an error = %v, got nil", scenario.expectedErr)
+			}
+			if scenario.expectedErr == nil && err != nil {
+				t.Fatalf("didn't expect an error, got = %v", err)
+			}
+			if err != nil {
+				if scenario.expectedErr.Error() != err.Error() {
+					t.Fatalf("unexpected err = %v, expected = %v", err, scenario.expectedErr)
+				}
+				return
+			}
+			if !apiequality.Semantic.DeepEqual(scenario.expectedResult, scenario.result) {
+				t.Errorf("diff: %v", cmp.Diff(scenario.expectedResult, scenario.result))
+			}
+		})
+	}
+}
+
+// TestWatchListSuccess feeds Added events followed by the initial-events-end
+// bookmark into handleWatchList and checks the resulting PodList and that
+// the watcher was stopped.
+func TestWatchListSuccess(t *testing.T) {
+	scenarios := []struct {
+		name           string
+		gv             schema.GroupVersion
+		watchEvents    []watch.Event
+		expectedResult *v1.PodList
+	}{
+		{
+			name: "happy path",
+			// Note that the APIVersion for the core API group is "v1" (not "core/v1").
+			// We fake "core/v1" here to test if the Group part is properly
+			// recognized and set on the resulting object.
+			gv: schema.GroupVersion{Group: "core", Version: "v1"},
+			watchEvents: []watch.Event{
+				{Type: watch.Added, Object: makePod(1)},
+				{Type: watch.Added, Object: makePod(2)},
+				{Type: watch.Bookmark, Object: makeBookmarkEvent(5)},
+			},
+			expectedResult: &v1.PodList{
+				TypeMeta: metav1.TypeMeta{
+					APIVersion: "core/v1",
+					Kind:       "PodList",
+				},
+				ListMeta: metav1.ListMeta{ResourceVersion: "5"},
+				Items:    []v1.Pod{*makePod(1), *makePod(2)},
+			},
+		},
+		{
+			name: "APIVersion with only version provided is properly set",
+			gv:   schema.GroupVersion{Version: "v1"},
+			watchEvents: []watch.Event{
+				{Type: watch.Added, Object: makePod(1)},
+				{Type: watch.Bookmark, Object: makeBookmarkEvent(5)},
+			},
+			expectedResult: &v1.PodList{
+				TypeMeta: metav1.TypeMeta{
+					APIVersion: "v1",
+					Kind:       "PodList",
+				},
+				ListMeta: metav1.ListMeta{ResourceVersion: "5"},
+				Items:    []v1.Pod{*makePod(1)},
+			},
+		},
+		{
+			name: "only the bookmark",
+			gv:   schema.GroupVersion{Version: "v1"},
+			watchEvents: []watch.Event{
+				{Type: watch.Bookmark, Object: makeBookmarkEvent(5)},
+			},
+			expectedResult: &v1.PodList{
+				TypeMeta: metav1.TypeMeta{
+					APIVersion: "v1",
+					Kind:       "PodList",
+				},
+				ListMeta: metav1.ListMeta{ResourceVersion: "5"},
+				Items:    []v1.Pod{},
+			},
+		},
+	}
+	for _, scenario := range scenarios {
+		t.Run(scenario.name, func(t *testing.T) {
+			ctx := context.Background()
+			fakeWatcher := watch.NewFake()
+			target := &Request{
+				c: &RESTClient{
+					content: ClientContentConfig{
+						GroupVersion: scenario.gv,
+					},
+				},
+			}
+
+			go func(watchEvents []watch.Event) {
+				for _, watchEvent := range watchEvents {
+					fakeWatcher.Action(watchEvent.Type, watchEvent.Object)
+				}
+			}(scenario.watchEvents)
+
+			res := target.handleWatchList(ctx, fakeWatcher)
+			if res.err != nil {
+				t.Fatal(res.err)
+			}
+
+			result := &v1.PodList{}
+			if err := res.Into(result); err != nil {
+				t.Fatal(err)
+			}
+			if !apiequality.Semantic.DeepEqual(scenario.expectedResult, result) {
+				t.Errorf("diff: %v", cmp.Diff(scenario.expectedResult, result))
+			}
+			if !fakeWatcher.IsStopped() {
+				t.Fatalf("the watcher wasn't stopped")
+			}
+		})
+	}
+}
+
+// TestWatchListFailure checks that handleWatchList reports an error and stops
+// the watcher for a canceled context, a closed watch, a watch.Error event,
+// unexpected event types and unordered input. expectedError is used as a regexp.
+func TestWatchListFailure(t *testing.T) {
+	scenarios := []struct {
+		name        string
+		ctx         context.Context
+		watcher     *watch.FakeWatcher
+		watchEvents []watch.Event
+
+		expectedError error
+	}{
+		{
+			name: "request stop",
+			ctx: func() context.Context {
+				ctx, ctxCancel := context.WithCancel(context.TODO())
+				ctxCancel()
+				return ctx
+			}(),
+			watcher:       watch.NewFake(),
+			expectedError: fmt.Errorf("context canceled"),
+		},
+		{
+			name: "stop watcher",
+			ctx:  context.TODO(),
+			watcher: func() *watch.FakeWatcher {
+				w := watch.NewFake()
+				w.Stop()
+				return w
+			}(),
+			expectedError: fmt.Errorf("unexpected watch close"),
+		},
+		{
+			name:          "stop on watch.Error",
+			ctx:           context.TODO(),
+			watcher:       watch.NewFake(),
+			watchEvents:   []watch.Event{{Type: watch.Error, Object: &apierrors.NewInternalError(fmt.Errorf("dummy errror")).ErrStatus}},
+			expectedError: fmt.Errorf("Internal error occurred: dummy errror"),
+		},
+		{
+			name:          "incorrect watch type (Deleted)",
+			ctx:           context.TODO(),
+			watcher:       watch.NewFake(),
+			watchEvents:   []watch.Event{{Type: watch.Deleted, Object: makePod(1)}},
+			expectedError: fmt.Errorf("unexpected watch event .*, expected to only receive watch.Added and watch.Bookmark events"),
+		},
+		{
+			name:          "incorrect watch type (Modified)",
+			ctx:           context.TODO(),
+			watcher:       watch.NewFake(),
+			watchEvents:   []watch.Event{{Type: watch.Modified, Object: makePod(1)}},
+			expectedError: fmt.Errorf("unexpected watch event .*, expected to only receive watch.Added and watch.Bookmark events"),
+		},
+		{
+			name:          "unordered input returns an error",
+			ctx:           context.TODO(),
+			watcher:       watch.NewFake(),
+			watchEvents:   []watch.Event{{Type: watch.Added, Object: makePod(3)}, {Type: watch.Added, Object: makePod(1)}},
+			expectedError: fmt.Errorf("cannot add the obj .* with the key = ns/pod-1, as it violates the ordering guarantees provided by the watchlist feature in beta phase, lastInsertedKey was = ns/pod-3"),
+		},
+	}
+
+	for _, scenario := range scenarios {
+		t.Run(scenario.name, func(t *testing.T) {
+			target := &Request{}
+			go func(w *watch.FakeWatcher, watchEvents []watch.Event) {
+				for _, event := range watchEvents {
+					w.Action(event.Type, event.Object)
+				}
+			}(scenario.watcher, scenario.watchEvents)
+
+			res := target.handleWatchList(scenario.ctx, scenario.watcher)
+			resErr := res.Into(nil)
+			if resErr == nil {
+				t.Fatal("expected to get an error, got nil")
+			}
+			matched, err := regexp.MatchString(scenario.expectedError.Error(), resErr.Error())
+			if err != nil {
+				t.Fatal(err)
+			}
+			if !matched {
+				t.Fatalf("unexpected err = %v, expected = %v", resErr, scenario.expectedError)
+			}
+			if !scenario.watcher.IsStopped() {
+				t.Fatalf("the watcher wasn't stopped")
+			}
+		})
+	}
+}
+
+// makePod returns a pod in namespace "ns" whose name ("pod-<rv>") and
+// resource version are derived from rv.
+func makePod(rv uint64) *v1.Pod {
+	return &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            fmt.Sprintf("pod-%d", rv),
+			Namespace:       "ns",
+			ResourceVersion: fmt.Sprintf("%d", rv),
+			Annotations:     map[string]string{},
+		},
+	}
+}
+
+// makeNamespace returns a namespace with the given name.
+func makeNamespace(name string) *v1.Namespace {
+	return &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: name}}
+}
+
+// makeBookmarkEvent returns the object of a bookmark event: a pod with the given
+// resource version and the metav1.InitialEventsAnnotationKey annotation set to "true".
+func makeBookmarkEvent(rv uint64) *v1.Pod {
+	return &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			ResourceVersion: fmt.Sprintf("%d", rv),
+			Annotations:     map[string]string{metav1.InitialEventsAnnotationKey: "true"},
+		},
+	}
+}
+
+// fakeObj is a runtime.Object implemented with value receivers; the tests
+// pass it by value to exercise the "not a pointer" path.
+type fakeObj struct {
+}
+
+// GetObjectKind returns schema.EmptyObjectKind.
+func (f fakeObj) GetObjectKind() schema.ObjectKind {
+	return schema.EmptyObjectKind
+}
+
+// DeepCopyObject returns a new fakeObj.
+func (f fakeObj) DeepCopyObject() runtime.Object {
+	return fakeObj{}
+}