handlers/watch: refactor watch serving to prepare offloading

Signed-off-by: Eric Lin <exlin@google.com>
This commit is contained in:
Eric Lin 2023-11-27 10:06:50 +00:00
parent 33beb81d8f
commit 87d817e62d
2 changed files with 35 additions and 20 deletions

View File

@ -265,9 +265,15 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
scope.err(err, w, req) scope.err(err, w, req)
return return
} }
handler, err := serveWatchHandler(watcher, scope, outputMediaType, req, w, timeout, metrics.CleanListScope(ctx, &opts))
if err != nil {
scope.err(err, w, req)
return
}
requestInfo, _ := request.RequestInfoFrom(ctx) requestInfo, _ := request.RequestInfoFrom(ctx)
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() { metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
serveWatch(watcher, scope, outputMediaType, req, w, timeout, metrics.CleanListScope(ctx, &opts)) defer watcher.Stop()
handler.ServeHTTP(w, req)
}) })
return return
} }

View File

@ -62,30 +62,25 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
return t.C, t.Stop return t.C, t.Stop
} }
// serveWatch will serve a watch response. // serveWatchHandler returns a handle to serve a watch response.
// TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled. // TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration, metricsScope string) { func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration, metricsScope string) (http.Handler, error) {
defer watcher.Stop()
options, err := optionsForTransform(mediaTypeOptions, req) options, err := optionsForTransform(mediaTypeOptions, req)
if err != nil { if err != nil {
scope.err(err, w, req) return nil, err
return
} }
// negotiate for the stream serializer from the scope's serializer // negotiate for the stream serializer from the scope's serializer
serializer, err := negotiation.NegotiateOutputMediaTypeStream(req, scope.Serializer, scope) serializer, err := negotiation.NegotiateOutputMediaTypeStream(req, scope.Serializer, scope)
if err != nil { if err != nil {
scope.err(err, w, req) return nil, err
return
} }
framer := serializer.StreamSerializer.Framer framer := serializer.StreamSerializer.Framer
streamSerializer := serializer.StreamSerializer.Serializer streamSerializer := serializer.StreamSerializer.Serializer
encoder := scope.Serializer.EncoderForVersion(streamSerializer, scope.Kind.GroupVersion()) encoder := scope.Serializer.EncoderForVersion(streamSerializer, scope.Kind.GroupVersion())
useTextFraming := serializer.EncodesAsText useTextFraming := serializer.EncodesAsText
if framer == nil { if framer == nil {
scope.err(fmt.Errorf("no framer defined for %q available for embedded encoding", serializer.MediaType), w, req) return nil, fmt.Errorf("no framer defined for %q available for embedded encoding", serializer.MediaType)
return
} }
// TODO: next step, get back mediaTypeOptions from negotiate and return the exact value here // TODO: next step, get back mediaTypeOptions from negotiate and return the exact value here
mediaType := serializer.MediaType mediaType := serializer.MediaType
@ -101,8 +96,7 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
if transform { if transform {
info, ok := runtime.SerializerInfoForMediaType(contentSerializer.SupportedMediaTypes(), serializer.MediaType) info, ok := runtime.SerializerInfoForMediaType(contentSerializer.SupportedMediaTypes(), serializer.MediaType)
if !ok { if !ok {
scope.err(fmt.Errorf("no encoder for %q exists in the requested target %#v", serializer.MediaType, contentSerializer), w, req) return nil, fmt.Errorf("no encoder for %q exists in the requested target %#v", serializer.MediaType, contentSerializer)
return
} }
embeddedEncoder = contentSerializer.EncoderForVersion(info.Serializer, contentKind.GroupVersion()) embeddedEncoder = contentSerializer.EncoderForVersion(info.Serializer, contentKind.GroupVersion())
} else { } else {
@ -115,7 +109,6 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
// don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call. // don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call.
// instead, we allocate the buffer for the entire watch session and release it when we close the connection. // instead, we allocate the buffer for the entire watch session and release it when we close the connection.
memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator) memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
defer runtime.AllocatorPool.Put(memoryAllocator)
embeddedEncoder = runtime.NewEncoderWithAllocator(encoderWithAllocator, memoryAllocator) embeddedEncoder = runtime.NewEncoderWithAllocator(encoderWithAllocator, memoryAllocator)
} }
var tableOptions *metav1.TableOptions var tableOptions *metav1.TableOptions
@ -123,8 +116,7 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
if passedOptions, ok := options.(*metav1.TableOptions); ok { if passedOptions, ok := options.(*metav1.TableOptions); ok {
tableOptions = passedOptions tableOptions = passedOptions
} else { } else {
scope.err(fmt.Errorf("unexpected options type: %T", options), w, req) return nil, fmt.Errorf("unexpected options type: %T", options)
return
} }
} }
embeddedEncoder = newWatchEmbeddedEncoder(ctx, embeddedEncoder, mediaTypeOptions.Convert, tableOptions, scope) embeddedEncoder = newWatchEmbeddedEncoder(ctx, embeddedEncoder, mediaTypeOptions.Convert, tableOptions, scope)
@ -134,7 +126,6 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
// don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call. // don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call.
// instead, we allocate the buffer for the entire watch session and release it when we close the connection. // instead, we allocate the buffer for the entire watch session and release it when we close the connection.
memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator) memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
defer runtime.AllocatorPool.Put(memoryAllocator)
} }
encoder = runtime.NewEncoderWithAllocator(encoderWithAllocator, memoryAllocator) encoder = runtime.NewEncoderWithAllocator(encoderWithAllocator, memoryAllocator)
} }
@ -154,6 +145,7 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
Encoder: encoder, Encoder: encoder,
EmbeddedEncoder: embeddedEncoder, EmbeddedEncoder: embeddedEncoder,
MemoryAllocator: memoryAllocator,
TimeoutFactory: &realTimeoutFactory{timeout}, TimeoutFactory: &realTimeoutFactory{timeout},
ServerShuttingDownCh: serverShuttingDownCh, ServerShuttingDownCh: serverShuttingDownCh,
@ -162,10 +154,9 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
if wsstream.IsWebSocketRequest(req) { if wsstream.IsWebSocketRequest(req) {
w.Header().Set("Content-Type", server.MediaType) w.Header().Set("Content-Type", server.MediaType)
websocket.Handler(server.HandleWS).ServeHTTP(w, req) return websocket.Handler(server.HandleWS), nil
return
} }
server.HandleHTTP(w, req) return http.HandlerFunc(server.HandleHTTP), nil
} }
// WatchServer serves a watch.Interface over a websocket or vanilla HTTP. // WatchServer serves a watch.Interface over a websocket or vanilla HTTP.
@ -184,6 +175,7 @@ type WatchServer struct {
// used to encode the nested object in the watch stream // used to encode the nested object in the watch stream
EmbeddedEncoder runtime.Encoder EmbeddedEncoder runtime.Encoder
MemoryAllocator runtime.MemoryAllocator
TimeoutFactory TimeoutFactory TimeoutFactory TimeoutFactory
ServerShuttingDownCh <-chan struct{} ServerShuttingDownCh <-chan struct{}
@ -193,6 +185,12 @@ type WatchServer struct {
// HandleHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked. // HandleHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked.
// or over a websocket connection. // or over a websocket connection.
func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) { func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) {
defer func() {
if s.MemoryAllocator != nil {
runtime.AllocatorPool.Put(s.MemoryAllocator)
}
}()
flusher, ok := w.(http.Flusher) flusher, ok := w.(http.Flusher)
if !ok { if !ok {
err := fmt.Errorf("unable to start watch - can't get http.Flusher: %#v", w) err := fmt.Errorf("unable to start watch - can't get http.Flusher: %#v", w)
@ -266,8 +264,17 @@ func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) {
// HandleWS serves a series of encoded events over a websocket connection. // HandleWS serves a series of encoded events over a websocket connection.
func (s *WatchServer) HandleWS(ws *websocket.Conn) { func (s *WatchServer) HandleWS(ws *websocket.Conn) {
defer func() {
if s.MemoryAllocator != nil {
runtime.AllocatorPool.Put(s.MemoryAllocator)
}
}()
defer ws.Close() defer ws.Close()
done := make(chan struct{}) done := make(chan struct{})
// ensure the connection times out
timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh()
defer cleanup()
go func() { go func() {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
@ -288,6 +295,8 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) {
select { select {
case <-done: case <-done:
return return
case <-timeoutCh:
return
case event, ok := <-ch: case event, ok := <-ch:
if !ok { if !ok {
// End of results. // End of results.