From 7ff866463af46b5f7cf068ba8d51c68e417b9ece Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Fri, 25 Aug 2023 15:41:14 +0200 Subject: [PATCH] Refactor watch event serialization to allow caching --- .../pkg/endpoints/handlers/response.go | 116 ++++++++++++++++++ .../apiserver/pkg/endpoints/handlers/watch.go | 40 +----- test/integration/apiserver/apiserver_test.go | 103 +++++++++++++--- 3 files changed, 209 insertions(+), 50 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go index 02111d9b0e7..fef733605f4 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go @@ -33,8 +33,10 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + "k8s.io/apiserver/pkg/endpoints/metrics" endpointsrequest "k8s.io/apiserver/pkg/endpoints/request" klog "k8s.io/klog/v2" @@ -135,6 +137,120 @@ func (e *watchEmbeddedEncoder) embeddedIdentifier() runtime.Identifier { return runtime.Identifier(result) } +// watchEncoder performs encoding of the watch events. +// +// NOTE: watchEncoder is NOT thread-safe. +type watchEncoder struct { + ctx context.Context + kind schema.GroupVersionKind + embeddedEncoder runtime.Encoder + encoder runtime.Encoder + + buffer runtime.Splice + unknown runtime.Unknown + internalEvent *metav1.InternalEvent + outEvent *metav1.WatchEvent + eventBuffer runtime.Splice + + currentEmbeddedIdentifier runtime.Identifier + identifiers map[watch.EventType]runtime.Identifier +} + +func newWatchEncoder(ctx context.Context, kind schema.GroupVersionKind, embeddedEncoder runtime.Encoder, encoder runtime.Encoder) *watchEncoder { + return &watchEncoder{ + ctx: ctx, + kind: kind, + embeddedEncoder: embeddedEncoder, + encoder: encoder, + buffer: runtime.NewSpliceBuffer(), + internalEvent: &metav1.InternalEvent{}, + outEvent: &metav1.WatchEvent{}, + eventBuffer: runtime.NewSpliceBuffer(), + } +} + +func (e *watchEncoder) Encode(event watch.Event, w io.Writer) error { + encodeFunc := func(obj runtime.Object, w io.Writer) error { + return e.doEncode(obj, event, w) + } + if co, ok := event.Object.(runtime.CacheableObject); ok { + return co.CacheEncode(e.identifier(event.Type), encodeFunc, w) + } + return encodeFunc(event.Object, w) +} + +func (e *watchEncoder) doEncode(obj runtime.Object, event watch.Event, w io.Writer) error { + defer e.buffer.Reset() + + if err := e.embeddedEncoder.Encode(obj, e.buffer); err != nil { + return fmt.Errorf("unable to encode watch object %T: %v", obj, err) + } + + // ContentType is not required here because we are defaulting to the serializer type. + e.unknown.Raw = e.buffer.Bytes() + event.Object = &e.unknown + metrics.WatchEventsSizes.WithContext(e.ctx).WithLabelValues(e.kind.Group, e.kind.Version, e.kind.Kind).Observe(float64(len(e.unknown.Raw))) + + *e.outEvent = metav1.WatchEvent{} + + // create the external type directly and encode it. Clients will only recognize the serialization we provide. 
+	// The internal event is being reused, not reallocated, so it's just a few extra assignments to do it this way
+	// and we get the benefit of using conversion functions which already have to stay in sync.
+	*e.internalEvent = metav1.InternalEvent(event)
+	if err := metav1.Convert_v1_InternalEvent_To_v1_WatchEvent(e.internalEvent, e.outEvent, nil); err != nil {
+		return fmt.Errorf("unable to convert watch object: %v", err)
+	}
+
+	defer e.eventBuffer.Reset()
+	if err := e.encoder.Encode(e.outEvent, e.eventBuffer); err != nil {
+		return fmt.Errorf("unable to encode watch object %T: %v (%#v)", e.outEvent, err, e)
+	}
+
+	_, err := w.Write(e.eventBuffer.Bytes())
+	return err
+}
+
+type watchEncoderIdentifier struct {
+	Name            string `json:"name,omitempty"`
+	EmbeddedEncoder string `json:"embeddedEncoder,omitempty"`
+	Encoder         string `json:"encoder,omitempty"`
+	EventType       string `json:"eventType,omitempty"`
+}
+
+func (e *watchEncoder) identifier(eventType watch.EventType) runtime.Identifier {
+	// We need to take into account that if embeddedEncoder includes the
+	// table transformer, its identifier is dynamic. As a result, whenever
+	// the identifier of embeddedEncoder changes, we need to invalidate the
+	// whole identifiers cache.
+	// TODO(wojtek-t): Can we optimize it somehow?
+	if e.currentEmbeddedIdentifier != e.embeddedEncoder.Identifier() {
+		e.currentEmbeddedIdentifier = e.embeddedEncoder.Identifier()
+		e.identifiers = map[watch.EventType]runtime.Identifier{}
+	}
+	if _, ok := e.identifiers[eventType]; !ok {
+		e.identifiers[eventType] = e.typeIdentifier(eventType)
+	}
+	return e.identifiers[eventType]
+}
+
+func (e *watchEncoder) typeIdentifier(eventType watch.EventType) runtime.Identifier {
+	// Including the eventType in the identifier is a non-standard pattern.
+	// It comes from the fact that we're effectively serializing the whole
+	// watch event, but caching it within the serializations of the Object
+	// carried inside that event.
+	identifier := watchEncoderIdentifier{
+		Name:            "watch",
+		EmbeddedEncoder: string(e.embeddedEncoder.Identifier()),
+		Encoder:         string(e.encoder.Identifier()),
+		EventType:       string(eventType),
+	}
+
+	result, err := json.Marshal(identifier)
+	if err != nil {
+		klog.Fatalf("Failed marshaling identifier for watchEncoder: %v", err)
+	}
+	return runtime.Identifier(result)
+}
+
 // doTransformResponseObject is used for handling all requests, including watch.
func doTransformObject(ctx context.Context, obj runtime.Object, opts interface{}, target *schema.GroupVersionKind, scope *RequestScope) (runtime.Object, error) { if _, ok := obj.(*metav1.Status); ok { diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go index e8eb0bfc263..5f35c284f25 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go @@ -27,7 +27,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/serializer/streaming" "k8s.io/apimachinery/pkg/util/httpstream/wsstream" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/watch" @@ -213,9 +212,6 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - var e streaming.Encoder - e = streaming.NewEncoder(framer, s.Encoder) - // ensure the connection times out timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh() defer cleanup() @@ -226,10 +222,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusOK) flusher.Flush() - var unknown runtime.Unknown - internalEvent := &metav1.InternalEvent{} - outEvent := &metav1.WatchEvent{} - buf := runtime.NewSpliceBuffer() + watchEncoder := newWatchEncoder(req.Context(), kind, s.EmbeddedEncoder, s.Encoder) ch := s.Watching.ResultChan() done := req.Context().Done() @@ -256,43 +249,18 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc() isWatchListLatencyRecordingRequired := shouldRecordWatchListLatency(event) - if err := s.EmbeddedEncoder.Encode(event.Object, buf); err != nil { - // unexpected error - utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", event.Object, err)) - return - } - - // ContentType is not required here because we are defaulting to the serializer - // type - unknown.Raw = buf.Bytes() - event.Object = &unknown - metrics.WatchEventsSizes.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Observe(float64(len(unknown.Raw))) - - *outEvent = metav1.WatchEvent{} - - // create the external type directly and encode it. Clients will only recognize the serialization we provide. - // The internal event is being reused, not reallocated so its just a few extra assignments to do it this way - // and we get the benefit of using conversion functions which already have to stay in sync - *internalEvent = metav1.InternalEvent(event) - err := metav1.Convert_v1_InternalEvent_To_v1_WatchEvent(internalEvent, outEvent, nil) - if err != nil { - utilruntime.HandleError(fmt.Errorf("unable to convert watch object: %v", err)) - // client disconnect. - return - } - if err := e.Encode(outEvent); err != nil { - utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v (%#v)", outEvent, err, e)) + if err := watchEncoder.Encode(event, framer); err != nil { + utilruntime.HandleError(err) // client disconnect. 
return } + if len(ch) == 0 { flusher.Flush() } if isWatchListLatencyRecordingRequired { metrics.RecordWatchListLatency(req.Context(), s.Scope.Resource, s.metricsScope) } - - buf.Reset() } } } diff --git a/test/integration/apiserver/apiserver_test.go b/test/integration/apiserver/apiserver_test.go index 816220ee3e9..eb769406073 100644 --- a/test/integration/apiserver/apiserver_test.go +++ b/test/integration/apiserver/apiserver_test.go @@ -2361,6 +2361,19 @@ func TestWatchTransformCaching(t *testing.T) { } defer wTableIncludeObject.Close() + wTableIncludeObjectFiltered, err := clientSet.CoreV1().RESTClient().Get(). + AbsPath("/api/v1/namespaces/watch-transform/configmaps"). + SetHeader("Accept", "application/json;as=Table;g=meta.k8s.io;v=v1beta1"). + VersionedParams(listOptions, metav1.ParameterCodec). + Param("includeObject", string(metav1.IncludeObject)). + Param("labelSelector", "foo=bar"). + Timeout(timeout). + Stream(ctx) + if err != nil { + t.Fatalf("Failed to start table object watch: %v", err) + } + defer wTableIncludeObjectFiltered.Close() + configMap, err := clientSet.CoreV1().ConfigMaps("watch-transform").Create(ctx, &v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{Name: "test1"}, Data: map[string]string{ @@ -2397,6 +2410,30 @@ func TestWatchTransformCaching(t *testing.T) { t.Fatalf("Failed to create a second configMap: %v", err) } + // Now update both configmaps so that filtering watch can observe them. + // This is needed to validate whether events caching done by apiserver + // distinguished objects by type. + + configMapUpdated, err := clientSet.CoreV1().ConfigMaps("watch-transform").Update(ctx, &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: "test1", Labels: map[string]string{"foo": "bar"}}, + Data: map[string]string{ + "foo": "baz", + }, + }, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update a configMap: %v", err) + } + + configMap2Updated, err := clientSet.CoreV1().ConfigMaps("watch-transform").Update(ctx, &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: "test2", Labels: map[string]string{"foo": "bar"}}, + Data: map[string]string{ + "foo": "baz", + }, + }, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update a second configMap: %v", err) + } + metaChecks := []partialObjectMetadataCheck{ func(res *metav1beta1.PartialObjectMetadata) { if !apiequality.Semantic.DeepEqual(configMap.ObjectMeta, res.ObjectMeta) { @@ -2408,6 +2445,16 @@ func TestWatchTransformCaching(t *testing.T) { t.Errorf("expected object: %#v, got: %#v", configMap2.ObjectMeta, res.ObjectMeta) } }, + func(res *metav1beta1.PartialObjectMetadata) { + if !apiequality.Semantic.DeepEqual(configMapUpdated.ObjectMeta, res.ObjectMeta) { + t.Errorf("expected object: %#v, got: %#v", configMapUpdated.ObjectMeta, res.ObjectMeta) + } + }, + func(res *metav1beta1.PartialObjectMetadata) { + if !apiequality.Semantic.DeepEqual(configMap2Updated.ObjectMeta, res.ObjectMeta) { + t.Errorf("expected object: %#v, got: %#v", configMap2Updated.ObjectMeta, res.ObjectMeta) + } + }, } expectPartialObjectMetaEventsProtobufChecks(t, wMeta, metaChecks) @@ -2421,39 +2468,67 @@ func TestWatchTransformCaching(t *testing.T) { } } - objectMetas := expectTableWatchEvents(t, 2, 3, metav1.IncludeMetadata, json.NewDecoder(wTableIncludeMeta)) + objectMetas := expectTableWatchEvents(t, 4, 3, metav1.IncludeMetadata, json.NewDecoder(wTableIncludeMeta)) tableMetaCheck(configMap, objectMetas[0]) tableMetaCheck(configMap2, objectMetas[1]) + tableMetaCheck(configMapUpdated, objectMetas[2]) + 
tableMetaCheck(configMap2Updated, objectMetas[3]) - tableObjectCheck := func(expected *v1.ConfigMap, got []byte) { + tableObjectCheck := func(expectedType watch.EventType, expectedObj *v1.ConfigMap, got streamedEvent) { var obj *v1.ConfigMap - if err := json.Unmarshal(got, &obj); err != nil { + if err := json.Unmarshal(got.rawObject, &obj); err != nil { t.Fatal(err) } + if expectedType != watch.EventType(got.eventType) { + t.Errorf("expected type: %#v, got: %#v", expectedType, got.eventType) + } obj.TypeMeta = metav1.TypeMeta{} - if !apiequality.Semantic.DeepEqual(expected, obj) { - t.Errorf("expected object: %#v, got: %#v", expected, obj) + if !apiequality.Semantic.DeepEqual(expectedObj, obj) { + t.Errorf("expected object: %#v, got: %#v", expectedObj, obj) } } - objects := expectTableWatchEvents(t, 2, 3, metav1.IncludeObject, json.NewDecoder(wTableIncludeObject)) - tableObjectCheck(configMap, objects[0]) - tableObjectCheck(configMap2, objects[1]) + objects := expectTableWatchEventsWithTypes(t, 4, 3, metav1.IncludeObject, json.NewDecoder(wTableIncludeObject)) + tableObjectCheck(watch.Added, configMap, objects[0]) + tableObjectCheck(watch.Added, configMap2, objects[1]) + tableObjectCheck(watch.Modified, configMapUpdated, objects[2]) + tableObjectCheck(watch.Modified, configMap2Updated, objects[3]) - delayedObjects := expectTableWatchEvents(t, 1, 3, metav1.IncludeObject, json.NewDecoder(wTableIncludeObjectDelayed)) - tableObjectCheck(configMap2, delayedObjects[0]) + filteredObjects := expectTableWatchEventsWithTypes(t, 2, 3, metav1.IncludeObject, json.NewDecoder(wTableIncludeObjectFiltered)) + tableObjectCheck(watch.Added, configMapUpdated, filteredObjects[0]) + tableObjectCheck(watch.Added, configMap2Updated, filteredObjects[1]) + + delayedObjects := expectTableWatchEventsWithTypes(t, 3, 3, metav1.IncludeObject, json.NewDecoder(wTableIncludeObjectDelayed)) + tableObjectCheck(watch.Added, configMap2, delayedObjects[0]) + tableObjectCheck(watch.Modified, configMapUpdated, delayedObjects[1]) + tableObjectCheck(watch.Modified, configMap2Updated, delayedObjects[2]) } func expectTableWatchEvents(t *testing.T, count, columns int, policy metav1.IncludeObjectPolicy, d *json.Decoder) [][]byte { + events := expectTableWatchEventsWithTypes(t, count, columns, policy, d) + var objects [][]byte + for _, event := range events { + objects = append(objects, event.rawObject) + } + return objects +} + +type streamedEvent struct { + eventType string + rawObject []byte +} + +func expectTableWatchEventsWithTypes(t *testing.T, count, columns int, policy metav1.IncludeObjectPolicy, d *json.Decoder) []streamedEvent { t.Helper() - var objects [][]byte + var events []streamedEvent for i := 0; i < count; i++ { var evt metav1.WatchEvent if err := d.Decode(&evt); err != nil { t.Fatal(err) } + var table metav1beta1.Table if err := json.Unmarshal(evt.Object.Raw, &table); err != nil { t.Fatal(err) @@ -2484,7 +2559,7 @@ func expectTableWatchEvents(t *testing.T, count, columns int, policy metav1.Incl if meta.TypeMeta != partialObj { t.Fatalf("expected partial object: %#v", meta) } - objects = append(objects, row.Object.Raw) + events = append(events, streamedEvent{eventType: evt.Type, rawObject: row.Object.Raw}) case metav1.IncludeNone: if len(row.Object.Raw) != 0 { t.Fatalf("Expected no object: %s", string(row.Object.Raw)) @@ -2493,10 +2568,10 @@ func expectTableWatchEvents(t *testing.T, count, columns int, policy metav1.Incl if len(row.Object.Raw) == 0 { t.Fatalf("Expected object: %s", string(row.Object.Raw)) } - objects = 
append(objects, row.Object.Raw) + events = append(events, streamedEvent{eventType: evt.Type, rawObject: row.Object.Raw}) } } - return objects + return events } func expectPartialObjectMetaEvents(t *testing.T, d *json.Decoder, values ...string) {
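
With this refactoring, watchEncoder.Encode checks whether the event's object implements runtime.CacheableObject and, if it does, routes serialization through CacheEncode, keyed by an identifier that combines the embedded encoder, the outer encoder, and the watch event type. The cached bytes are the whole metav1.WatchEvent envelope, so ADDED and MODIFIED events need distinct keys even when the embedded object bytes are identical. The sketch below is not part of this patch: it is a minimal, illustrative runtime.CacheableObject wrapper, loosely modeled on the cacher's cachingObject, showing how such identifiers could back a per-object serialization cache. The package name, the cachingObject name, and its fields are hypothetical.

package watchcache // hypothetical package, for illustration only

import (
	"bytes"
	"io"
	"sync"

	"k8s.io/apimachinery/pkg/runtime"
)

// cachingObject is an illustrative runtime.CacheableObject wrapper that
// memoizes serialized bytes keyed by the encoder identifier. With the
// watchEncoder above, that identifier already captures the embedded encoder,
// the outer encoder, and the event type, so each combination gets its own
// cache slot.
type cachingObject struct {
	runtime.Object

	lock  sync.Mutex
	cache map[runtime.Identifier][]byte
}

var _ runtime.CacheableObject = &cachingObject{}

// GetObject returns a deep copy of the wrapped object, as the
// CacheableObject contract requires, so callers may mutate the result.
func (c *cachingObject) GetObject() runtime.Object {
	return c.Object.DeepCopyObject()
}

// CacheEncode serves the serialization from the cache when possible;
// otherwise it runs encode once, remembers the bytes, and writes them out.
func (c *cachingObject) CacheEncode(id runtime.Identifier, encode func(runtime.Object, io.Writer) error, w io.Writer) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	if data, ok := c.cache[id]; ok {
		_, err := w.Write(data)
		return err
	}

	buf := &bytes.Buffer{}
	// The contract states that encode takes ownership of the object it is
	// given, so hand it a deep copy of the wrapped object.
	if err := encode(c.Object.DeepCopyObject(), buf); err != nil {
		return err
	}
	if c.cache == nil {
		c.cache = map[runtime.Identifier][]byte{}
	}
	c.cache[id] = buf.Bytes()

	_, err := w.Write(buf.Bytes())
	return err
}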
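
As a rough usage sketch, again outside the scope of this patch and assuming it sits in the same hypothetical package as the wrapper above (plus an extra "fmt" import), the helper below shows that a second CacheEncode call under the same identifier is served from the cache and never re-runs the encode function. Identifiers such as the JSON produced by typeIdentifier, roughly {"name":"watch","embeddedEncoder":"...","encoder":"...","eventType":"ADDED"}, are treated purely as opaque cache keys.

// verifyCacheHit is an illustrative helper, not part of the patch.
func verifyCacheHit() error {
	calls := 0
	encode := func(obj runtime.Object, w io.Writer) error {
		calls++
		_, err := w.Write([]byte(`{"type":"ADDED","object":{}}`))
		return err
	}

	// runtime.Unknown is a convenient stand-in for an already-serialized object.
	co := &cachingObject{Object: &runtime.Unknown{}}
	id := runtime.Identifier(`{"name":"watch","eventType":"ADDED"}`) // key shape is illustrative

	var out bytes.Buffer
	if err := co.CacheEncode(id, encode, &out); err != nil {
		return err
	}
	if err := co.CacheEncode(id, encode, &out); err != nil {
		return err
	}
	if calls != 1 {
		return fmt.Errorf("expected a single encode call, got %d", calls)
	}
	return nil
}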