client-go/rest: rm watchlist

Kubernetes-commit: 6a37976848b43dae9aa82854b5c5d321f148bb0b
This commit is contained in:
Lukasz Szaszkiewicz
2025-06-13 10:48:45 +02:00
committed by Kubernetes Publisher
parent 03836b3bdc
commit 65a2176a48
2 changed files with 8 additions and 580 deletions

View File

@@ -19,7 +19,6 @@ package rest
import (
"bytes"
"context"
"encoding/base64"
"encoding/hex"
"fmt"
"io"
@@ -38,15 +37,12 @@ import (
"golang.org/x/net/http2"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/watch"
clientfeatures "k8s.io/client-go/features"
restclientwatch "k8s.io/client-go/rest/watch"
"k8s.io/client-go/tools/metrics"
"k8s.io/client-go/util/flowcontrol"
@@ -760,11 +756,6 @@ func (b *throttledLogger) info(logger klog.Logger, message string, kv ...any) {
// Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error.
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
w, _, e := r.watchInternal(ctx)
return w, e
}
func (r *Request) watchInternal(ctx context.Context) (watch.Interface, runtime.Decoder, error) {
if r.body == nil {
logBody(klog.FromContext(ctx), 2, "Request Body", r.bodyBytes)
}
@@ -772,7 +763,7 @@ func (r *Request) watchInternal(ctx context.Context) (watch.Interface, runtime.D
// We specifically don't want to rate limit watches, so we
// don't use r.rateLimiter here.
if r.err != nil {
return nil, nil, r.err
return nil, r.err
}
client := r.c.Client
@@ -792,12 +783,12 @@ func (r *Request) watchInternal(ctx context.Context) (watch.Interface, runtime.D
url := r.URL().String()
for {
if err := retry.Before(ctx, r); err != nil {
return nil, nil, retry.WrapPreviousError(err)
return nil, retry.WrapPreviousError(err)
}
req, err := r.newHTTPRequest(ctx)
if err != nil {
return nil, nil, err
return nil, err
}
resp, err := client.Do(req)
@@ -825,178 +816,19 @@ func (r *Request) watchInternal(ctx context.Context) (watch.Interface, runtime.D
}()
if done {
if isErrRetryableFunc(req, err) {
return watch.NewEmptyWatch(), nil, nil
return watch.NewEmptyWatch(), nil
}
if err == nil {
// if the server sent us an HTTP Response object,
// we need to return the error object from that.
err = transformErr
}
return nil, nil, retry.WrapPreviousError(err)
return nil, retry.WrapPreviousError(err)
}
}
}
type WatchListResult struct {
// err holds any errors we might have received
// during streaming.
err error
// items hold the collected data
items []runtime.Object
// initialEventsEndBookmarkRV holds the resource version
// extracted from the bookmark event that marks
// the end of the stream.
initialEventsEndBookmarkRV string
// negotiatedObjectDecoder knows how to decode
// the initialEventsListBlueprint
negotiatedObjectDecoder runtime.Decoder
// base64EncodedInitialEventsListBlueprint contains an empty,
// versioned list encoded in the requested format
// (e.g., protobuf, JSON, CBOR) and stored as a base64-encoded string
base64EncodedInitialEventsListBlueprint string
}
// Into stores the result into obj. The passed obj parameter must be a pointer to a list type.
//
// Note:
//
// Special attention should be given to the type *unstructured.Unstructured,
// which represents a list type but does not have an "Items" field.
// Users who directly use RESTClient may store the response in such an object.
// This particular case is not handled by the current implementation of this function,
// but may be considered for future updates.
func (r WatchListResult) Into(obj runtime.Object) error {
if r.err != nil {
return r.err
}
listItemsPtr, err := meta.GetItemsPtr(obj)
if err != nil {
return err
}
listVal, err := conversion.EnforcePtr(listItemsPtr)
if err != nil {
return err
}
if listVal.Kind() != reflect.Slice {
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
}
encodedInitialEventsListBlueprint, err := base64.StdEncoding.DecodeString(r.base64EncodedInitialEventsListBlueprint)
if err != nil {
return fmt.Errorf("failed to decode the received blueprint list, err %w", err)
}
err = runtime.DecodeInto(r.negotiatedObjectDecoder, encodedInitialEventsListBlueprint, obj)
if err != nil {
return err
}
if len(r.items) == 0 {
listVal.Set(reflect.MakeSlice(listVal.Type(), 0, 0))
} else {
listVal.Set(reflect.MakeSlice(listVal.Type(), len(r.items), len(r.items)))
for i, o := range r.items {
if listVal.Type().Elem() != reflect.TypeOf(o).Elem() {
return fmt.Errorf("received object type = %v at index = %d, doesn't match the list item type = %v", reflect.TypeOf(o).Elem(), i, listVal.Type().Elem())
}
listVal.Index(i).Set(reflect.ValueOf(o).Elem())
}
}
listMeta, err := meta.ListAccessor(obj)
if err != nil {
return err
}
listMeta.SetResourceVersion(r.initialEventsEndBookmarkRV)
return nil
}
// WatchList establishes a stream to get a consistent snapshot of data
// from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal
//
// Note that the watchlist requires properly setting the ListOptions
// otherwise it just establishes a regular watch with the server.
// Check the documentation https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists
// to see what parameters are currently required.
func (r *Request) WatchList(ctx context.Context) WatchListResult {
if r.body == nil {
logBody(klog.FromContext(ctx), 2, "Request Body", r.bodyBytes)
}
if !clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient) {
return WatchListResult{err: fmt.Errorf("%q feature gate is not enabled", clientfeatures.WatchListClient)}
}
// TODO(#115478): consider validating request parameters (i.e sendInitialEvents).
// Most users use the generated client, which handles the proper setting of parameters.
// We don't have validation for other methods (e.g., the Watch)
// thus, for symmetry, we haven't added additional checks for the WatchList method.
w, d, err := r.watchInternal(ctx)
if err != nil {
return WatchListResult{err: err}
}
return r.handleWatchList(ctx, w, d)
}
// handleWatchList holds the actual logic for easier unit testing.
// Note that this function will close the passed watch.
func (r *Request) handleWatchList(ctx context.Context, w watch.Interface, negotiatedObjectDecoder runtime.Decoder) WatchListResult {
defer w.Stop()
var lastKey string
var items []runtime.Object
for {
select {
case <-ctx.Done():
return WatchListResult{err: ctx.Err()}
case event, ok := <-w.ResultChan():
if !ok {
return WatchListResult{err: fmt.Errorf("unexpected watch close")}
}
if event.Type == watch.Error {
return WatchListResult{err: errors.FromObject(event.Object)}
}
meta, err := meta.Accessor(event.Object)
if err != nil {
return WatchListResult{err: fmt.Errorf("failed to parse watch event: %#v", event)}
}
switch event.Type {
case watch.Added:
// the following check ensures that the response is ordered.
// earlier servers had a bug that caused them to not sort the output.
// in such cases, return an error which can trigger fallback logic.
key := objectKeyFromMeta(meta)
if len(lastKey) > 0 && lastKey > key {
return WatchListResult{err: fmt.Errorf("cannot add the obj (%#v) with the key = %s, as it violates the ordering guarantees provided by the watchlist feature in beta phase, lastInsertedKey was = %s", event.Object, key, lastKey)}
}
items = append(items, event.Object)
lastKey = key
case watch.Bookmark:
if meta.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" {
base64EncodedInitialEventsListBlueprint := meta.GetAnnotations()[metav1.InitialEventsListBlueprintAnnotationKey]
if len(base64EncodedInitialEventsListBlueprint) == 0 {
return WatchListResult{err: fmt.Errorf("%q annotation is missing content", metav1.InitialEventsListBlueprintAnnotationKey)}
}
return WatchListResult{
items: items,
initialEventsEndBookmarkRV: meta.GetResourceVersion(),
negotiatedObjectDecoder: negotiatedObjectDecoder,
base64EncodedInitialEventsListBlueprint: base64EncodedInitialEventsListBlueprint,
}
}
default:
return WatchListResult{err: fmt.Errorf("unexpected watch event %#v, expected to only receive watch.Added and watch.Bookmark events", event)}
}
}
}
}
func (r *Request) newStreamWatcher(ctx context.Context, resp *http.Response) (watch.Interface, runtime.Decoder, error) {
func (r *Request) newStreamWatcher(ctx context.Context, resp *http.Response) (watch.Interface, error) {
contentType := resp.Header.Get("Content-Type")
mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil {
@@ -1004,7 +836,7 @@ func (r *Request) newStreamWatcher(ctx context.Context, resp *http.Response) (wa
}
objectDecoder, streamingSerializer, framer, err := r.contentConfig.Negotiator.StreamDecoder(mediaType, params)
if err != nil {
return nil, nil, err
return nil, err
}
handleWarnings(ctx, resp.Header, r.warningHandler)
@@ -1018,7 +850,7 @@ func (r *Request) newStreamWatcher(ctx context.Context, resp *http.Response) (wa
// use 500 to indicate that the cause of the error is unknown - other error codes
// are more specific to HTTP interactions, and set a reason
errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
), objectDecoder, nil
), nil
}
// updateRequestResultMetric increments the RequestResult metric counter,
@@ -1727,10 +1559,3 @@ func ValidatePathSegmentName(name string, prefix bool) []string {
}
return IsValidPathSegmentName(name)
}
func objectKeyFromMeta(objMeta metav1.Object) string {
if len(objMeta.GetNamespace()) > 0 {
return fmt.Sprintf("%s/%s", objMeta.GetNamespace(), objMeta.GetName())
}
return objMeta.GetName()
}