Address review comments

This commit is contained in:
Wojciech Tyczyński 2023-10-13 10:48:16 +02:00
parent 7ff866463a
commit 0dd495e6dc
3 changed files with 30 additions and 27 deletions

View File

@ -145,38 +145,39 @@ type watchEncoder struct {
kind schema.GroupVersionKind
embeddedEncoder runtime.Encoder
encoder runtime.Encoder
framer io.Writer
buffer runtime.Splice
unknown runtime.Unknown
internalEvent *metav1.InternalEvent
outEvent *metav1.WatchEvent
eventBuffer runtime.Splice
buffer runtime.Splice
eventBuffer runtime.Splice
currentEmbeddedIdentifier runtime.Identifier
identifiers map[watch.EventType]runtime.Identifier
}
func newWatchEncoder(ctx context.Context, kind schema.GroupVersionKind, embeddedEncoder runtime.Encoder, encoder runtime.Encoder) *watchEncoder {
func newWatchEncoder(ctx context.Context, kind schema.GroupVersionKind, embeddedEncoder runtime.Encoder, encoder runtime.Encoder, framer io.Writer) *watchEncoder {
return &watchEncoder{
ctx: ctx,
kind: kind,
embeddedEncoder: embeddedEncoder,
encoder: encoder,
framer: framer,
buffer: runtime.NewSpliceBuffer(),
internalEvent: &metav1.InternalEvent{},
outEvent: &metav1.WatchEvent{},
eventBuffer: runtime.NewSpliceBuffer(),
}
}
func (e *watchEncoder) Encode(event watch.Event, w io.Writer) error {
// Encode encodes a given watch event.
// NOTE: if events object is implementing the CacheableObject interface,
//
// the serialized version is cached in that object [not the event itself].
func (e *watchEncoder) Encode(event watch.Event) error {
encodeFunc := func(obj runtime.Object, w io.Writer) error {
return e.doEncode(obj, event, w)
}
if co, ok := event.Object.(runtime.CacheableObject); ok {
return co.CacheEncode(e.identifier(event.Type), encodeFunc, w)
return co.CacheEncode(e.identifier(event.Type), encodeFunc, e.framer)
}
return encodeFunc(event.Object, w)
return encodeFunc(event.Object, e.framer)
}
func (e *watchEncoder) doEncode(obj runtime.Object, event watch.Event, w io.Writer) error {
@ -187,23 +188,15 @@ func (e *watchEncoder) doEncode(obj runtime.Object, event watch.Event, w io.Writ
}
// ContentType is not required here because we are defaulting to the serializer type.
e.unknown.Raw = e.buffer.Bytes()
event.Object = &e.unknown
metrics.WatchEventsSizes.WithContext(e.ctx).WithLabelValues(e.kind.Group, e.kind.Version, e.kind.Kind).Observe(float64(len(e.unknown.Raw)))
*e.outEvent = metav1.WatchEvent{}
// create the external type directly and encode it. Clients will only recognize the serialization we provide.
// The internal event is being reused, not reallocated so its just a few extra assignments to do it this way
// and we get the benefit of using conversion functions which already have to stay in sync
*e.internalEvent = metav1.InternalEvent(event)
if err := metav1.Convert_v1_InternalEvent_To_v1_WatchEvent(e.internalEvent, e.outEvent, nil); err != nil {
return fmt.Errorf("unable to convert watch object: %v", err)
outEvent := &metav1.WatchEvent{
Type: string(event.Type),
Object: runtime.RawExtension{Raw: e.buffer.Bytes()},
}
metrics.WatchEventsSizes.WithContext(e.ctx).WithLabelValues(e.kind.Group, e.kind.Version, e.kind.Kind).Observe(float64(len(outEvent.Object.Raw)))
defer e.eventBuffer.Reset()
if err := e.encoder.Encode(e.outEvent, e.eventBuffer); err != nil {
return fmt.Errorf("unable to encode watch object %T: %v (%#v)", e.outEvent, err, e)
if err := e.encoder.Encode(outEvent, e.eventBuffer); err != nil {
return fmt.Errorf("unable to encode watch object %T: %v (%#v)", outEvent, err, e)
}
_, err := w.Write(e.eventBuffer.Bytes())

View File

@ -212,3 +212,13 @@ func TestAsPartialObjectMetadataList(t *testing.T) {
})
}
}
func TestWatchEncoderIdentifier(t *testing.T) {
eventFields := reflect.VisibleFields(reflect.TypeOf(metav1.WatchEvent{}))
if len(eventFields) != 2 {
t.Error("New field was added to metav1.WatchEvent.")
t.Error(" Ensure that the following places are updated accordingly:")
t.Error(" - watchEncoder::doEncode method when creating outEvent")
t.Error(" - watchEncoder::typeIdentifier to capture all relevant fields in identifier")
}
}

View File

@ -222,7 +222,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
flusher.Flush()
watchEncoder := newWatchEncoder(req.Context(), kind, s.EmbeddedEncoder, s.Encoder)
watchEncoder := newWatchEncoder(req.Context(), kind, s.EmbeddedEncoder, s.Encoder, framer)
ch := s.Watching.ResultChan()
done := req.Context().Done()
@ -249,7 +249,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
isWatchListLatencyRecordingRequired := shouldRecordWatchListLatency(event)
if err := watchEncoder.Encode(event, framer); err != nil {
if err := watchEncoder.Encode(event); err != nil {
utilruntime.HandleError(err)
// client disconnect.
return