diff --git a/tools/remotecommand/errorstream.go b/tools/remotecommand/errorstream.go index 90bb39b4a..23dd50778 100644 --- a/tools/remotecommand/errorstream.go +++ b/tools/remotecommand/errorstream.go @@ -21,6 +21,7 @@ import ( "io" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/klog/v2" ) // errorStreamDecoder interprets the data on the error channel and creates a go error object from it. @@ -32,11 +33,11 @@ type errorStreamDecoder interface { // decodes it with the given errorStreamDecoder, sends the decoded error (or nil if the remote // command exited successfully) to the returned error channel, and closes it. // This function returns immediately. -func watchErrorStream(errorStream io.Reader, d errorStreamDecoder) chan error { +func watchErrorStream(logger klog.Logger, errorStream io.Reader, d errorStreamDecoder) chan error { errorChan := make(chan error) go func() { - defer runtime.HandleCrash() + defer runtime.HandleCrashWithLogger(logger) message, err := io.ReadAll(errorStream) switch { diff --git a/tools/remotecommand/fallback.go b/tools/remotecommand/fallback.go index 503323080..bcd5fd313 100644 --- a/tools/remotecommand/fallback.go +++ b/tools/remotecommand/fallback.go @@ -53,7 +53,7 @@ func (f *FallbackExecutor) Stream(options StreamOptions) error { func (f *FallbackExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error { err := f.primary.StreamWithContext(ctx, options) if err != nil && f.shouldFallback(err) { - klog.V(4).Infof("RemoteCommand fallback: %v", err) + klog.FromContext(ctx).V(4).Info("RemoteCommand fallback", "err", err) return f.secondary.StreamWithContext(ctx, options) } return err diff --git a/tools/remotecommand/remotecommand.go b/tools/remotecommand/remotecommand.go index 4cff05cd8..8663b88a8 100644 --- a/tools/remotecommand/remotecommand.go +++ b/tools/remotecommand/remotecommand.go @@ -22,6 +22,7 @@ import ( "net/http" "k8s.io/apimachinery/pkg/util/httpstream" + "k8s.io/klog/v2" ) // StreamOptions holds information pertaining to the current streaming session: @@ -54,5 +55,5 @@ type streamCreator interface { } type streamProtocolHandler interface { - stream(conn streamCreator, ready chan<- struct{}) error + stream(logger klog.Logger, conn streamCreator, ready chan<- struct{}) error } diff --git a/tools/remotecommand/spdy.go b/tools/remotecommand/spdy.go index 34825771a..2f36e925d 100644 --- a/tools/remotecommand/spdy.go +++ b/tools/remotecommand/spdy.go @@ -121,6 +121,7 @@ func (e *spdyStreamExecutor) newConnectionAndStream(ctx context.Context, options var streamer streamProtocolHandler + logger := klog.FromContext(ctx) switch protocol { case remotecommand.StreamProtocolV5Name: streamer = newStreamProtocolV5(options) @@ -131,7 +132,7 @@ func (e *spdyStreamExecutor) newConnectionAndStream(ctx context.Context, options case remotecommand.StreamProtocolV2Name: streamer = newStreamProtocolV2(options) case "": - klog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name) + logger.V(4).Info("The server did not negotiate a streaming protocol version, falling back", "protocol", remotecommand.StreamProtocolV1Name) fallthrough case remotecommand.StreamProtocolV1Name: streamer = newStreamProtocolV1(options) @@ -161,7 +162,7 @@ func (e *spdyStreamExecutor) StreamWithContext(ctx context.Context, options Stre // The SPDY executor does not need to synchronize stream creation, so we pass a nil // ready channel. The underlying spdystream library handles stream multiplexing // without a race condition. - errorChan <- streamer.stream(conn, nil) + errorChan <- streamer.stream(klog.FromContext(ctx), conn, nil) }() select { diff --git a/tools/remotecommand/spdy_test.go b/tools/remotecommand/spdy_test.go index 9948832a5..35cd09a9f 100644 --- a/tools/remotecommand/spdy_test.go +++ b/tools/remotecommand/spdy_test.go @@ -38,6 +38,7 @@ import ( remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/rest" + "k8s.io/klog/v2/ktesting" ) type AttachFunc func(in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan TerminalSize) error @@ -328,6 +329,7 @@ func (w *writeDetector) Write(p []byte) (n int, err error) { // and expects the deferred close of the connection leads to the exit of the goroutine on cancellation. // This test verifies that works. func TestStreamExitsAfterConnectionIsClosed(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) writeDetector := newWriterDetector(&fakeEmptyDataPty{}) options := StreamOptions{ Stdin: &fakeEmptyDataPty{}, @@ -352,7 +354,7 @@ func TestStreamExitsAfterConnectionIsClosed(t *testing.T) { errorChan := make(chan error) go func() { - errorChan <- streamer.stream(conn, nil) + errorChan <- streamer.stream(logger, conn, nil) }() // Wait until stream goroutine starts. diff --git a/tools/remotecommand/v1.go b/tools/remotecommand/v1.go index 293d809dd..25337e68a 100644 --- a/tools/remotecommand/v1.go +++ b/tools/remotecommand/v1.go @@ -21,7 +21,7 @@ import ( "io" "net/http" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/klog/v2" ) @@ -47,15 +47,15 @@ func newStreamProtocolV1(options StreamOptions) streamProtocolHandler { } } -func (p *streamProtocolV1) stream(conn streamCreator, ready chan<- struct{}) error { +func (p *streamProtocolV1) stream(logger klog.Logger, conn streamCreator, ready chan<- struct{}) error { doneChan := make(chan struct{}, 2) errorChan := make(chan error) cp := func(s string, dst io.Writer, src io.Reader) { - klog.V(6).Infof("Copying %s", s) - defer klog.V(6).Infof("Done copying %s", s) + logger.V(6).Info("Copying", "data", s) + defer logger.V(6).Info("Done copying", "data", s) if _, err := io.Copy(dst, src); err != nil && err != io.EOF { - klog.Errorf("Error copying %s: %v", s, err) + logger.Error(err, "Error copying", "data", s) } if s == v1.StreamTypeStdout || s == v1.StreamTypeStderr { doneChan <- struct{}{} diff --git a/tools/remotecommand/v2.go b/tools/remotecommand/v2.go index a81538a09..75286a12f 100644 --- a/tools/remotecommand/v2.go +++ b/tools/remotecommand/v2.go @@ -22,8 +22,9 @@ import ( "net/http" "sync" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/klog/v2" ) // streamProtocolV2 implements version 2 of the streaming protocol for attach @@ -87,13 +88,13 @@ func (p *streamProtocolV2) createStreams(conn streamCreator) error { return nil } -func (p *streamProtocolV2) copyStdin() { +func (p *streamProtocolV2) copyStdin(logger klog.Logger) { if p.Stdin != nil { var once sync.Once // copy from client's stdin to container's stdin go func() { - defer runtime.HandleCrash() + defer runtime.HandleCrashWithLogger(logger) // if p.stdin is noninteractive, p.g. `echo abc | kubectl exec -i -- cat`, make sure // we close remoteStdin as soon as the copy from p.stdin to remoteStdin finishes. Otherwise @@ -101,7 +102,7 @@ func (p *streamProtocolV2) copyStdin() { defer once.Do(func() { p.remoteStdin.Close() }) if _, err := io.Copy(p.remoteStdin, readerWrapper{p.Stdin}); err != nil { - runtime.HandleError(err) + runtime.HandleErrorWithLogger(logger, err, "Copying stdin failed") } }() @@ -120,26 +121,26 @@ func (p *streamProtocolV2) copyStdin() { // When that happens, we must Close() on our side of remoteStdin, to // allow the copy in hijack to complete, and hijack to return. go func() { - defer runtime.HandleCrash() + defer runtime.HandleCrashWithLogger(logger) defer once.Do(func() { p.remoteStdin.Close() }) // this "copy" doesn't actually read anything - it's just here to wait for // the server to close remoteStdin. if _, err := io.Copy(io.Discard, p.remoteStdin); err != nil { - runtime.HandleError(err) + runtime.HandleErrorWithLogger(logger, err, "Waiting for server to close stdin failed") } }() } } -func (p *streamProtocolV2) copyStdout(wg *sync.WaitGroup) { +func (p *streamProtocolV2) copyStdout(logger klog.Logger, wg *sync.WaitGroup) { if p.Stdout == nil { return } wg.Add(1) go func() { - defer runtime.HandleCrash() + defer runtime.HandleCrashWithLogger(logger) defer wg.Done() // make sure, packet in queue can be consumed. // block in queue may lead to deadlock in conn.server @@ -147,29 +148,29 @@ func (p *streamProtocolV2) copyStdout(wg *sync.WaitGroup) { defer io.Copy(io.Discard, p.remoteStdout) if _, err := io.Copy(p.Stdout, p.remoteStdout); err != nil { - runtime.HandleError(err) + runtime.HandleErrorWithLogger(logger, err, "Copying stdout failed") } }() } -func (p *streamProtocolV2) copyStderr(wg *sync.WaitGroup) { +func (p *streamProtocolV2) copyStderr(logger klog.Logger, wg *sync.WaitGroup) { if p.Stderr == nil || p.Tty { return } wg.Add(1) go func() { - defer runtime.HandleCrash() + defer runtime.HandleCrashWithLogger(logger) defer wg.Done() defer io.Copy(io.Discard, p.remoteStderr) if _, err := io.Copy(p.Stderr, p.remoteStderr); err != nil { - runtime.HandleError(err) + runtime.HandleErrorWithLogger(logger, err, "Copying stderr failed") } }() } -func (p *streamProtocolV2) stream(conn streamCreator, ready chan<- struct{}) error { +func (p *streamProtocolV2) stream(logger klog.Logger, conn streamCreator, ready chan<- struct{}) error { if err := p.createStreams(conn); err != nil { return err } @@ -181,13 +182,13 @@ func (p *streamProtocolV2) stream(conn streamCreator, ready chan<- struct{}) err // now that all the streams have been created, proceed with reading & copying - errorChan := watchErrorStream(p.errorStream, &errorDecoderV2{}) + errorChan := watchErrorStream(logger, p.errorStream, &errorDecoderV2{}) - p.copyStdin() + p.copyStdin(logger) var wg sync.WaitGroup - p.copyStdout(&wg) - p.copyStderr(&wg) + p.copyStdout(logger, &wg) + p.copyStderr(logger, &wg) // we're waiting for stdout/stderr to finish copying wg.Wait() diff --git a/tools/remotecommand/v2_test.go b/tools/remotecommand/v2_test.go index 412cee8d2..e22dd685e 100644 --- a/tools/remotecommand/v2_test.go +++ b/tools/remotecommand/v2_test.go @@ -28,6 +28,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2/ktesting" ) type fakeReader struct { @@ -179,6 +180,7 @@ func TestV2CreateStreams(t *testing.T) { } func TestV2ErrorStreamReading(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) tests := []struct { name string stream io.Reader @@ -217,7 +219,7 @@ func TestV2ErrorStreamReading(t *testing.T) { h := newStreamProtocolV2(StreamOptions{}).(*streamProtocolV2) h.errorStream = test.stream - ch := watchErrorStream(h.errorStream, &errorDecoderV2{}) + ch := watchErrorStream(logger, h.errorStream, &errorDecoderV2{}) if ch == nil { t.Fatalf("%s: unexpected nil channel", test.name) } diff --git a/tools/remotecommand/v3.go b/tools/remotecommand/v3.go index ece4cfafe..b1e533a8a 100644 --- a/tools/remotecommand/v3.go +++ b/tools/remotecommand/v3.go @@ -22,8 +22,9 @@ import ( "net/http" "sync" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/klog/v2" ) // streamProtocolV3 implements version 3 of the streaming protocol for attach @@ -62,12 +63,12 @@ func (p *streamProtocolV3) createStreams(conn streamCreator) error { return nil } -func (p *streamProtocolV3) handleResizes() { +func (p *streamProtocolV3) handleResizes(logger klog.Logger) { if p.resizeStream == nil || p.TerminalSizeQueue == nil { return } go func() { - defer runtime.HandleCrash() + defer runtime.HandleCrashWithLogger(logger) encoder := json.NewEncoder(p.resizeStream) for { @@ -76,13 +77,13 @@ func (p *streamProtocolV3) handleResizes() { return } if err := encoder.Encode(&size); err != nil { - runtime.HandleError(err) + runtime.HandleErrorWithLogger(logger, err, "Encoding terminal size failed") } } }() } -func (p *streamProtocolV3) stream(conn streamCreator, ready chan<- struct{}) error { +func (p *streamProtocolV3) stream(logger klog.Logger, conn streamCreator, ready chan<- struct{}) error { if err := p.createStreams(conn); err != nil { return err } @@ -94,15 +95,15 @@ func (p *streamProtocolV3) stream(conn streamCreator, ready chan<- struct{}) err // now that all the streams have been created, proceed with reading & copying - errorChan := watchErrorStream(p.errorStream, &errorDecoderV3{}) + errorChan := watchErrorStream(logger, p.errorStream, &errorDecoderV3{}) - p.handleResizes() + p.handleResizes(logger) - p.copyStdin() + p.copyStdin(logger) var wg sync.WaitGroup - p.copyStdout(&wg) - p.copyStderr(&wg) + p.copyStdout(logger, &wg) + p.copyStderr(logger, &wg) // we're waiting for stdout/stderr to finish copying wg.Wait() diff --git a/tools/remotecommand/v4.go b/tools/remotecommand/v4.go index ecedad071..47018ba5f 100644 --- a/tools/remotecommand/v4.go +++ b/tools/remotecommand/v4.go @@ -26,6 +26,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/remotecommand" "k8s.io/client-go/util/exec" + "k8s.io/klog/v2" ) // streamProtocolV4 implements version 4 of the streaming protocol for attach @@ -47,11 +48,11 @@ func (p *streamProtocolV4) createStreams(conn streamCreator) error { return p.streamProtocolV3.createStreams(conn) } -func (p *streamProtocolV4) handleResizes() { - p.streamProtocolV3.handleResizes() +func (p *streamProtocolV4) handleResizes(logger klog.Logger) { + p.streamProtocolV3.handleResizes(logger) } -func (p *streamProtocolV4) stream(conn streamCreator, ready chan<- struct{}) error { +func (p *streamProtocolV4) stream(logger klog.Logger, conn streamCreator, ready chan<- struct{}) error { if err := p.createStreams(conn); err != nil { return err } @@ -63,15 +64,15 @@ func (p *streamProtocolV4) stream(conn streamCreator, ready chan<- struct{}) err // now that all the streams have been created, proceed with reading & copying - errorChan := watchErrorStream(p.errorStream, &errorDecoderV4{}) + errorChan := watchErrorStream(logger, p.errorStream, &errorDecoderV4{}) - p.handleResizes() + p.handleResizes(logger) - p.copyStdin() + p.copyStdin(logger) var wg sync.WaitGroup - p.copyStdout(&wg) - p.copyStderr(&wg) + p.copyStdout(logger, &wg) + p.copyStderr(logger, &wg) // we're waiting for stdout/stderr to finish copying wg.Wait() diff --git a/tools/remotecommand/v5.go b/tools/remotecommand/v5.go index edfd3ccb2..ca79a8828 100644 --- a/tools/remotecommand/v5.go +++ b/tools/remotecommand/v5.go @@ -16,6 +16,8 @@ limitations under the License. package remotecommand +import "k8s.io/klog/v2" + // streamProtocolV5 add support for V5 of the remote command subprotocol. // For the streamProtocolHandler, this version is the same as V4. type streamProtocolV5 struct { @@ -30,6 +32,6 @@ func newStreamProtocolV5(options StreamOptions) streamProtocolHandler { } } -func (p *streamProtocolV5) stream(conn streamCreator, ready chan<- struct{}) error { - return p.streamProtocolV4.stream(conn, ready) +func (p *streamProtocolV5) stream(logger klog.Logger, conn streamCreator, ready chan<- struct{}) error { + return p.streamProtocolV4.stream(logger, conn, ready) } diff --git a/tools/remotecommand/websocket.go b/tools/remotecommand/websocket.go index e0433198f..ce03c1834 100644 --- a/tools/remotecommand/websocket.go +++ b/tools/remotecommand/websocket.go @@ -130,7 +130,8 @@ func (e *wsStreamExecutor) StreamWithContext(ctx context.Context, options Stream } defer conn.Close() e.negotiated = conn.Subprotocol() - klog.V(4).Infof("The subprotocol is %s", e.negotiated) + logger := klog.FromContext(ctx) + logger.V(4).Info("Subprotocol negotiated", "protocol", e.negotiated) var streamer streamProtocolHandler switch e.negotiated { @@ -143,7 +144,7 @@ func (e *wsStreamExecutor) StreamWithContext(ctx context.Context, options Stream case remotecommand.StreamProtocolV2Name: streamer = newStreamProtocolV2(options) case "": - klog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name) + logger.V(4).Info("The server did not negotiate a streaming protocol version, falling back", "protocol", remotecommand.StreamProtocolV1Name) fallthrough case remotecommand.StreamProtocolV1Name: streamer = newStreamProtocolV1(options) @@ -159,7 +160,7 @@ func (e *wsStreamExecutor) StreamWithContext(ctx context.Context, options Stream }() readyChan := make(chan struct{}) - creator := newWSStreamCreator(conn) + creator := newWSStreamCreator(logger, conn) go func() { select { // Wait until all streams have been created before starting the readDemuxLoop. @@ -177,7 +178,7 @@ func (e *wsStreamExecutor) StreamWithContext(ctx context.Context, options Stream e.heartbeatDeadline, ) }() - errorChan <- streamer.stream(creator, readyChan) + errorChan <- streamer.stream(logger, creator, readyChan) }() select { @@ -191,7 +192,8 @@ func (e *wsStreamExecutor) StreamWithContext(ctx context.Context, options Stream } type wsStreamCreator struct { - conn *gwebsocket.Conn + logger klog.Logger + conn *gwebsocket.Conn // Protects writing to websocket connection; reading is lock-free connWriteLock sync.Mutex // map of stream id to stream; multiple streams read/write the connection @@ -202,8 +204,9 @@ type wsStreamCreator struct { setStreamErr error } -func newWSStreamCreator(conn *gwebsocket.Conn) *wsStreamCreator { +func newWSStreamCreator(logger klog.Logger, conn *gwebsocket.Conn) *wsStreamCreator { return &wsStreamCreator{ + logger: logger, conn: conn, streams: map[byte]*stream{}, } @@ -238,6 +241,7 @@ func (c *wsStreamCreator) CreateStream(headers http.Header) (httpstream.Stream, } reader, writer := io.Pipe() s := &stream{ + logger: klog.LoggerWithValues(c.logger, "id", id), headers: headers, readPipe: reader, writePipe: writer, @@ -260,11 +264,11 @@ func (c *wsStreamCreator) CreateStream(headers http.Header) (httpstream.Stream, // connection reader at a time (a read mutex would provide no benefit). func (c *wsStreamCreator) readDemuxLoop(bufferSize int, period time.Duration, deadline time.Duration) { // Initialize and start the ping/pong heartbeat. - h := newHeartbeat(c.conn, period, deadline) + h := newHeartbeat(c.logger, c.conn, period, deadline) // Set initial timeout for websocket connection reading. - klog.V(5).Infof("Websocket initial read deadline: %s", deadline) + c.logger.V(5).Info("Websocket read starts", "deadline", deadline) if err := c.conn.SetReadDeadline(time.Now().Add(deadline)); err != nil { - klog.Errorf("Websocket initial setting read deadline failed %v", err) + c.logger.Error(err, "Websocket initial setting read deadline failed") return } go h.start() @@ -308,7 +312,7 @@ func (c *wsStreamCreator) readDemuxLoop(bufferSize int, period time.Duration, de streamID := readBuffer[0] s := c.getStream(streamID) if s == nil { - klog.Errorf("Unknown stream id %d, discarding message", streamID) + c.logger.Error(nil, "Unknown stream, discarding message", "id", streamID) continue } for { @@ -351,6 +355,7 @@ func (c *wsStreamCreator) closeAllStreamReaders(err error) { } type stream struct { + logger klog.Logger headers http.Header readPipe *io.PipeReader writePipe *io.PipeWriter @@ -369,8 +374,8 @@ func (s *stream) Read(p []byte) (n int, err error) { // Write writes directly to the underlying WebSocket connection. func (s *stream) Write(p []byte) (n int, err error) { - klog.V(8).Infof("Write() on stream %d", s.id) - defer klog.V(8).Infof("Write() done on stream %d", s.id) + s.logger.V(8).Info("Write() on stream") + defer s.logger.V(8).Info("Write() done on stream") s.connWriteLock.Lock() defer s.connWriteLock.Unlock() if s.conn == nil { @@ -378,7 +383,7 @@ func (s *stream) Write(p []byte) (n int, err error) { } err = s.conn.SetWriteDeadline(time.Now().Add(writeDeadline)) if err != nil { - klog.V(4).Infof("Websocket setting write deadline failed %v", err) + s.logger.V(4).Info("Websocket setting write deadline failed", "err", err) return 0, err } // Message writer buffers the message data, so we don't need to do that ourselves. @@ -407,8 +412,8 @@ func (s *stream) Write(p []byte) (n int, err error) { // Close half-closes the stream, indicating this side is finished with the stream. func (s *stream) Close() error { - klog.V(6).Infof("Close() on stream %d", s.id) - defer klog.V(6).Infof("Close() done on stream %d", s.id) + s.logger.V(6).Info("Close() on stream") + defer s.logger.V(6).Info("Close() done on stream") s.connWriteLock.Lock() defer s.connWriteLock.Unlock() if s.conn == nil { @@ -421,8 +426,8 @@ func (s *stream) Close() error { } func (s *stream) Reset() error { - klog.V(4).Infof("Reset() on stream %d", s.id) - defer klog.V(4).Infof("Reset() done on stream %d", s.id) + s.logger.V(4).Info("Reset() on stream") + defer s.logger.V(4).Info("Reset() done on stream") s.Close() return s.writePipe.Close() } @@ -442,7 +447,8 @@ func (s *stream) Identifier() uint32 { // inside the "readDemuxLoop" will return an i/o error prompting a connection close // and cleanup. type heartbeat struct { - conn *gwebsocket.Conn + logger klog.Logger + conn *gwebsocket.Conn // period defines how often a "ping" heartbeat message is sent to the other endpoint period time.Duration // closing the "closer" channel will clean up the heartbeat timers @@ -456,8 +462,9 @@ type heartbeat struct { // newHeartbeat creates heartbeat structure encapsulating fields necessary to // run the websocket connection ping/pong mechanism and sets up handlers on // the websocket connection. -func newHeartbeat(conn *gwebsocket.Conn, period time.Duration, deadline time.Duration) *heartbeat { +func newHeartbeat(logger klog.Logger, conn *gwebsocket.Conn, period time.Duration, deadline time.Duration) *heartbeat { h := &heartbeat{ + logger: logger, conn: conn, period: period, closer: make(chan struct{}), @@ -467,10 +474,10 @@ func newHeartbeat(conn *gwebsocket.Conn, period time.Duration, deadline time.Dur // be empty. h.conn.SetPongHandler(func(msg string) error { // Push the read deadline into the future. - klog.V(6).Infof("Pong message received (%s)--resetting read deadline", msg) + logger.V(6).Info("Pong message received -- resetting read deadline", "message", msg) err := h.conn.SetReadDeadline(time.Now().Add(deadline)) if err != nil { - klog.Errorf("Websocket setting read deadline failed %v", err) + logger.Error(err, "Websocket setting read deadline failed") return err } if len(msg) > 0 { @@ -502,16 +509,16 @@ func (h *heartbeat) start() { for { select { case <-h.closer: - klog.V(5).Infof("closed channel--returning") + h.logger.V(5).Info("Closed channel -- returning") return case <-t.C: // "WriteControl" does not need to be protected by a mutex. According to // gorilla/websockets library docs: "The Close and WriteControl methods can // be called concurrently with all other methods." if err := h.conn.WriteControl(gwebsocket.PingMessage, h.message, time.Now().Add(pingReadDeadline)); err == nil { - klog.V(6).Infof("Websocket Ping succeeeded") + h.logger.V(6).Info("Websocket Ping succeeeded") } else { - klog.Errorf("Websocket Ping failed: %v", err) + h.logger.Error(err, "Websocket Ping failed") if errors.Is(err, gwebsocket.ErrCloseSent) { // we continue because c.conn.CloseChan will manage closing the connection already continue diff --git a/tools/remotecommand/websocket_test.go b/tools/remotecommand/websocket_test.go index 9f064d935..ef23d8cb5 100644 --- a/tools/remotecommand/websocket_test.go +++ b/tools/remotecommand/websocket_test.go @@ -48,6 +48,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/rest" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + "k8s.io/klog/v2/ktesting" ) // TestWebSocketClient_LoopbackStdinToStdout returns random data sent on the STDIN channel @@ -1049,6 +1050,7 @@ func TestWebSocketClient_ExecutorErrors(t *testing.T) { } func TestWebSocketClient_HeartbeatSucceeds(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) var upgrader = gwebsocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true // Accepting all requests @@ -1081,7 +1083,7 @@ func TestWebSocketClient_HeartbeatSucceeds(t *testing.T) { var expectedMsg = "test heartbeat message" var period = 100 * time.Millisecond var deadline = 200 * time.Millisecond - heartbeat := newHeartbeat(client, period, deadline) + heartbeat := newHeartbeat(logger, client, period, deadline) heartbeat.setMessage(expectedMsg) // Add a channel to the handler to retrieve the "pong" message. pongMsgCh := make(chan string) @@ -1121,7 +1123,8 @@ func TestWebSocketClient_HeartbeatSucceeds(t *testing.T) { } func TestLateStreamCreation(t *testing.T) { - c := newWSStreamCreator(nil) + logger, _ := ktesting.NewTestContext(t) + c := newWSStreamCreator(logger, nil) c.closeAllStreamReaders(nil) if err := c.setStream(0, nil); err == nil { t.Fatal("expected error adding stream after closeAllStreamReaders") @@ -1129,8 +1132,10 @@ func TestLateStreamCreation(t *testing.T) { } func TestWebSocketClient_StreamsAndExpectedErrors(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + // Validate Stream functions. - c := newWSStreamCreator(nil) + c := newWSStreamCreator(logger, nil) headers := http.Header{} headers.Set(v1.StreamType, v1.StreamTypeStdin) s, err := c.CreateStream(headers)