mirror of
https://github.com/kubernetes/client-go.git
synced 2025-09-13 13:52:01 +00:00
client-go/rest/request: decodes initialEventsListBlueprint for watchlist requests
Kubernetes-commit: 7be192ae0bd5d9628c5ee37c2e8b843d602c0fa3
This commit is contained in:
committed by
Kubernetes Publisher
parent
9cff3e43bd
commit
d274c7ba36
@@ -19,6 +19,7 @@ package rest
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -701,6 +702,11 @@ func (b *throttledLogger) Infof(message string, args ...interface{}) {
|
||||
// Watch attempts to begin watching the requested location.
|
||||
// Returns a watch.Interface, or an error.
|
||||
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
|
||||
w, _, e := r.watchInternal(ctx)
|
||||
return w, e
|
||||
}
|
||||
|
||||
func (r *Request) watchInternal(ctx context.Context) (watch.Interface, runtime.Decoder, error) {
|
||||
if r.body == nil {
|
||||
logBody(ctx, 2, "Request Body", r.bodyBytes)
|
||||
}
|
||||
@@ -708,7 +714,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
|
||||
// We specifically don't want to rate limit watches, so we
|
||||
// don't use r.rateLimiter here.
|
||||
if r.err != nil {
|
||||
return nil, r.err
|
||||
return nil, nil, r.err
|
||||
}
|
||||
|
||||
client := r.c.Client
|
||||
@@ -728,12 +734,12 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
|
||||
url := r.URL().String()
|
||||
for {
|
||||
if err := retry.Before(ctx, r); err != nil {
|
||||
return nil, retry.WrapPreviousError(err)
|
||||
return nil, nil, retry.WrapPreviousError(err)
|
||||
}
|
||||
|
||||
req, err := r.newHTTPRequest(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
@@ -761,14 +767,14 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
|
||||
}()
|
||||
if done {
|
||||
if isErrRetryableFunc(req, err) {
|
||||
return watch.NewEmptyWatch(), nil
|
||||
return watch.NewEmptyWatch(), nil, nil
|
||||
}
|
||||
if err == nil {
|
||||
// if the server sent us an HTTP Response object,
|
||||
// we need to return the error object from that.
|
||||
err = transformErr
|
||||
}
|
||||
return nil, retry.WrapPreviousError(err)
|
||||
return nil, nil, retry.WrapPreviousError(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -786,22 +792,35 @@ type WatchListResult struct {
|
||||
// the end of the stream.
|
||||
initialEventsEndBookmarkRV string
|
||||
|
||||
// gv represents the API version
|
||||
// it is used to construct the final list response
|
||||
// normally this information is filled by the server
|
||||
gv schema.GroupVersion
|
||||
// negotiatedObjectDecoder knows how to decode
|
||||
// the initialEventsListBlueprint
|
||||
negotiatedObjectDecoder runtime.Decoder
|
||||
|
||||
// base64EncodedInitialEventsListBlueprint contains an empty,
|
||||
// versioned list encoded in the requested format
|
||||
// (e.g., protobuf, JSON, CBOR) and stored as a base64-encoded string
|
||||
base64EncodedInitialEventsListBlueprint string
|
||||
}
|
||||
|
||||
// Into stores the result into obj. The passed obj parameter must be a pointer to a list type.
|
||||
//
|
||||
// Note:
|
||||
//
|
||||
// Special attention should be given to the type *unstructured.Unstructured,
|
||||
// which represents a list type but does not have an "Items" field.
|
||||
// Users who directly use RESTClient may store the response in such an object.
|
||||
// This particular case is not handled by the current implementation of this function,
|
||||
// but may be considered for future updates.
|
||||
func (r WatchListResult) Into(obj runtime.Object) error {
|
||||
if r.err != nil {
|
||||
return r.err
|
||||
}
|
||||
|
||||
listPtr, err := meta.GetItemsPtr(obj)
|
||||
listItemsPtr, err := meta.GetItemsPtr(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
listVal, err := conversion.EnforcePtr(listPtr)
|
||||
listVal, err := conversion.EnforcePtr(listItemsPtr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -809,6 +828,16 @@ func (r WatchListResult) Into(obj runtime.Object) error {
|
||||
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
|
||||
}
|
||||
|
||||
encodedInitialEventsListBlueprint, err := base64.StdEncoding.DecodeString(r.base64EncodedInitialEventsListBlueprint)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to decode the received blueprint list, err %w", err)
|
||||
}
|
||||
|
||||
err = runtime.DecodeInto(r.negotiatedObjectDecoder, encodedInitialEventsListBlueprint, obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(r.items) == 0 {
|
||||
listVal.Set(reflect.MakeSlice(listVal.Type(), 0, 0))
|
||||
} else {
|
||||
@@ -826,15 +855,6 @@ func (r WatchListResult) Into(obj runtime.Object) error {
|
||||
return err
|
||||
}
|
||||
listMeta.SetResourceVersion(r.initialEventsEndBookmarkRV)
|
||||
|
||||
typeMeta, err := meta.TypeAccessor(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
version := r.gv.String()
|
||||
typeMeta.SetAPIVersion(version)
|
||||
typeMeta.SetKind(reflect.TypeOf(obj).Elem().Name())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -857,16 +877,16 @@ func (r *Request) WatchList(ctx context.Context) WatchListResult {
|
||||
// Most users use the generated client, which handles the proper setting of parameters.
|
||||
// We don't have validation for other methods (e.g., the Watch)
|
||||
// thus, for symmetry, we haven't added additional checks for the WatchList method.
|
||||
w, err := r.Watch(ctx)
|
||||
w, d, err := r.watchInternal(ctx)
|
||||
if err != nil {
|
||||
return WatchListResult{err: err}
|
||||
}
|
||||
return r.handleWatchList(ctx, w)
|
||||
return r.handleWatchList(ctx, w, d)
|
||||
}
|
||||
|
||||
// handleWatchList holds the actual logic for easier unit testing.
|
||||
// Note that this function will close the passed watch.
|
||||
func (r *Request) handleWatchList(ctx context.Context, w watch.Interface) WatchListResult {
|
||||
func (r *Request) handleWatchList(ctx context.Context, w watch.Interface, negotiatedObjectDecoder runtime.Decoder) WatchListResult {
|
||||
defer w.Stop()
|
||||
var lastKey string
|
||||
var items []runtime.Object
|
||||
@@ -900,10 +920,15 @@ func (r *Request) handleWatchList(ctx context.Context, w watch.Interface) WatchL
|
||||
lastKey = key
|
||||
case watch.Bookmark:
|
||||
if meta.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" {
|
||||
base64EncodedInitialEventsListBlueprint := meta.GetAnnotations()[metav1.InitialEventsListBlueprintAnnotationKey]
|
||||
if len(base64EncodedInitialEventsListBlueprint) == 0 {
|
||||
return WatchListResult{err: fmt.Errorf("%q annotation is missing content", metav1.InitialEventsListBlueprintAnnotationKey)}
|
||||
}
|
||||
return WatchListResult{
|
||||
items: items,
|
||||
initialEventsEndBookmarkRV: meta.GetResourceVersion(),
|
||||
gv: r.c.content.GroupVersion,
|
||||
items: items,
|
||||
initialEventsEndBookmarkRV: meta.GetResourceVersion(),
|
||||
negotiatedObjectDecoder: negotiatedObjectDecoder,
|
||||
base64EncodedInitialEventsListBlueprint: base64EncodedInitialEventsListBlueprint,
|
||||
}
|
||||
}
|
||||
default:
|
||||
@@ -913,7 +938,7 @@ func (r *Request) handleWatchList(ctx context.Context, w watch.Interface) WatchL
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error) {
|
||||
func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, runtime.Decoder, error) {
|
||||
contentType := resp.Header.Get("Content-Type")
|
||||
mediaType, params, err := mime.ParseMediaType(contentType)
|
||||
if err != nil {
|
||||
@@ -921,7 +946,7 @@ func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error)
|
||||
}
|
||||
objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
handleWarnings(resp.Header, r.warningHandler)
|
||||
@@ -934,7 +959,7 @@ func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error)
|
||||
// use 500 to indicate that the cause of the error is unknown - other error codes
|
||||
// are more specific to HTTP interactions, and set a reason
|
||||
errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
|
||||
), nil
|
||||
), objectDecoder, nil
|
||||
}
|
||||
|
||||
// updateRequestResultMetric increments the RequestResult metric counter,
|
||||
|
Reference in New Issue
Block a user