Update the watch server to use EncoderWithAllocator during object serialization.

It allows us to allocate a single buffer for the entire watch session and release it when a watch connection is closed.
Previously, memory was allocated for every object serialization, putting a lot of pressure on the GC and consuming more memory than needed.
This commit is contained in:
Lukasz Szaszkiewicz 2022-02-17 16:04:50 +01:00
parent a06e272124
commit eda1b0c68e

View File

@@ -19,10 +19,13 @@ package handlers
 import (
 	"bytes"
 	"fmt"
+	"io"
 	"net/http"
 	"reflect"
 	"time"
 
+	"golang.org/x/net/websocket"
+
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -32,8 +35,6 @@ import (
 	"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
 	"k8s.io/apiserver/pkg/endpoints/metrics"
 	"k8s.io/apiserver/pkg/util/wsstream"
-
-	"golang.org/x/net/websocket"
 )
 
 // nothing will ever be sent down this channel
@@ -187,7 +188,17 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 		s.Scope.err(errors.NewBadRequest(err.Error()), w, req)
 		return
 	}
-	e := streaming.NewEncoder(framer, s.Encoder)
+
+	var e streaming.Encoder
+	var memoryAllocator runtime.MemoryAllocator
+
+	if encoder, supportsAllocator := s.Encoder.(runtime.EncoderWithAllocator); supportsAllocator {
+		memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
+		defer runtime.AllocatorPool.Put(memoryAllocator)
+		e = streaming.NewEncoderWithAllocator(framer, encoder, memoryAllocator)
+	} else {
+		e = streaming.NewEncoder(framer, s.Encoder)
+	}
 
 	// ensure the connection times out
 	timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh()
@@ -206,6 +217,19 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 	ch := s.Watching.ResultChan()
 	done := req.Context().Done()
 
+	embeddedEncodeFn := s.EmbeddedEncoder.Encode
+	if encoder, supportsAllocator := s.EmbeddedEncoder.(runtime.EncoderWithAllocator); supportsAllocator {
+		if memoryAllocator == nil {
+			// don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call.
+			// instead, we allocate the buffer for the entire watch session and release it when we close the connection.
+			memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
+			defer runtime.AllocatorPool.Put(memoryAllocator)
+		}
+		embeddedEncodeFn = func(obj runtime.Object, w io.Writer) error {
+			return encoder.EncodeWithAllocator(obj, w, memoryAllocator)
+		}
+	}
+
 	for {
 		select {
 		case <-done:
@@ -220,7 +244,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 			metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
 
 			obj := s.Fixup(event.Object)
-			if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
+			if err := embeddedEncodeFn(obj, buf); err != nil {
 				// unexpected error
 				utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err))
 				return