apiserver/handlers/watch: encode initialEventsListBlueprint with watchEncoder (#127587)

* apiserver/handlers/get: construct versionedList

* storage/cacher: document caching the serialization of bookmark events

* endpoints/handlers/response: add watchListTransformer

* endpoints/handlers/watch: wire watchListTransformer
This commit is contained in:
Lukasz Szaszkiewicz 2024-10-01 11:27:49 +02:00 committed by GitHub
parent 22a30e7cbb
commit fbf1a0dc18
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 262 additions and 19 deletions

View File

@ -265,6 +265,16 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
if timeout == 0 && minRequestTimeout > 0 { if timeout == 0 && minRequestTimeout > 0 {
timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0)) timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0))
} }
var emptyVersionedList runtime.Object
if isListWatchRequest(opts) {
emptyVersionedList, err = scope.Convertor.ConvertToVersion(r.NewList(), scope.Kind.GroupVersion())
if err != nil {
scope.err(errors.NewInternalError(err), w, req)
return
}
}
klog.V(3).InfoS("Starting watch", "path", req.URL.Path, "resourceVersion", opts.ResourceVersion, "labels", opts.LabelSelector, "fields", opts.FieldSelector, "timeout", timeout) klog.V(3).InfoS("Starting watch", "path", req.URL.Path, "resourceVersion", opts.ResourceVersion, "labels", opts.LabelSelector, "fields", opts.FieldSelector, "timeout", timeout)
ctx, cancel := context.WithTimeout(ctx, timeout) ctx, cancel := context.WithTimeout(ctx, timeout)
defer func() { cancel() }() defer func() { cancel() }()
@ -273,7 +283,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
scope.err(err, w, req) scope.err(err, w, req)
return return
} }
handler, err := serveWatchHandler(watcher, scope, outputMediaType, req, w, timeout, metrics.CleanListScope(ctx, &opts)) handler, err := serveWatchHandler(watcher, scope, outputMediaType, req, w, timeout, metrics.CleanListScope(ctx, &opts), emptyVersionedList)
if err != nil { if err != nil {
scope.err(err, w, req) scope.err(err, w, req)
return return

View File

@ -18,6 +18,7 @@ package handlers
import ( import (
"context" "context"
"encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
@ -38,8 +39,9 @@ import (
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/metrics" "k8s.io/apiserver/pkg/endpoints/metrics"
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request" endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/storage"
klog "k8s.io/klog/v2" "k8s.io/klog/v2"
) )
// watchEmbeddedEncoder performs encoding of the embedded object. // watchEmbeddedEncoder performs encoding of the embedded object.
@ -147,6 +149,8 @@ type watchEncoder struct {
encoder runtime.Encoder encoder runtime.Encoder
framer io.Writer framer io.Writer
watchListTransformerFn watchListTransformerFunction
buffer runtime.Splice buffer runtime.Splice
eventBuffer runtime.Splice eventBuffer runtime.Splice
@ -154,15 +158,16 @@ type watchEncoder struct {
identifiers map[watch.EventType]runtime.Identifier identifiers map[watch.EventType]runtime.Identifier
} }
func newWatchEncoder(ctx context.Context, kind schema.GroupVersionKind, embeddedEncoder runtime.Encoder, encoder runtime.Encoder, framer io.Writer) *watchEncoder { func newWatchEncoder(ctx context.Context, kind schema.GroupVersionKind, embeddedEncoder runtime.Encoder, encoder runtime.Encoder, framer io.Writer, watchListTransformerFn watchListTransformerFunction) *watchEncoder {
return &watchEncoder{ return &watchEncoder{
ctx: ctx, ctx: ctx,
kind: kind, kind: kind,
embeddedEncoder: embeddedEncoder, embeddedEncoder: embeddedEncoder,
encoder: encoder, encoder: encoder,
framer: framer, framer: framer,
buffer: runtime.NewSpliceBuffer(), watchListTransformerFn: watchListTransformerFn,
eventBuffer: runtime.NewSpliceBuffer(), buffer: runtime.NewSpliceBuffer(),
eventBuffer: runtime.NewSpliceBuffer(),
} }
} }
@ -174,6 +179,12 @@ func (e *watchEncoder) Encode(event watch.Event) error {
encodeFunc := func(obj runtime.Object, w io.Writer) error { encodeFunc := func(obj runtime.Object, w io.Writer) error {
return e.doEncode(obj, event, w) return e.doEncode(obj, event, w)
} }
if event.Type == watch.Bookmark {
// Bookmark objects are small, and we don't yet support serialization for them.
// Additionally, we need to transform them to support the watch-list feature.
event = e.watchListTransformerFn(event)
return encodeFunc(event.Object, e.framer)
}
if co, ok := event.Object.(runtime.CacheableObject); ok { if co, ok := event.Object.(runtime.CacheableObject); ok {
return co.CacheEncode(e.identifier(event.Type), encodeFunc, e.framer) return co.CacheEncode(e.identifier(event.Type), encodeFunc, e.framer)
} }
@ -479,3 +490,94 @@ func asPartialObjectMetadataList(result runtime.Object, groupVersion schema.Grou
return nil, newNotAcceptableError(fmt.Sprintf("no PartialObjectMetadataList exists in group version %s", groupVersion)) return nil, newNotAcceptableError(fmt.Sprintf("no PartialObjectMetadataList exists in group version %s", groupVersion))
} }
} }
// watchListTransformerFunction is an optional function
// applied to watchlist bookmark events. It transforms
// the embedded object before the event is sent to a client
// and returns the event that should be encoded in its place.
type watchListTransformerFunction func(watch.Event) watch.Event
// watchListTransformer performs transformation of
// a special watchList bookmark event.
//
// For such a bookmark the transformer sets the
// InitialEventsListBlueprintAnnotationKey annotation. Its value is an empty,
// versioned list encoded in the requested format (e.g., protobuf, JSON, CBOR)
// and then stored as a base64-encoded string.
type watchListTransformer struct {
// initialEventsListBlueprint is the list object that gets encoded into the
// bookmark annotation. When it is nil, transform returns events unchanged.
initialEventsListBlueprint runtime.Object
// targetGVK may be nil. When its Kind is "PartialObjectMetadata", the blueprint
// is converted with asPartialObjectMetadataList before it is encoded.
targetGVK *schema.GroupVersionKind
// negotiatedEncoder serializes the blueprint.
negotiatedEncoder runtime.Encoder
// buffer is scratch space that receives the serialized blueprint;
// it is reset after every encoding attempt.
buffer runtime.Splice
}
// newWatchListTransformer returns a watchListTransformer whose transform
// method has the watchListTransformerFunction signature.
//
// initialEventsListBlueprint is the list to embed into the bookmark; when it
// is nil, transform passes events through unchanged. targetGVK may be nil.
// negotiatedEncoder is the encoder used to serialize the blueprint.
func newWatchListTransformer(initialEventsListBlueprint runtime.Object, targetGVK *schema.GroupVersionKind, negotiatedEncoder runtime.Encoder) *watchListTransformer {
	transformer := new(watchListTransformer)
	transformer.initialEventsListBlueprint = initialEventsListBlueprint
	transformer.targetGVK = targetGVK
	transformer.negotiatedEncoder = negotiatedEncoder
	// One buffer per transformer, reused for every encoding.
	transformer.buffer = runtime.NewSpliceBuffer()
	return transformer
}
// transform embeds the encoded list blueprint into the object of a bookmark
// event that carries the initial-events-end annotation.
//
// The event is returned unchanged when no blueprint was configured or when
// the object does not carry the annotation. Any failure is reported as a
// watch.Error event built by newWatchEventErrorFor.
func (e *watchListTransformer) transform(event watch.Event) watch.Event {
	// Without a blueprint there is nothing to embed.
	if e.initialEventsListBlueprint == nil {
		return event
	}

	isInitialEventsEnd, err := storage.HasInitialEventsEndBookmarkAnnotation(event.Object)
	switch {
	case err != nil:
		return newWatchEventErrorFor(err)
	case !isInitialEventsEnd:
		return event
	}

	// The object is annotated in place, so the incoming event can be returned as is.
	if encodeErr := e.encodeInitialEventsListBlueprint(event.Object); encodeErr != nil {
		return newWatchEventErrorFor(encodeErr)
	}
	return event
}
// encodeInitialEventsListBlueprint serializes the list blueprint with the
// negotiated encoder and stores the result, base64-encoded, in the
// InitialEventsListBlueprintAnnotationKey annotation of the given object.
//
// It returns an error when the blueprint cannot be converted or encoded, or
// when the object's metadata cannot be accessed.
func (e *watchListTransformer) encodeInitialEventsListBlueprint(object runtime.Object) error {
	blueprint, err := e.transformInitialEventsListBlueprint()
	if err != nil {
		return err
	}

	// The buffer is shared by all calls on this transformer; always leave it empty.
	defer e.buffer.Reset()
	if encodeErr := e.negotiatedEncoder.Encode(blueprint, e.buffer); encodeErr != nil {
		return encodeErr
	}
	serializedBlueprint := e.buffer.Bytes()

	// The storage layer creates a deep copy of the object before modifying it.
	// Since the object already has the annotation, it can be modified directly.
	accessor, err := meta.Accessor(object)
	if err != nil {
		return err
	}
	// NOTE(review): writing to the map assumes the object has a non-nil
	// annotations map, i.e. the caller checked for the initial-events annotation.
	updated := accessor.GetAnnotations()
	updated[metav1.InitialEventsListBlueprintAnnotationKey] = base64.StdEncoding.EncodeToString(serializedBlueprint)
	accessor.SetAnnotations(updated)
	return nil
}
// transformInitialEventsListBlueprint returns the blueprint in the shape that
// must be encoded: converted via asPartialObjectMetadataList when the target
// kind is "PartialObjectMetadata", and unmodified otherwise.
func (e *watchListTransformer) transformInitialEventsListBlueprint() (runtime.Object, error) {
	wantsPartialObjectMetadata := e.targetGVK != nil && e.targetGVK.Kind == "PartialObjectMetadata"
	if !wantsPartialObjectMetadata {
		return e.initialEventsListBlueprint, nil
	}
	return asPartialObjectMetadataList(e.initialEventsListBlueprint, e.targetGVK.GroupVersion())
}
// newWatchEventErrorFor wraps err into a watch.Error event whose object is a
// failure Status with an internal-error reason, HTTP code 500 and the error
// text as its message.
func newWatchEventErrorFor(err error) watch.Event {
	failure := &metav1.Status{
		Status:  metav1.StatusFailure,
		Message: err.Error(),
		Reason:  metav1.StatusReasonInternalError,
		Code:    http.StatusInternalServerError,
	}
	return watch.Event{Type: watch.Error, Object: failure}
}

View File

@ -17,7 +17,9 @@ limitations under the License.
package handlers package handlers
import ( import (
"bytes"
"context" "context"
"encoding/base64"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
@ -25,13 +27,18 @@ import (
"testing" "testing"
"time" "time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1" metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
runtimejson "k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/watch"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1" examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/registry/rest"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
) )
var _ runtime.CacheableObject = &mockCacheableObject{} var _ runtime.CacheableObject = &mockCacheableObject{}
@ -222,3 +229,118 @@ func TestWatchEncoderIdentifier(t *testing.T) {
t.Error(" - watchEncoder::typeIdentifier to capture all relevant fields in identifier") t.Error(" - watchEncoder::typeIdentifier to capture all relevant fields in identifier")
} }
} }
func TestWatchListEncoder(t *testing.T) {
makePartialObjectMetadataListWithoutKind := func(rv string) *metav1.PartialObjectMetadataList {
return &metav1.PartialObjectMetadataList{
// do not set the type info to match
// newWatchListTransformer
ListMeta: metav1.ListMeta{ResourceVersion: rv},
}
}
makePodListWithKind := func(rv string) *v1.PodList {
return &v1.PodList{
TypeMeta: metav1.TypeMeta{
// set the type info so
// that it differs from
// PartialObjectMetadataList
Kind: "PodList",
},
ListMeta: metav1.ListMeta{
ResourceVersion: rv,
},
}
}
makeBookmarkEventFor := func(pod *v1.Pod) watch.Event {
return watch.Event{
Type: watch.Bookmark,
Object: pod,
}
}
makePod := func(name string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "ns",
Annotations: map[string]string{},
},
}
}
makePodWithInitialEventsAnnotation := func(name string) *v1.Pod {
p := makePod(name)
p.Annotations[metav1.InitialEventsAnnotationKey] = "true"
return p
}
scenarios := []struct {
name string
negotiatedEncoder runtime.Serializer
targetGVK *schema.GroupVersionKind
actualEvent watch.Event
listBlueprint runtime.Object
expectedBase64ListBlueprint string
}{
{
name: "pass through, an obj without the annotation received",
actualEvent: makeBookmarkEventFor(makePod("1")),
negotiatedEncoder: newJSONSerializer(),
},
{
name: "encodes the initialEventsListBlueprint if an obj with the annotation is passed",
actualEvent: makeBookmarkEventFor(makePodWithInitialEventsAnnotation("1")),
listBlueprint: makePodListWithKind("100"),
expectedBase64ListBlueprint: encodeObjectToBase64String(makePodListWithKind("100"), t),
negotiatedEncoder: newJSONSerializer(),
},
{
name: "encodes the initialEventsListBlueprint as PartialObjectMetadata when requested",
targetGVK: &schema.GroupVersionKind{Group: "meta.k8s.io", Version: "v1", Kind: "PartialObjectMetadata"},
actualEvent: makeBookmarkEventFor(makePodWithInitialEventsAnnotation("2")),
listBlueprint: makePodListWithKind("101"),
expectedBase64ListBlueprint: encodeObjectToBase64String(makePartialObjectMetadataListWithoutKind("101"), t),
negotiatedEncoder: newJSONSerializer(),
},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
target := newWatchListTransformer(scenario.listBlueprint, scenario.targetGVK, scenario.negotiatedEncoder)
transformedEvent := target.transform(scenario.actualEvent)
actualObjectMeta, err := meta.Accessor(transformedEvent.Object)
if err != nil {
t.Fatal(err)
}
base64ListBlueprint, ok := actualObjectMeta.GetAnnotations()[metav1.InitialEventsListBlueprintAnnotationKey]
if !ok && len(scenario.expectedBase64ListBlueprint) != 0 {
t.Fatalf("the encoded obj doesn't have %q", metav1.InitialEventsListBlueprintAnnotationKey)
}
if base64ListBlueprint != scenario.expectedBase64ListBlueprint {
t.Fatalf("unexpected base64ListBlueprint = %s, expected = %s", base64ListBlueprint, scenario.expectedBase64ListBlueprint)
}
})
}
}
// encodeObjectToBase64String serializes obj with the serializer returned by
// newJSONSerializer and returns the output as a standard base64 string.
// The test is failed immediately if encoding returns an error.
func encodeObjectToBase64String(obj runtime.Object, t *testing.T) string {
	// Mark this function as a helper so that a failure is attributed
	// to the calling test line rather than to this function.
	t.Helper()

	var buf bytes.Buffer
	if err := newJSONSerializer().Encode(obj, &buf); err != nil {
		t.Fatal(err)
	}
	return base64.StdEncoding.EncodeToString(buf.Bytes())
}
// newJSONSerializer builds a JSON serializer with default serializer options
// that uses the client-go scheme both to create objects and to resolve their types.
func newJSONSerializer() runtime.Serializer {
	scheme := clientgoscheme.Scheme
	options := runtimejson.SerializerOptions{}
	return runtimejson.NewSerializerWithOptions(runtimejson.DefaultMetaFactory, scheme, scheme, options)
}

View File

@ -64,7 +64,7 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
// serveWatchHandler returns a handle to serve a watch response. // serveWatchHandler returns a handle to serve a watch response.
// TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled. // TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration, metricsScope string) (http.Handler, error) { func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration, metricsScope string, initialEventsListBlueprint runtime.Object) (http.Handler, error) {
options, err := optionsForTransform(mediaTypeOptions, req) options, err := optionsForTransform(mediaTypeOptions, req)
if err != nil { if err != nil {
return nil, err return nil, err
@ -91,25 +91,25 @@ func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOp
ctx := req.Context() ctx := req.Context()
// locate the appropriate embedded encoder based on the transform // locate the appropriate embedded encoder based on the transform
var embeddedEncoder runtime.Encoder var negotiatedEncoder runtime.Encoder
contentKind, contentSerializer, transform := targetEncodingForTransform(scope, mediaTypeOptions, req) contentKind, contentSerializer, transform := targetEncodingForTransform(scope, mediaTypeOptions, req)
if transform { if transform {
info, ok := runtime.SerializerInfoForMediaType(contentSerializer.SupportedMediaTypes(), serializer.MediaType) info, ok := runtime.SerializerInfoForMediaType(contentSerializer.SupportedMediaTypes(), serializer.MediaType)
if !ok { if !ok {
return nil, fmt.Errorf("no encoder for %q exists in the requested target %#v", serializer.MediaType, contentSerializer) return nil, fmt.Errorf("no encoder for %q exists in the requested target %#v", serializer.MediaType, contentSerializer)
} }
embeddedEncoder = contentSerializer.EncoderForVersion(info.Serializer, contentKind.GroupVersion()) negotiatedEncoder = contentSerializer.EncoderForVersion(info.Serializer, contentKind.GroupVersion())
} else { } else {
embeddedEncoder = scope.Serializer.EncoderForVersion(serializer.Serializer, contentKind.GroupVersion()) negotiatedEncoder = scope.Serializer.EncoderForVersion(serializer.Serializer, contentKind.GroupVersion())
} }
var memoryAllocator runtime.MemoryAllocator var memoryAllocator runtime.MemoryAllocator
if encoderWithAllocator, supportsAllocator := embeddedEncoder.(runtime.EncoderWithAllocator); supportsAllocator { if encoderWithAllocator, supportsAllocator := negotiatedEncoder.(runtime.EncoderWithAllocator); supportsAllocator {
// don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call. // don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call.
// instead, we allocate the buffer for the entire watch session and release it when we close the connection. // instead, we allocate the buffer for the entire watch session and release it when we close the connection.
memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator) memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
embeddedEncoder = runtime.NewEncoderWithAllocator(encoderWithAllocator, memoryAllocator) negotiatedEncoder = runtime.NewEncoderWithAllocator(encoderWithAllocator, memoryAllocator)
} }
var tableOptions *metav1.TableOptions var tableOptions *metav1.TableOptions
if options != nil { if options != nil {
@ -119,7 +119,7 @@ func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOp
return nil, fmt.Errorf("unexpected options type: %T", options) return nil, fmt.Errorf("unexpected options type: %T", options)
} }
} }
embeddedEncoder = newWatchEmbeddedEncoder(ctx, embeddedEncoder, mediaTypeOptions.Convert, tableOptions, scope) embeddedEncoder := newWatchEmbeddedEncoder(ctx, negotiatedEncoder, mediaTypeOptions.Convert, tableOptions, scope)
if encoderWithAllocator, supportsAllocator := encoder.(runtime.EncoderWithAllocator); supportsAllocator { if encoderWithAllocator, supportsAllocator := encoder.(runtime.EncoderWithAllocator); supportsAllocator {
if memoryAllocator == nil { if memoryAllocator == nil {
@ -145,6 +145,8 @@ func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOp
Encoder: encoder, Encoder: encoder,
EmbeddedEncoder: embeddedEncoder, EmbeddedEncoder: embeddedEncoder,
watchListTransformerFn: newWatchListTransformer(initialEventsListBlueprint, mediaTypeOptions.Convert, negotiatedEncoder).transform,
MemoryAllocator: memoryAllocator, MemoryAllocator: memoryAllocator,
TimeoutFactory: &realTimeoutFactory{timeout}, TimeoutFactory: &realTimeoutFactory{timeout},
ServerShuttingDownCh: serverShuttingDownCh, ServerShuttingDownCh: serverShuttingDownCh,
@ -174,6 +176,10 @@ type WatchServer struct {
Encoder runtime.Encoder Encoder runtime.Encoder
// used to encode the nested object in the watch stream // used to encode the nested object in the watch stream
EmbeddedEncoder runtime.Encoder EmbeddedEncoder runtime.Encoder
// watchListTransformerFn a function applied
// to watchlist bookmark events that transforms
// the embedded object before sending it to a client.
watchListTransformerFn watchListTransformerFunction
MemoryAllocator runtime.MemoryAllocator MemoryAllocator runtime.MemoryAllocator
TimeoutFactory TimeoutFactory TimeoutFactory TimeoutFactory
@ -219,7 +225,7 @@ func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) {
flusher.Flush() flusher.Flush()
kind := s.Scope.Kind kind := s.Scope.Kind
watchEncoder := newWatchEncoder(req.Context(), kind, s.EmbeddedEncoder, s.Encoder, framer) watchEncoder := newWatchEncoder(req.Context(), kind, s.EmbeddedEncoder, s.Encoder, framer, s.watchListTransformerFn)
ch := s.Watching.ResultChan() ch := s.Watching.ResultChan()
done := req.Context().Done() done := req.Context().Done()
@ -288,7 +294,7 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) {
framer := newWebsocketFramer(ws, s.UseTextFraming) framer := newWebsocketFramer(ws, s.UseTextFraming)
kind := s.Scope.Kind kind := s.Scope.Kind
watchEncoder := newWatchEncoder(context.TODO(), kind, s.EmbeddedEncoder, s.Encoder, framer) watchEncoder := newWatchEncoder(context.TODO(), kind, s.EmbeddedEncoder, s.Encoder, framer, s.watchListTransformerFn)
ch := s.Watching.ResultChan() ch := s.Watching.ResultChan()
for { for {

View File

@ -1130,6 +1130,9 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
// Since add() can block, we explicitly add when cacher is unlocked. // Since add() can block, we explicitly add when cacher is unlocked.
// Dispatching event in nonblocking way first, which make faster watchers // Dispatching event in nonblocking way first, which make faster watchers
// not be blocked by slower ones. // not be blocked by slower ones.
//
// Note: if we ever decide to cache the serialization of bookmark events,
// we will also need to modify the watchEncoder encoder
if event.Type == watch.Bookmark { if event.Type == watch.Bookmark {
for _, watcher := range c.watchersBuffer { for _, watcher := range c.watchersBuffer {
watcher.nonblockingAdd(event) watcher.nonblockingAdd(event)