Merge pull request #122115 from wojtek-t/cleanup_watch_handler

Minor cleanups in watch handler toward unification between http and websockets
This commit is contained in:
Kubernetes Prow Robot 2023-12-14 06:17:49 +01:00 committed by GitHub
commit 3c8241f4f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 49 additions and 57 deletions

View File

@ -17,8 +17,9 @@ limitations under the License.
package handlers
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"time"
@ -159,7 +160,12 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
metricsScope: metricsScope,
}
server.ServeHTTP(w, req)
if wsstream.IsWebSocketRequest(req) {
w.Header().Set("Content-Type", server.MediaType)
websocket.Handler(server.HandleWS).ServeHTTP(w, req)
return
}
server.HandleHTTP(w, req)
}
// WatchServer serves a watch.Interface over a websocket or vanilla HTTP.
@ -184,17 +190,9 @@ type WatchServer struct {
metricsScope string
}
// ServeHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked
// HandleHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked.
// or over a websocket connection.
func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
kind := s.Scope.Kind
if wsstream.IsWebSocketRequest(req) {
w.Header().Set("Content-Type", s.MediaType)
websocket.Handler(s.HandleWS).ServeHTTP(w, req)
return
}
func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
err := fmt.Errorf("unable to start watch - can't get http.Flusher: %#v", w)
@ -222,6 +220,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
flusher.Flush()
kind := s.Scope.Kind
watchEncoder := newWatchEncoder(req.Context(), kind, s.EmbeddedEncoder, s.Encoder, framer)
ch := s.Watching.ResultChan()
done := req.Context().Done()
@ -265,7 +264,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
}
// HandleWS implements a websocket handler.
// HandleWS serves a series of encoded events over a websocket connection.
func (s *WatchServer) HandleWS(ws *websocket.Conn) {
defer ws.Close()
done := make(chan struct{})
@ -279,10 +278,10 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) {
close(done)
}()
var unknown runtime.Unknown
internalEvent := &metav1.InternalEvent{}
buf := &bytes.Buffer{}
streamBuf := &bytes.Buffer{}
framer := newWebsocketFramer(ws, s.UseTextFraming)
kind := s.Scope.Kind
watchEncoder := newWatchEncoder(context.TODO(), kind, s.EmbeddedEncoder, s.Encoder, framer)
ch := s.Watching.ResultChan()
for {
@ -295,51 +294,44 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) {
return
}
if err := s.EmbeddedEncoder.Encode(event.Object, buf); err != nil {
// unexpected error
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", event.Object, err))
return
}
// ContentType is not required here because we are defaulting to the serializer
// type
unknown.Raw = buf.Bytes()
event.Object = &unknown
// the internal event will be versioned by the encoder
// create the external type directly and encode it. Clients will only recognize the serialization we provide.
// The internal event is being reused, not reallocated so its just a few extra assignments to do it this way
// and we get the benefit of using conversion functions which already have to stay in sync
outEvent := &metav1.WatchEvent{}
*internalEvent = metav1.InternalEvent(event)
err := metav1.Convert_v1_InternalEvent_To_v1_WatchEvent(internalEvent, outEvent, nil)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to convert watch object: %v", err))
if err := watchEncoder.Encode(event); err != nil {
utilruntime.HandleError(err)
// client disconnect.
return
}
if err := s.Encoder.Encode(outEvent, streamBuf); err != nil {
// encoding error
utilruntime.HandleError(fmt.Errorf("unable to encode event: %v", err))
return
}
if s.UseTextFraming {
if err := websocket.Message.Send(ws, streamBuf.String()); err != nil {
// Client disconnect.
return
}
} else {
if err := websocket.Message.Send(ws, streamBuf.Bytes()); err != nil {
// Client disconnect.
return
}
}
buf.Reset()
streamBuf.Reset()
}
}
}
type websocketFramer struct {
ws *websocket.Conn
useTextFraming bool
}
func newWebsocketFramer(ws *websocket.Conn, useTextFraming bool) io.Writer {
return &websocketFramer{
ws: ws,
useTextFraming: useTextFraming,
}
}
func (w *websocketFramer) Write(p []byte) (int, error) {
if w.useTextFraming {
// bytes.Buffer::String() has a special handling of nil value, but given
// we're writing serialized watch events, this will never happen here.
if err := websocket.Message.Send(w.ws, string(p)); err != nil {
return 0, err
}
return len(p), nil
}
if err := websocket.Message.Send(w.ws, p); err != nil {
return 0, err
}
return len(p), nil
}
var _ io.Writer = &websocketFramer{}
func shouldRecordWatchListLatency(event watch.Event) bool {
if event.Type != watch.Bookmark || !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
return false

View File

@ -612,7 +612,7 @@ func (t *fakeTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
}
// serveWatch will serve a watch response according to the watcher and watchServer.
// Before watchServer.ServeHTTP, an error may occur like k8s.io/apiserver/pkg/endpoints/handlers/watch.go#serveWatch does.
// Before watchServer.HandleHTTP, an error may occur like k8s.io/apiserver/pkg/endpoints/handlers/watch.go#serveWatch does.
func serveWatch(watcher watch.Interface, watchServer *handlers.WatchServer, preServeErr error) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
defer watcher.Stop()
@ -622,7 +622,7 @@ func serveWatch(watcher watch.Interface, watchServer *handlers.WatchServer, preS
return
}
watchServer.ServeHTTP(w, req)
watchServer.HandleHTTP(w, req)
}
}