client-go/rest: introduce watchlist

Kubernetes-commit: ad3d138cda76fc0267da5131fa3ff7906e2ddf76
This commit is contained in:
Lukasz Szaszkiewicz 2024-01-08 16:43:26 +01:00 committed by Kubernetes Publisher
parent 47f558294c
commit a1be94abc3
2 changed files with 502 additions and 0 deletions

View File

@ -37,12 +37,15 @@ import (
"golang.org/x/net/http2"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/watch"
clientfeatures "k8s.io/client-go/features"
restclientwatch "k8s.io/client-go/rest/watch"
"k8s.io/client-go/tools/metrics"
"k8s.io/client-go/util/flowcontrol"
@ -768,6 +771,142 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
}
}
type WatchListResult struct {
// err holds any errors we might have received
// during streaming.
err error
// items hold the collected data
items []runtime.Object
// initialEventsEndBookmarkRV holds the resource version
// extracted from the bookmark event that marks
// the end of the stream.
initialEventsEndBookmarkRV string
// gv represents the API version
// it is used to construct the final list response
// normally this information is filled by the server
gv schema.GroupVersion
}
func (r WatchListResult) Into(obj runtime.Object) error {
if r.err != nil {
return r.err
}
listPtr, err := meta.GetItemsPtr(obj)
if err != nil {
return err
}
listVal, err := conversion.EnforcePtr(listPtr)
if err != nil {
return err
}
if listVal.Kind() != reflect.Slice {
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
}
if len(r.items) == 0 {
listVal.Set(reflect.MakeSlice(listVal.Type(), 0, 0))
} else {
listVal.Set(reflect.MakeSlice(listVal.Type(), len(r.items), len(r.items)))
for i, o := range r.items {
if listVal.Type().Elem() != reflect.TypeOf(o).Elem() {
return fmt.Errorf("received object type = %v at index = %d, doesn't match the list item type = %v", reflect.TypeOf(o).Elem(), i, listVal.Type().Elem())
}
listVal.Index(i).Set(reflect.ValueOf(o).Elem())
}
}
listMeta, err := meta.ListAccessor(obj)
if err != nil {
return err
}
listMeta.SetResourceVersion(r.initialEventsEndBookmarkRV)
typeMeta, err := meta.TypeAccessor(obj)
if err != nil {
return err
}
version := r.gv.String()
typeMeta.SetAPIVersion(version)
typeMeta.SetKind(reflect.TypeOf(obj).Elem().Name())
return nil
}
// WatchList establishes a stream to get a consistent snapshot of data
// from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal
//
// Note that the watchlist requires properly setting the ListOptions
// otherwise it just establishes a regular watch with the server.
// Check the documentation https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists
// to see what parameters are currently required.
func (r *Request) WatchList(ctx context.Context) WatchListResult {
if !clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient) {
return WatchListResult{err: fmt.Errorf("%q feature gate is not enabled", clientfeatures.WatchListClient)}
}
// TODO(#115478): consider validating request parameters (i.e sendInitialEvents).
// Most users use the generated client, which handles the proper setting of parameters.
// We don't have validation for other methods (e.g., the Watch)
// thus, for symmetry, we haven't added additional checks for the WatchList method.
w, err := r.Watch(ctx)
if err != nil {
return WatchListResult{err: err}
}
return r.handleWatchList(ctx, w)
}
// handleWatchList holds the actual logic for easier unit testing.
// Note that this function will close the passed watch.
func (r *Request) handleWatchList(ctx context.Context, w watch.Interface) WatchListResult {
defer w.Stop()
var lastKey string
var items []runtime.Object
for {
select {
case <-ctx.Done():
return WatchListResult{err: ctx.Err()}
case event, ok := <-w.ResultChan():
if !ok {
return WatchListResult{err: fmt.Errorf("unexpected watch close")}
}
if event.Type == watch.Error {
return WatchListResult{err: errors.FromObject(event.Object)}
}
meta, err := meta.Accessor(event.Object)
if err != nil {
return WatchListResult{err: fmt.Errorf("failed to parse watch event: %#v", event)}
}
switch event.Type {
case watch.Added:
// the following check ensures that the response is ordered.
// earlier servers had a bug that caused them to not sort the output.
// in such cases, return an error which can trigger fallback logic.
key := objectKeyFromMeta(meta)
if len(lastKey) > 0 && lastKey > key {
return WatchListResult{err: fmt.Errorf("cannot add the obj (%#v) with the key = %s, as it violates the ordering guarantees provided by the watchlist feature in beta phase, lastInsertedKey was = %s", event.Object, key, lastKey)}
}
items = append(items, event.Object)
lastKey = key
case watch.Bookmark:
if meta.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" {
return WatchListResult{
items: items,
initialEventsEndBookmarkRV: meta.GetResourceVersion(),
gv: r.c.content.GroupVersion,
}
}
default:
return WatchListResult{err: fmt.Errorf("unexpected watch event %#v, expected to only receive watch.Added and watch.Bookmark events", event)}
}
}
}
}
func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error) {
contentType := resp.Header.Get("Content-Type")
mediaType, params, err := mime.ParseMediaType(contentType)
@ -1470,3 +1609,10 @@ func ValidatePathSegmentName(name string, prefix bool) []string {
}
return IsValidPathSegmentName(name)
}
func objectKeyFromMeta(objMeta metav1.Object) string {
if len(objMeta.GetNamespace()) > 0 {
return fmt.Sprintf("%s/%s", objMeta.GetNamespace(), objMeta.GetName())
}
return objMeta.GetName()
}

View File

@ -0,0 +1,356 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rest
import (
"context"
"fmt"
"regexp"
"testing"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
)
func TestWatchListResult(t *testing.T) {
scenarios := []struct {
name string
target WatchListResult
result runtime.Object
expectedResult *v1.PodList
expectedErr error
}{
{
name: "not a pointer",
result: fakeObj{},
expectedErr: fmt.Errorf("rest.fakeObj is not a list: expected pointer, but got rest.fakeObj type"),
},
{
name: "nil input won't panic",
result: nil,
expectedErr: fmt.Errorf("<nil> is not a list: expected pointer, but got invalid kind"),
},
{
name: "not a list",
result: &v1.Pod{},
expectedErr: fmt.Errorf("*v1.Pod is not a list: no Items field in this object"),
},
{
name: "an err is always returned",
result: nil,
target: WatchListResult{err: fmt.Errorf("dummy err")},
expectedErr: fmt.Errorf("dummy err"),
},
{
name: "empty list",
result: &v1.PodList{},
expectedResult: &v1.PodList{
TypeMeta: metav1.TypeMeta{Kind: "PodList"},
Items: []v1.Pod{},
},
},
{
name: "gv is applied",
result: &v1.PodList{},
target: WatchListResult{gv: schema.GroupVersion{Group: "g", Version: "v"}},
expectedResult: &v1.PodList{
TypeMeta: metav1.TypeMeta{Kind: "PodList", APIVersion: "g/v"},
Items: []v1.Pod{},
},
},
{
name: "gv is applied, empty group",
result: &v1.PodList{},
target: WatchListResult{gv: schema.GroupVersion{Version: "v"}},
expectedResult: &v1.PodList{
TypeMeta: metav1.TypeMeta{Kind: "PodList", APIVersion: "v"},
Items: []v1.Pod{},
},
},
{
name: "rv is applied",
result: &v1.PodList{},
target: WatchListResult{initialEventsEndBookmarkRV: "100"},
expectedResult: &v1.PodList{
TypeMeta: metav1.TypeMeta{Kind: "PodList"},
ListMeta: metav1.ListMeta{ResourceVersion: "100"},
Items: []v1.Pod{},
},
},
{
name: "items are applied",
result: &v1.PodList{},
target: WatchListResult{items: []runtime.Object{makePod(1), makePod(2)}},
expectedResult: &v1.PodList{
TypeMeta: metav1.TypeMeta{Kind: "PodList"},
Items: []v1.Pod{*makePod(1), *makePod(2)},
},
},
{
name: "type mismatch",
result: &v1.PodList{},
target: WatchListResult{items: []runtime.Object{makeNamespace("1")}},
expectedErr: fmt.Errorf("received object type = v1.Namespace at index = 0, doesn't match the list item type = v1.Pod"),
},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
err := scenario.target.Into(scenario.result)
if scenario.expectedErr != nil && err == nil {
t.Fatalf("expected an error = %v, got nil", scenario.expectedErr)
}
if scenario.expectedErr == nil && err != nil {
t.Fatalf("didn't expect an error, got = %v", err)
}
if err != nil {
if scenario.expectedErr.Error() != err.Error() {
t.Fatalf("unexpected err = %v, expected = %v", err, scenario.expectedErr)
}
return
}
if !apiequality.Semantic.DeepEqual(scenario.expectedResult, scenario.result) {
t.Errorf("diff: %v", cmp.Diff(scenario.expectedResult, scenario.result))
}
})
}
}
func TestWatchListSuccess(t *testing.T) {
scenarios := []struct {
name string
gv schema.GroupVersion
watchEvents []watch.Event
expectedResult *v1.PodList
}{
{
name: "happy path",
// Note that the APIVersion for the core API group is "v1" (not "core/v1").
// We fake "core/v1" here to test if the Group part is properly
// recognized and set on the resulting object.
gv: schema.GroupVersion{Group: "core", Version: "v1"},
watchEvents: []watch.Event{
{Type: watch.Added, Object: makePod(1)},
{Type: watch.Added, Object: makePod(2)},
{Type: watch.Bookmark, Object: makeBookmarkEvent(5)},
},
expectedResult: &v1.PodList{
TypeMeta: metav1.TypeMeta{
APIVersion: "core/v1",
Kind: "PodList",
},
ListMeta: metav1.ListMeta{ResourceVersion: "5"},
Items: []v1.Pod{*makePod(1), *makePod(2)},
},
},
{
name: "APIVersion with only version provided is properly set",
gv: schema.GroupVersion{Version: "v1"},
watchEvents: []watch.Event{
{Type: watch.Added, Object: makePod(1)},
{Type: watch.Bookmark, Object: makeBookmarkEvent(5)},
},
expectedResult: &v1.PodList{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "PodList",
},
ListMeta: metav1.ListMeta{ResourceVersion: "5"},
Items: []v1.Pod{*makePod(1)},
},
},
{
name: "only the bookmark",
gv: schema.GroupVersion{Version: "v1"},
watchEvents: []watch.Event{
{Type: watch.Bookmark, Object: makeBookmarkEvent(5)},
},
expectedResult: &v1.PodList{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "PodList",
},
ListMeta: metav1.ListMeta{ResourceVersion: "5"},
Items: []v1.Pod{},
},
},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
ctx := context.Background()
fakeWatcher := watch.NewFake()
target := &Request{
c: &RESTClient{
content: ClientContentConfig{
GroupVersion: scenario.gv,
},
},
}
go func(watchEvents []watch.Event) {
for _, watchEvent := range watchEvents {
fakeWatcher.Action(watchEvent.Type, watchEvent.Object)
}
}(scenario.watchEvents)
res := target.handleWatchList(ctx, fakeWatcher)
if res.err != nil {
t.Fatal(res.err)
}
result := &v1.PodList{}
if err := res.Into(result); err != nil {
t.Fatal(err)
}
if !apiequality.Semantic.DeepEqual(scenario.expectedResult, result) {
t.Errorf("diff: %v", cmp.Diff(scenario.expectedResult, result))
}
if !fakeWatcher.IsStopped() {
t.Fatalf("the watcher wasn't stopped")
}
})
}
}
func TestWatchListFailure(t *testing.T) {
scenarios := []struct {
name string
ctx context.Context
watcher *watch.FakeWatcher
watchEvents []watch.Event
expectedError error
}{
{
name: "request stop",
ctx: func() context.Context {
ctx, ctxCancel := context.WithCancel(context.TODO())
ctxCancel()
return ctx
}(),
watcher: watch.NewFake(),
expectedError: fmt.Errorf("context canceled"),
},
{
name: "stop watcher",
ctx: context.TODO(),
watcher: func() *watch.FakeWatcher {
w := watch.NewFake()
w.Stop()
return w
}(),
expectedError: fmt.Errorf("unexpected watch close"),
},
{
name: "stop on watch.Error",
ctx: context.TODO(),
watcher: watch.NewFake(),
watchEvents: []watch.Event{{Type: watch.Error, Object: &apierrors.NewInternalError(fmt.Errorf("dummy errror")).ErrStatus}},
expectedError: fmt.Errorf("Internal error occurred: dummy errror"),
},
{
name: "incorrect watch type (Deleted)",
ctx: context.TODO(),
watcher: watch.NewFake(),
watchEvents: []watch.Event{{Type: watch.Deleted, Object: makePod(1)}},
expectedError: fmt.Errorf("unexpected watch event .*, expected to only receive watch.Added and watch.Bookmark events"),
},
{
name: "incorrect watch type (Modified)",
ctx: context.TODO(),
watcher: watch.NewFake(),
watchEvents: []watch.Event{{Type: watch.Modified, Object: makePod(1)}},
expectedError: fmt.Errorf("unexpected watch event .*, expected to only receive watch.Added and watch.Bookmark events"),
},
{
name: "unordered input returns an error",
ctx: context.TODO(),
watcher: watch.NewFake(),
watchEvents: []watch.Event{{Type: watch.Added, Object: makePod(3)}, {Type: watch.Added, Object: makePod(1)}},
expectedError: fmt.Errorf("cannot add the obj .* with the key = ns/pod-1, as it violates the ordering guarantees provided by the watchlist feature in beta phase, lastInsertedKey was = ns/pod-3"),
},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
target := &Request{}
go func(w *watch.FakeWatcher, watchEvents []watch.Event) {
for _, event := range watchEvents {
w.Action(event.Type, event.Object)
}
}(scenario.watcher, scenario.watchEvents)
res := target.handleWatchList(scenario.ctx, scenario.watcher)
resErr := res.Into(nil)
if resErr == nil {
t.Fatal("expected to get an error, got nil")
}
matched, err := regexp.MatchString(scenario.expectedError.Error(), resErr.Error())
if err != nil {
t.Fatal(err)
}
if !matched {
t.Fatalf("unexpected err = %v, expected = %v", resErr, scenario.expectedError)
}
if !scenario.watcher.IsStopped() {
t.Fatalf("the watcher wasn't stopped")
}
})
}
}
func makePod(rv uint64) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", rv),
Namespace: "ns",
ResourceVersion: fmt.Sprintf("%d", rv),
Annotations: map[string]string{},
},
}
}
func makeNamespace(name string) *v1.Namespace {
return &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: name}}
}
func makeBookmarkEvent(rv uint64) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: fmt.Sprintf("%d", rv),
Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"},
},
}
}
type fakeObj struct {
}
func (f fakeObj) GetObjectKind() schema.ObjectKind {
return schema.EmptyObjectKind
}
func (f fakeObj) DeepCopyObject() runtime.Object {
return fakeObj{}
}