diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go index d3b501cf52a..a8f2eee72b5 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go @@ -265,9 +265,15 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc scope.err(err, w, req) return } + handler, err := serveWatchHandler(watcher, scope, outputMediaType, req, w, timeout, metrics.CleanListScope(ctx, &opts)) + if err != nil { + scope.err(err, w, req) + return + } requestInfo, _ := request.RequestInfoFrom(ctx) metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() { - serveWatch(watcher, scope, outputMediaType, req, w, timeout, metrics.CleanListScope(ctx, &opts)) + defer watcher.Stop() + handler.ServeHTTP(w, req) }) return } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go index 6043fec3b71..6a9257d10e0 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go @@ -62,30 +62,25 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) { return t.C, t.Stop } -// serveWatch will serve a watch response. +// serveWatchHandler returns a handle to serve a watch response. // TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled. -func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration, metricsScope string) { - defer watcher.Stop() - +func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration, metricsScope string) (http.Handler, error) { options, err := optionsForTransform(mediaTypeOptions, req) if err != nil { - scope.err(err, w, req) - return + return nil, err } // negotiate for the stream serializer from the scope's serializer serializer, err := negotiation.NegotiateOutputMediaTypeStream(req, scope.Serializer, scope) if err != nil { - scope.err(err, w, req) - return + return nil, err } framer := serializer.StreamSerializer.Framer streamSerializer := serializer.StreamSerializer.Serializer encoder := scope.Serializer.EncoderForVersion(streamSerializer, scope.Kind.GroupVersion()) useTextFraming := serializer.EncodesAsText if framer == nil { - scope.err(fmt.Errorf("no framer defined for %q available for embedded encoding", serializer.MediaType), w, req) - return + return nil, fmt.Errorf("no framer defined for %q available for embedded encoding", serializer.MediaType) } // TODO: next step, get back mediaTypeOptions from negotiate and return the exact value here mediaType := serializer.MediaType @@ -101,8 +96,7 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n if transform { info, ok := runtime.SerializerInfoForMediaType(contentSerializer.SupportedMediaTypes(), serializer.MediaType) if !ok { - scope.err(fmt.Errorf("no encoder for %q exists in the requested target %#v", serializer.MediaType, contentSerializer), w, req) - return + return nil, fmt.Errorf("no encoder for %q exists in the requested target %#v", serializer.MediaType, contentSerializer) } embeddedEncoder = contentSerializer.EncoderForVersion(info.Serializer, contentKind.GroupVersion()) } else { @@ -115,7 +109,6 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n // don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call. // instead, we allocate the buffer for the entire watch session and release it when we close the connection. memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator) - defer runtime.AllocatorPool.Put(memoryAllocator) embeddedEncoder = runtime.NewEncoderWithAllocator(encoderWithAllocator, memoryAllocator) } var tableOptions *metav1.TableOptions @@ -123,8 +116,7 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n if passedOptions, ok := options.(*metav1.TableOptions); ok { tableOptions = passedOptions } else { - scope.err(fmt.Errorf("unexpected options type: %T", options), w, req) - return + return nil, fmt.Errorf("unexpected options type: %T", options) } } embeddedEncoder = newWatchEmbeddedEncoder(ctx, embeddedEncoder, mediaTypeOptions.Convert, tableOptions, scope) @@ -134,7 +126,6 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n // don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call. // instead, we allocate the buffer for the entire watch session and release it when we close the connection. memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator) - defer runtime.AllocatorPool.Put(memoryAllocator) } encoder = runtime.NewEncoderWithAllocator(encoderWithAllocator, memoryAllocator) } @@ -154,6 +145,7 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n Encoder: encoder, EmbeddedEncoder: embeddedEncoder, + MemoryAllocator: memoryAllocator, TimeoutFactory: &realTimeoutFactory{timeout}, ServerShuttingDownCh: serverShuttingDownCh, @@ -162,10 +154,9 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n if wsstream.IsWebSocketRequest(req) { w.Header().Set("Content-Type", server.MediaType) - websocket.Handler(server.HandleWS).ServeHTTP(w, req) - return + return websocket.Handler(server.HandleWS), nil } - server.HandleHTTP(w, req) + return http.HandlerFunc(server.HandleHTTP), nil } // WatchServer serves a watch.Interface over a websocket or vanilla HTTP. @@ -184,6 +175,7 @@ type WatchServer struct { // used to encode the nested object in the watch stream EmbeddedEncoder runtime.Encoder + MemoryAllocator runtime.MemoryAllocator TimeoutFactory TimeoutFactory ServerShuttingDownCh <-chan struct{} @@ -193,6 +185,12 @@ type WatchServer struct { // HandleHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked. // or over a websocket connection. func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) { + defer func() { + if s.MemoryAllocator != nil { + runtime.AllocatorPool.Put(s.MemoryAllocator) + } + }() + flusher, ok := w.(http.Flusher) if !ok { err := fmt.Errorf("unable to start watch - can't get http.Flusher: %#v", w) @@ -266,8 +264,17 @@ func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) { // HandleWS serves a series of encoded events over a websocket connection. func (s *WatchServer) HandleWS(ws *websocket.Conn) { + defer func() { + if s.MemoryAllocator != nil { + runtime.AllocatorPool.Put(s.MemoryAllocator) + } + }() + defer ws.Close() done := make(chan struct{}) + // ensure the connection times out + timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh() + defer cleanup() go func() { defer utilruntime.HandleCrash() @@ -288,6 +295,8 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) { select { case <-done: return + case <-timeoutCh: + return case event, ok := <-ch: if !ok { // End of results.