Refactor WatchServer to prepare for using encoders

This commit is contained in:
Wojciech Tyczyński 2023-08-03 21:50:08 +02:00
parent 10c622e99a
commit ff56d3b691
3 changed files with 45 additions and 44 deletions

View File

@ -257,3 +257,26 @@ func (d WithoutVersionDecoder) Decode(data []byte, defaults *schema.GroupVersion
}
return obj, gvk, err
}
type encoderWithAllocator struct {
encoder EncoderWithAllocator
memAllocator MemoryAllocator
}
// NewEncoderWithAllocator returns a new encoder
func NewEncoderWithAllocator(e EncoderWithAllocator, a MemoryAllocator) Encoder {
return &encoderWithAllocator{
encoder: e,
memAllocator: a,
}
}
// Encode writes the provided object to the nested writer
func (e *encoderWithAllocator) Encode(obj Object, w io.Writer) error {
return e.encoder.EncodeWithAllocator(obj, w, e.memAllocator)
}
// Identifier returns identifier of this encoder.
func (e *encoderWithAllocator) Identifier() Identifier {
return e.encoder.Identifier()
}

View File

@ -134,23 +134,3 @@ func (e *encoder) Encode(obj runtime.Object) error {
e.buf.Reset()
return err
}
type encoderWithAllocator struct {
writer io.Writer
encoder runtime.EncoderWithAllocator
memAllocator runtime.MemoryAllocator
}
// NewEncoderWithAllocator returns a new streaming encoder
func NewEncoderWithAllocator(w io.Writer, e runtime.EncoderWithAllocator, a runtime.MemoryAllocator) Encoder {
return &encoderWithAllocator{
writer: w,
encoder: e,
memAllocator: a,
}
}
// Encode writes the provided object to the nested writer
func (e *encoderWithAllocator) Encode(obj runtime.Object) error {
return e.encoder.EncodeWithAllocator(obj, e.writer, e.memAllocator)
}

View File

@ -19,7 +19,6 @@ package handlers
import (
"bytes"
"fmt"
"io"
"net/http"
"reflect"
"time"
@ -106,6 +105,26 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
embeddedEncoder = scope.Serializer.EncoderForVersion(serializer.Serializer, contentKind.GroupVersion())
}
var memoryAllocator runtime.MemoryAllocator
if encoderWithAllocator, supportsAllocator := embeddedEncoder.(runtime.EncoderWithAllocator); supportsAllocator {
// don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call.
// instead, we allocate the buffer for the entire watch session and release it when we close the connection.
memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
defer runtime.AllocatorPool.Put(memoryAllocator)
embeddedEncoder = runtime.NewEncoderWithAllocator(encoderWithAllocator, memoryAllocator)
}
if encoderWithAllocator, supportsAllocator := encoder.(runtime.EncoderWithAllocator); supportsAllocator {
if memoryAllocator == nil {
// don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call.
// instead, we allocate the buffer for the entire watch session and release it when we close the connection.
memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
defer runtime.AllocatorPool.Put(memoryAllocator)
}
encoder = runtime.NewEncoderWithAllocator(encoderWithAllocator, memoryAllocator)
}
var serverShuttingDownCh <-chan struct{}
if signals := apirequest.ServerShutdownSignalFrom(req.Context()); signals != nil {
serverShuttingDownCh = signals.ShuttingDown()
@ -196,15 +215,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
var e streaming.Encoder
var memoryAllocator runtime.MemoryAllocator
if encoder, supportsAllocator := s.Encoder.(runtime.EncoderWithAllocator); supportsAllocator {
memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
defer runtime.AllocatorPool.Put(memoryAllocator)
e = streaming.NewEncoderWithAllocator(framer, encoder, memoryAllocator)
} else {
e = streaming.NewEncoder(framer, s.Encoder)
}
e = streaming.NewEncoder(framer, s.Encoder)
// ensure the connection times out
timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh()
@ -223,19 +234,6 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
ch := s.Watching.ResultChan()
done := req.Context().Done()
embeddedEncodeFn := s.EmbeddedEncoder.Encode
if encoder, supportsAllocator := s.EmbeddedEncoder.(runtime.EncoderWithAllocator); supportsAllocator {
if memoryAllocator == nil {
// don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call.
// instead, we allocate the buffer for the entire watch session and release it when we close the connection.
memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
defer runtime.AllocatorPool.Put(memoryAllocator)
}
embeddedEncodeFn = func(obj runtime.Object, w io.Writer) error {
return encoder.EncodeWithAllocator(obj, w, memoryAllocator)
}
}
for {
select {
case <-s.ServerShuttingDownCh:
@ -259,7 +257,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
obj := s.Fixup(event.Object)
if err := embeddedEncodeFn(obj, buf); err != nil {
if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
// unexpected error
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err))
return