From 0dd495e6dc253f94b0ad0bb92178fb5e8981116b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Fri, 13 Oct 2023 10:48:16 +0200 Subject: [PATCH] Address review comments --- .../pkg/endpoints/handlers/response.go | 43 ++++++++----------- .../pkg/endpoints/handlers/response_test.go | 10 +++++ .../apiserver/pkg/endpoints/handlers/watch.go | 4 +- 3 files changed, 30 insertions(+), 27 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go index fef733605f4..348b1092d7d 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go @@ -145,38 +145,39 @@ type watchEncoder struct { kind schema.GroupVersionKind embeddedEncoder runtime.Encoder encoder runtime.Encoder + framer io.Writer - buffer runtime.Splice - unknown runtime.Unknown - internalEvent *metav1.InternalEvent - outEvent *metav1.WatchEvent - eventBuffer runtime.Splice + buffer runtime.Splice + eventBuffer runtime.Splice currentEmbeddedIdentifier runtime.Identifier identifiers map[watch.EventType]runtime.Identifier } -func newWatchEncoder(ctx context.Context, kind schema.GroupVersionKind, embeddedEncoder runtime.Encoder, encoder runtime.Encoder) *watchEncoder { +func newWatchEncoder(ctx context.Context, kind schema.GroupVersionKind, embeddedEncoder runtime.Encoder, encoder runtime.Encoder, framer io.Writer) *watchEncoder { return &watchEncoder{ ctx: ctx, kind: kind, embeddedEncoder: embeddedEncoder, encoder: encoder, + framer: framer, buffer: runtime.NewSpliceBuffer(), - internalEvent: &metav1.InternalEvent{}, - outEvent: &metav1.WatchEvent{}, eventBuffer: runtime.NewSpliceBuffer(), } } -func (e *watchEncoder) Encode(event watch.Event, w io.Writer) error { +// Encode encodes a given watch event. +// NOTE: if events object is implementing the CacheableObject interface, +// +// the serialized version is cached in that object [not the event itself]. +func (e *watchEncoder) Encode(event watch.Event) error { encodeFunc := func(obj runtime.Object, w io.Writer) error { return e.doEncode(obj, event, w) } if co, ok := event.Object.(runtime.CacheableObject); ok { - return co.CacheEncode(e.identifier(event.Type), encodeFunc, w) + return co.CacheEncode(e.identifier(event.Type), encodeFunc, e.framer) } - return encodeFunc(event.Object, w) + return encodeFunc(event.Object, e.framer) } func (e *watchEncoder) doEncode(obj runtime.Object, event watch.Event, w io.Writer) error { @@ -187,23 +188,15 @@ func (e *watchEncoder) doEncode(obj runtime.Object, event watch.Event, w io.Writ } // ContentType is not required here because we are defaulting to the serializer type. - e.unknown.Raw = e.buffer.Bytes() - event.Object = &e.unknown - metrics.WatchEventsSizes.WithContext(e.ctx).WithLabelValues(e.kind.Group, e.kind.Version, e.kind.Kind).Observe(float64(len(e.unknown.Raw))) - - *e.outEvent = metav1.WatchEvent{} - - // create the external type directly and encode it. Clients will only recognize the serialization we provide. - // The internal event is being reused, not reallocated so its just a few extra assignments to do it this way - // and we get the benefit of using conversion functions which already have to stay in sync - *e.internalEvent = metav1.InternalEvent(event) - if err := metav1.Convert_v1_InternalEvent_To_v1_WatchEvent(e.internalEvent, e.outEvent, nil); err != nil { - return fmt.Errorf("unable to convert watch object: %v", err) + outEvent := &metav1.WatchEvent{ + Type: string(event.Type), + Object: runtime.RawExtension{Raw: e.buffer.Bytes()}, } + metrics.WatchEventsSizes.WithContext(e.ctx).WithLabelValues(e.kind.Group, e.kind.Version, e.kind.Kind).Observe(float64(len(outEvent.Object.Raw))) defer e.eventBuffer.Reset() - if err := e.encoder.Encode(e.outEvent, e.eventBuffer); err != nil { - return fmt.Errorf("unable to encode watch object %T: %v (%#v)", e.outEvent, err, e) + if err := e.encoder.Encode(outEvent, e.eventBuffer); err != nil { + return fmt.Errorf("unable to encode watch object %T: %v (%#v)", outEvent, err, e) } _, err := w.Write(e.eventBuffer.Bytes()) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response_test.go index 69f9d44f328..03410ccd106 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response_test.go @@ -212,3 +212,13 @@ func TestAsPartialObjectMetadataList(t *testing.T) { }) } } + +func TestWatchEncoderIdentifier(t *testing.T) { + eventFields := reflect.VisibleFields(reflect.TypeOf(metav1.WatchEvent{})) + if len(eventFields) != 2 { + t.Error("New field was added to metav1.WatchEvent.") + t.Error(" Ensure that the following places are updated accordingly:") + t.Error(" - watchEncoder::doEncode method when creating outEvent") + t.Error(" - watchEncoder::typeIdentifier to capture all relevant fields in identifier") + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go index 5f35c284f25..6e86b79be55 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go @@ -222,7 +222,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusOK) flusher.Flush() - watchEncoder := newWatchEncoder(req.Context(), kind, s.EmbeddedEncoder, s.Encoder) + watchEncoder := newWatchEncoder(req.Context(), kind, s.EmbeddedEncoder, s.Encoder, framer) ch := s.Watching.ResultChan() done := req.Context().Done() @@ -249,7 +249,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc() isWatchListLatencyRecordingRequired := shouldRecordWatchListLatency(event) - if err := watchEncoder.Encode(event, framer); err != nil { + if err := watchEncoder.Encode(event); err != nil { utilruntime.HandleError(err) // client disconnect. return