diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go index 818d94ae4c8..c155b925f78 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go @@ -19,10 +19,13 @@ package handlers import ( "bytes" "fmt" + "io" "net/http" "reflect" "time" + "golang.org/x/net/websocket" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -32,8 +35,6 @@ import ( "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/metrics" "k8s.io/apiserver/pkg/util/wsstream" - - "golang.org/x/net/websocket" ) // nothing will ever be sent down this channel @@ -187,7 +188,17 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { s.Scope.err(errors.NewBadRequest(err.Error()), w, req) return } - e := streaming.NewEncoder(framer, s.Encoder) + + var e streaming.Encoder + var memoryAllocator runtime.MemoryAllocator + + if encoder, supportsAllocator := s.Encoder.(runtime.EncoderWithAllocator); supportsAllocator { + memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator) + defer runtime.AllocatorPool.Put(memoryAllocator) + e = streaming.NewEncoderWithAllocator(framer, encoder, memoryAllocator) + } else { + e = streaming.NewEncoder(framer, s.Encoder) + } // ensure the connection times out timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh() @@ -206,6 +217,19 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { ch := s.Watching.ResultChan() done := req.Context().Done() + embeddedEncodeFn := s.EmbeddedEncoder.Encode + if encoder, supportsAllocator := s.EmbeddedEncoder.(runtime.EncoderWithAllocator); supportsAllocator { + if memoryAllocator == nil { + // don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call. + // instead, we allocate the buffer for the entire watch session and release it when we close the connection. + memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator) + defer runtime.AllocatorPool.Put(memoryAllocator) + } + embeddedEncodeFn = func(obj runtime.Object, w io.Writer) error { + return encoder.EncodeWithAllocator(obj, w, memoryAllocator) + } + } + for { select { case <-done: @@ -220,7 +244,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc() obj := s.Fixup(event.Object) - if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil { + if err := embeddedEncodeFn(obj, buf); err != nil { // unexpected error utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err)) return