client-go remotecommand: structured, contextual logging
The API for the package already had a context, so all that was missing was to extract and use the logger from that.

Kubernetes-commit: 1620b2707623036f5133cf8045da89411d2c4345
Committed by: Kubernetes Publisher
Parent: 24500c1c50
Commit: d55310b9f9
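Because the package now reads its logger from the stream context, callers opt in by attaching one before calling StreamWithContext; when the context carries no logger, klog.FromContext falls back to the global klog logger, so existing callers keep their current behavior. A minimal caller-side sketch (the helper name, logger name, and parameters are illustrative, not part of this commit):

package remotecommandexample

import (
    "context"
    "net/url"

    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/remotecommand"
    "k8s.io/klog/v2"
)

// execWithLogger attaches a named logger to the context so that the
// remotecommand package picks it up via klog.FromContext(ctx).
func execWithLogger(ctx context.Context, config *rest.Config, reqURL *url.URL, opts remotecommand.StreamOptions) error {
    ctx = klog.NewContext(ctx, klog.Background().WithName("remotecommand"))

    // The SPDY executor is shown here; the WebSocket executor extracts the logger the same way.
    exec, err := remotecommand.NewSPDYExecutor(config, "POST", reqURL)
    if err != nil {
        return err
    }
    return exec.StreamWithContext(ctx, opts)
}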
@@ -21,6 +21,7 @@ import (
 	"io"
 
 	"k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/klog/v2"
 )
 
 // errorStreamDecoder interprets the data on the error channel and creates a go error object from it.
@@ -32,11 +33,11 @@ type errorStreamDecoder interface {
 // decodes it with the given errorStreamDecoder, sends the decoded error (or nil if the remote
 // command exited successfully) to the returned error channel, and closes it.
 // This function returns immediately.
-func watchErrorStream(errorStream io.Reader, d errorStreamDecoder) chan error {
+func watchErrorStream(logger klog.Logger, errorStream io.Reader, d errorStreamDecoder) chan error {
 	errorChan := make(chan error)
 
 	go func() {
-		defer runtime.HandleCrash()
+		defer runtime.HandleCrashWithLogger(logger)
 
 		message, err := io.ReadAll(errorStream)
 		switch {
@@ -53,7 +53,7 @@ func (f *FallbackExecutor) Stream(options StreamOptions) error {
 func (f *FallbackExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {
 	err := f.primary.StreamWithContext(ctx, options)
 	if err != nil && f.shouldFallback(err) {
-		klog.V(4).Infof("RemoteCommand fallback: %v", err)
+		klog.FromContext(ctx).V(4).Info("RemoteCommand fallback", "err", err)
 		return f.secondary.StreamWithContext(ctx, options)
 	}
 	return err
@@ -22,6 +22,7 @@ import (
 	"net/http"
 
 	"k8s.io/apimachinery/pkg/util/httpstream"
+	"k8s.io/klog/v2"
 )
 
 // StreamOptions holds information pertaining to the current streaming session:
@@ -54,5 +55,5 @@ type streamCreator interface {
 }
 
 type streamProtocolHandler interface {
-	stream(conn streamCreator, ready chan<- struct{}) error
+	stream(logger klog.Logger, conn streamCreator, ready chan<- struct{}) error
 }
@@ -121,6 +121,7 @@ func (e *spdyStreamExecutor) newConnectionAndStream(ctx context.Context, options
 
 	var streamer streamProtocolHandler
 
+	logger := klog.FromContext(ctx)
 	switch protocol {
 	case remotecommand.StreamProtocolV5Name:
 		streamer = newStreamProtocolV5(options)
@@ -131,7 +132,7 @@ func (e *spdyStreamExecutor) newConnectionAndStream(ctx context.Context, options
 	case remotecommand.StreamProtocolV2Name:
 		streamer = newStreamProtocolV2(options)
 	case "":
-		klog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name)
+		logger.V(4).Info("The server did not negotiate a streaming protocol version, falling back", "protocol", remotecommand.StreamProtocolV1Name)
 		fallthrough
 	case remotecommand.StreamProtocolV1Name:
 		streamer = newStreamProtocolV1(options)
@@ -161,7 +162,7 @@ func (e *spdyStreamExecutor) StreamWithContext(ctx context.Context, options Stre
 		// The SPDY executor does not need to synchronize stream creation, so we pass a nil
 		// ready channel. The underlying spdystream library handles stream multiplexing
 		// without a race condition.
-		errorChan <- streamer.stream(conn, nil)
+		errorChan <- streamer.stream(klog.FromContext(ctx), conn, nil)
 	}()
 
 	select {
@@ -38,6 +38,7 @@ import (
 	remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/rest"
+	"k8s.io/klog/v2/ktesting"
 )
 
 type AttachFunc func(in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan TerminalSize) error
@@ -328,6 +329,7 @@ func (w *writeDetector) Write(p []byte) (n int, err error) {
 // and expects the deferred close of the connection leads to the exit of the goroutine on cancellation.
 // This test verifies that works.
 func TestStreamExitsAfterConnectionIsClosed(t *testing.T) {
+	logger, _ := ktesting.NewTestContext(t)
 	writeDetector := newWriterDetector(&fakeEmptyDataPty{})
 	options := StreamOptions{
 		Stdin: &fakeEmptyDataPty{},
@@ -352,7 +354,7 @@ func TestStreamExitsAfterConnectionIsClosed(t *testing.T) {
 
 	errorChan := make(chan error)
 	go func() {
-		errorChan <- streamer.stream(conn, nil)
+		errorChan <- streamer.stream(logger, conn, nil)
 	}()
 
 	// Wait until stream goroutine starts.
@@ -21,7 +21,7 @@ import (
 	"io"
 	"net/http"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/util/httpstream"
 	"k8s.io/klog/v2"
 )
@@ -47,15 +47,15 @@ func newStreamProtocolV1(options StreamOptions) streamProtocolHandler {
 	}
 }
 
-func (p *streamProtocolV1) stream(conn streamCreator, ready chan<- struct{}) error {
+func (p *streamProtocolV1) stream(logger klog.Logger, conn streamCreator, ready chan<- struct{}) error {
 	doneChan := make(chan struct{}, 2)
 	errorChan := make(chan error)
 
 	cp := func(s string, dst io.Writer, src io.Reader) {
-		klog.V(6).Infof("Copying %s", s)
-		defer klog.V(6).Infof("Done copying %s", s)
+		logger.V(6).Info("Copying", "data", s)
+		defer logger.V(6).Info("Done copying", "data", s)
 		if _, err := io.Copy(dst, src); err != nil && err != io.EOF {
-			klog.Errorf("Error copying %s: %v", s, err)
+			logger.Error(err, "Error copying", "data", s)
 		}
 		if s == v1.StreamTypeStdout || s == v1.StreamTypeStderr {
 			doneChan <- struct{}{}
@@ -22,8 +22,9 @@ import (
 	"net/http"
 	"sync"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/klog/v2"
 )
 
 // streamProtocolV2 implements version 2 of the streaming protocol for attach
@@ -87,13 +88,13 @@ func (p *streamProtocolV2) createStreams(conn streamCreator) error {
 	return nil
 }
 
-func (p *streamProtocolV2) copyStdin() {
+func (p *streamProtocolV2) copyStdin(logger klog.Logger) {
 	if p.Stdin != nil {
 		var once sync.Once
 
 		// copy from client's stdin to container's stdin
 		go func() {
-			defer runtime.HandleCrash()
+			defer runtime.HandleCrashWithLogger(logger)
 
 			// if p.stdin is noninteractive, p.g. `echo abc | kubectl exec -i <pod> -- cat`, make sure
 			// we close remoteStdin as soon as the copy from p.stdin to remoteStdin finishes. Otherwise
@@ -101,7 +102,7 @@ func (p *streamProtocolV2) copyStdin() {
 			defer once.Do(func() { p.remoteStdin.Close() })
 
 			if _, err := io.Copy(p.remoteStdin, readerWrapper{p.Stdin}); err != nil {
-				runtime.HandleError(err)
+				runtime.HandleErrorWithLogger(logger, err, "Copying stdin failed")
 			}
 		}()
 
@@ -120,26 +121,26 @@ func (p *streamProtocolV2) copyStdin() {
 		// When that happens, we must Close() on our side of remoteStdin, to
 		// allow the copy in hijack to complete, and hijack to return.
 		go func() {
-			defer runtime.HandleCrash()
+			defer runtime.HandleCrashWithLogger(logger)
 			defer once.Do(func() { p.remoteStdin.Close() })
 
 			// this "copy" doesn't actually read anything - it's just here to wait for
 			// the server to close remoteStdin.
 			if _, err := io.Copy(io.Discard, p.remoteStdin); err != nil {
-				runtime.HandleError(err)
+				runtime.HandleErrorWithLogger(logger, err, "Waiting for server to close stdin failed")
 			}
 		}()
 	}
 }
 
-func (p *streamProtocolV2) copyStdout(wg *sync.WaitGroup) {
+func (p *streamProtocolV2) copyStdout(logger klog.Logger, wg *sync.WaitGroup) {
 	if p.Stdout == nil {
 		return
 	}
 
 	wg.Add(1)
 	go func() {
-		defer runtime.HandleCrash()
+		defer runtime.HandleCrashWithLogger(logger)
 		defer wg.Done()
 		// make sure, packet in queue can be consumed.
 		// block in queue may lead to deadlock in conn.server
@@ -147,29 +148,29 @@ func (p *streamProtocolV2) copyStdout(wg *sync.WaitGroup) {
 		defer io.Copy(io.Discard, p.remoteStdout)
 
 		if _, err := io.Copy(p.Stdout, p.remoteStdout); err != nil {
-			runtime.HandleError(err)
+			runtime.HandleErrorWithLogger(logger, err, "Copying stdout failed")
 		}
 	}()
 }
 
-func (p *streamProtocolV2) copyStderr(wg *sync.WaitGroup) {
+func (p *streamProtocolV2) copyStderr(logger klog.Logger, wg *sync.WaitGroup) {
 	if p.Stderr == nil || p.Tty {
 		return
 	}
 
 	wg.Add(1)
 	go func() {
-		defer runtime.HandleCrash()
+		defer runtime.HandleCrashWithLogger(logger)
 		defer wg.Done()
 		defer io.Copy(io.Discard, p.remoteStderr)
 
 		if _, err := io.Copy(p.Stderr, p.remoteStderr); err != nil {
-			runtime.HandleError(err)
+			runtime.HandleErrorWithLogger(logger, err, "Copying stderr failed")
 		}
 	}()
 }
 
-func (p *streamProtocolV2) stream(conn streamCreator, ready chan<- struct{}) error {
+func (p *streamProtocolV2) stream(logger klog.Logger, conn streamCreator, ready chan<- struct{}) error {
 	if err := p.createStreams(conn); err != nil {
 		return err
 	}
@@ -181,13 +182,13 @@ func (p *streamProtocolV2) stream(conn streamCreator, ready chan<- struct{}) err
 
 	// now that all the streams have been created, proceed with reading & copying
 
-	errorChan := watchErrorStream(p.errorStream, &errorDecoderV2{})
+	errorChan := watchErrorStream(logger, p.errorStream, &errorDecoderV2{})
 
-	p.copyStdin()
+	p.copyStdin(logger)
 
 	var wg sync.WaitGroup
-	p.copyStdout(&wg)
-	p.copyStderr(&wg)
+	p.copyStdout(logger, &wg)
+	p.copyStderr(logger, &wg)
 
 	// we're waiting for stdout/stderr to finish copying
 	wg.Wait()
@@ -28,6 +28,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/util/httpstream"
 	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/klog/v2/ktesting"
 )
 
 type fakeReader struct {
@@ -179,6 +180,7 @@ func TestV2CreateStreams(t *testing.T) {
 }
 
 func TestV2ErrorStreamReading(t *testing.T) {
+	logger, _ := ktesting.NewTestContext(t)
 	tests := []struct {
 		name   string
 		stream io.Reader
@@ -217,7 +219,7 @@ func TestV2ErrorStreamReading(t *testing.T) {
 		h := newStreamProtocolV2(StreamOptions{}).(*streamProtocolV2)
 		h.errorStream = test.stream
 
-		ch := watchErrorStream(h.errorStream, &errorDecoderV2{})
+		ch := watchErrorStream(logger, h.errorStream, &errorDecoderV2{})
 		if ch == nil {
 			t.Fatalf("%s: unexpected nil channel", test.name)
 		}
@@ -22,8 +22,9 @@ import (
 	"net/http"
 	"sync"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/klog/v2"
 )
 
 // streamProtocolV3 implements version 3 of the streaming protocol for attach
@@ -62,12 +63,12 @@ func (p *streamProtocolV3) createStreams(conn streamCreator) error {
 	return nil
 }
 
-func (p *streamProtocolV3) handleResizes() {
+func (p *streamProtocolV3) handleResizes(logger klog.Logger) {
 	if p.resizeStream == nil || p.TerminalSizeQueue == nil {
 		return
 	}
 	go func() {
-		defer runtime.HandleCrash()
+		defer runtime.HandleCrashWithLogger(logger)
 
 		encoder := json.NewEncoder(p.resizeStream)
 		for {
@@ -76,13 +77,13 @@ func (p *streamProtocolV3) handleResizes() {
 				return
 			}
 			if err := encoder.Encode(&size); err != nil {
-				runtime.HandleError(err)
+				runtime.HandleErrorWithLogger(logger, err, "Encoding terminal size failed")
 			}
 		}
 	}()
 }
 
-func (p *streamProtocolV3) stream(conn streamCreator, ready chan<- struct{}) error {
+func (p *streamProtocolV3) stream(logger klog.Logger, conn streamCreator, ready chan<- struct{}) error {
 	if err := p.createStreams(conn); err != nil {
 		return err
 	}
@@ -94,15 +95,15 @@ func (p *streamProtocolV3) stream(conn streamCreator, ready chan<- struct{}) err
 
 	// now that all the streams have been created, proceed with reading & copying
 
-	errorChan := watchErrorStream(p.errorStream, &errorDecoderV3{})
+	errorChan := watchErrorStream(logger, p.errorStream, &errorDecoderV3{})
 
-	p.handleResizes()
+	p.handleResizes(logger)
 
-	p.copyStdin()
+	p.copyStdin(logger)
 
 	var wg sync.WaitGroup
-	p.copyStdout(&wg)
-	p.copyStderr(&wg)
+	p.copyStdout(logger, &wg)
+	p.copyStderr(logger, &wg)
 
 	// we're waiting for stdout/stderr to finish copying
 	wg.Wait()
@@ -26,6 +26,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/remotecommand"
 	"k8s.io/client-go/util/exec"
+	"k8s.io/klog/v2"
 )
 
 // streamProtocolV4 implements version 4 of the streaming protocol for attach
@@ -47,11 +48,11 @@ func (p *streamProtocolV4) createStreams(conn streamCreator) error {
 	return p.streamProtocolV3.createStreams(conn)
 }
 
-func (p *streamProtocolV4) handleResizes() {
-	p.streamProtocolV3.handleResizes()
+func (p *streamProtocolV4) handleResizes(logger klog.Logger) {
+	p.streamProtocolV3.handleResizes(logger)
 }
 
-func (p *streamProtocolV4) stream(conn streamCreator, ready chan<- struct{}) error {
+func (p *streamProtocolV4) stream(logger klog.Logger, conn streamCreator, ready chan<- struct{}) error {
 	if err := p.createStreams(conn); err != nil {
 		return err
 	}
@@ -63,15 +64,15 @@ func (p *streamProtocolV4) stream(conn streamCreator, ready chan<- struct{}) err
 
 	// now that all the streams have been created, proceed with reading & copying
 
-	errorChan := watchErrorStream(p.errorStream, &errorDecoderV4{})
+	errorChan := watchErrorStream(logger, p.errorStream, &errorDecoderV4{})
 
-	p.handleResizes()
+	p.handleResizes(logger)
 
-	p.copyStdin()
+	p.copyStdin(logger)
 
 	var wg sync.WaitGroup
-	p.copyStdout(&wg)
-	p.copyStderr(&wg)
+	p.copyStdout(logger, &wg)
+	p.copyStderr(logger, &wg)
 
 	// we're waiting for stdout/stderr to finish copying
 	wg.Wait()
@@ -16,6 +16,8 @@ limitations under the License.
 
 package remotecommand
 
+import "k8s.io/klog/v2"
+
 // streamProtocolV5 add support for V5 of the remote command subprotocol.
 // For the streamProtocolHandler, this version is the same as V4.
 type streamProtocolV5 struct {
@@ -30,6 +32,6 @@ func newStreamProtocolV5(options StreamOptions) streamProtocolHandler {
 	}
 }
 
-func (p *streamProtocolV5) stream(conn streamCreator, ready chan<- struct{}) error {
-	return p.streamProtocolV4.stream(conn, ready)
+func (p *streamProtocolV5) stream(logger klog.Logger, conn streamCreator, ready chan<- struct{}) error {
+	return p.streamProtocolV4.stream(logger, conn, ready)
 }
@@ -130,7 +130,8 @@ func (e *wsStreamExecutor) StreamWithContext(ctx context.Context, options Stream
 	}
 	defer conn.Close()
 	e.negotiated = conn.Subprotocol()
-	klog.V(4).Infof("The subprotocol is %s", e.negotiated)
+	logger := klog.FromContext(ctx)
+	logger.V(4).Info("Subprotocol negotiated", "protocol", e.negotiated)
 
 	var streamer streamProtocolHandler
 	switch e.negotiated {
@@ -143,7 +144,7 @@ func (e *wsStreamExecutor) StreamWithContext(ctx context.Context, options Stream
 	case remotecommand.StreamProtocolV2Name:
 		streamer = newStreamProtocolV2(options)
 	case "":
-		klog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name)
+		logger.V(4).Info("The server did not negotiate a streaming protocol version, falling back", "protocol", remotecommand.StreamProtocolV1Name)
 		fallthrough
 	case remotecommand.StreamProtocolV1Name:
 		streamer = newStreamProtocolV1(options)
@@ -159,7 +160,7 @@ func (e *wsStreamExecutor) StreamWithContext(ctx context.Context, options Stream
 	}()
 
 	readyChan := make(chan struct{})
-	creator := newWSStreamCreator(conn)
+	creator := newWSStreamCreator(logger, conn)
 	go func() {
 		select {
 		// Wait until all streams have been created before starting the readDemuxLoop.
@@ -177,7 +178,7 @@ func (e *wsStreamExecutor) StreamWithContext(ctx context.Context, options Stream
 				e.heartbeatDeadline,
 			)
 		}()
-		errorChan <- streamer.stream(creator, readyChan)
+		errorChan <- streamer.stream(logger, creator, readyChan)
 	}()
 
 	select {
@@ -191,7 +192,8 @@ func (e *wsStreamExecutor) StreamWithContext(ctx context.Context, options Stream
 }
 
 type wsStreamCreator struct {
-	conn *gwebsocket.Conn
+	logger klog.Logger
+	conn   *gwebsocket.Conn
 	// Protects writing to websocket connection; reading is lock-free
 	connWriteLock sync.Mutex
 	// map of stream id to stream; multiple streams read/write the connection
@@ -202,8 +204,9 @@ type wsStreamCreator struct {
 	setStreamErr error
 }
 
-func newWSStreamCreator(conn *gwebsocket.Conn) *wsStreamCreator {
+func newWSStreamCreator(logger klog.Logger, conn *gwebsocket.Conn) *wsStreamCreator {
 	return &wsStreamCreator{
+		logger:  logger,
 		conn:    conn,
 		streams: map[byte]*stream{},
 	}
@@ -238,6 +241,7 @@ func (c *wsStreamCreator) CreateStream(headers http.Header) (httpstream.Stream,
 	}
 	reader, writer := io.Pipe()
 	s := &stream{
+		logger:    klog.LoggerWithValues(c.logger, "id", id),
 		headers:   headers,
 		readPipe:  reader,
 		writePipe: writer,
@@ -260,11 +264,11 @@ func (c *wsStreamCreator) CreateStream(headers http.Header) (httpstream.Stream,
 // connection reader at a time (a read mutex would provide no benefit).
 func (c *wsStreamCreator) readDemuxLoop(bufferSize int, period time.Duration, deadline time.Duration) {
 	// Initialize and start the ping/pong heartbeat.
-	h := newHeartbeat(c.conn, period, deadline)
+	h := newHeartbeat(c.logger, c.conn, period, deadline)
 	// Set initial timeout for websocket connection reading.
-	klog.V(5).Infof("Websocket initial read deadline: %s", deadline)
+	c.logger.V(5).Info("Websocket read starts", "deadline", deadline)
 	if err := c.conn.SetReadDeadline(time.Now().Add(deadline)); err != nil {
-		klog.Errorf("Websocket initial setting read deadline failed %v", err)
+		c.logger.Error(err, "Websocket initial setting read deadline failed")
 		return
 	}
 	go h.start()
@@ -308,7 +312,7 @@ func (c *wsStreamCreator) readDemuxLoop(bufferSize int, period time.Duration, de
 		streamID := readBuffer[0]
 		s := c.getStream(streamID)
 		if s == nil {
-			klog.Errorf("Unknown stream id %d, discarding message", streamID)
+			c.logger.Error(nil, "Unknown stream, discarding message", "id", streamID)
 			continue
 		}
 		for {
@@ -351,6 +355,7 @@ func (c *wsStreamCreator) closeAllStreamReaders(err error) {
 }
 
 type stream struct {
+	logger    klog.Logger
 	headers   http.Header
 	readPipe  *io.PipeReader
 	writePipe *io.PipeWriter
@@ -369,8 +374,8 @@ func (s *stream) Read(p []byte) (n int, err error) {
 
 // Write writes directly to the underlying WebSocket connection.
 func (s *stream) Write(p []byte) (n int, err error) {
-	klog.V(8).Infof("Write() on stream %d", s.id)
-	defer klog.V(8).Infof("Write() done on stream %d", s.id)
+	s.logger.V(8).Info("Write() on stream")
+	defer s.logger.V(8).Info("Write() done on stream")
 	s.connWriteLock.Lock()
 	defer s.connWriteLock.Unlock()
 	if s.conn == nil {
@@ -378,7 +383,7 @@ func (s *stream) Write(p []byte) (n int, err error) {
 	}
 	err = s.conn.SetWriteDeadline(time.Now().Add(writeDeadline))
 	if err != nil {
-		klog.V(4).Infof("Websocket setting write deadline failed %v", err)
+		s.logger.V(4).Info("Websocket setting write deadline failed", "err", err)
 		return 0, err
 	}
 	// Message writer buffers the message data, so we don't need to do that ourselves.
@@ -407,8 +412,8 @@ func (s *stream) Write(p []byte) (n int, err error) {
 
 // Close half-closes the stream, indicating this side is finished with the stream.
 func (s *stream) Close() error {
-	klog.V(6).Infof("Close() on stream %d", s.id)
-	defer klog.V(6).Infof("Close() done on stream %d", s.id)
+	s.logger.V(6).Info("Close() on stream")
+	defer s.logger.V(6).Info("Close() done on stream")
 	s.connWriteLock.Lock()
 	defer s.connWriteLock.Unlock()
 	if s.conn == nil {
@@ -421,8 +426,8 @@ func (s *stream) Close() error {
 }
 
 func (s *stream) Reset() error {
-	klog.V(4).Infof("Reset() on stream %d", s.id)
-	defer klog.V(4).Infof("Reset() done on stream %d", s.id)
+	s.logger.V(4).Info("Reset() on stream")
+	defer s.logger.V(4).Info("Reset() done on stream")
 	s.Close()
 	return s.writePipe.Close()
 }
@@ -442,7 +447,8 @@ func (s *stream) Identifier() uint32 {
 // inside the "readDemuxLoop" will return an i/o error prompting a connection close
 // and cleanup.
 type heartbeat struct {
-	conn *gwebsocket.Conn
+	logger klog.Logger
+	conn   *gwebsocket.Conn
 	// period defines how often a "ping" heartbeat message is sent to the other endpoint
 	period time.Duration
 	// closing the "closer" channel will clean up the heartbeat timers
@@ -456,8 +462,9 @@ type heartbeat struct {
 // newHeartbeat creates heartbeat structure encapsulating fields necessary to
 // run the websocket connection ping/pong mechanism and sets up handlers on
 // the websocket connection.
-func newHeartbeat(conn *gwebsocket.Conn, period time.Duration, deadline time.Duration) *heartbeat {
+func newHeartbeat(logger klog.Logger, conn *gwebsocket.Conn, period time.Duration, deadline time.Duration) *heartbeat {
 	h := &heartbeat{
+		logger: logger,
 		conn:   conn,
 		period: period,
 		closer: make(chan struct{}),
@@ -467,10 +474,10 @@ func newHeartbeat(conn *gwebsocket.Conn, period time.Duration, deadline time.Dur
 	// be empty.
 	h.conn.SetPongHandler(func(msg string) error {
 		// Push the read deadline into the future.
-		klog.V(6).Infof("Pong message received (%s)--resetting read deadline", msg)
+		logger.V(6).Info("Pong message received -- resetting read deadline", "message", msg)
 		err := h.conn.SetReadDeadline(time.Now().Add(deadline))
 		if err != nil {
-			klog.Errorf("Websocket setting read deadline failed %v", err)
+			logger.Error(err, "Websocket setting read deadline failed")
 			return err
 		}
 		if len(msg) > 0 {
@@ -502,16 +509,16 @@ func (h *heartbeat) start() {
 	for {
 		select {
 		case <-h.closer:
-			klog.V(5).Infof("closed channel--returning")
+			h.logger.V(5).Info("Closed channel -- returning")
 			return
 		case <-t.C:
 			// "WriteControl" does not need to be protected by a mutex. According to
 			// gorilla/websockets library docs: "The Close and WriteControl methods can
 			// be called concurrently with all other methods."
 			if err := h.conn.WriteControl(gwebsocket.PingMessage, h.message, time.Now().Add(pingReadDeadline)); err == nil {
-				klog.V(6).Infof("Websocket Ping succeeeded")
+				h.logger.V(6).Info("Websocket Ping succeeeded")
 			} else {
-				klog.Errorf("Websocket Ping failed: %v", err)
+				h.logger.Error(err, "Websocket Ping failed")
 				if errors.Is(err, gwebsocket.ErrCloseSent) {
 					// we continue because c.conn.CloseChan will manage closing the connection already
 					continue
@@ -48,6 +48,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/rest"
 	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
+	"k8s.io/klog/v2/ktesting"
 )
 
 // TestWebSocketClient_LoopbackStdinToStdout returns random data sent on the STDIN channel
@@ -1049,6 +1050,7 @@ func TestWebSocketClient_ExecutorErrors(t *testing.T) {
 }
 
 func TestWebSocketClient_HeartbeatSucceeds(t *testing.T) {
+	logger, _ := ktesting.NewTestContext(t)
 	var upgrader = gwebsocket.Upgrader{
 		CheckOrigin: func(r *http.Request) bool {
 			return true // Accepting all requests
@@ -1081,7 +1083,7 @@ func TestWebSocketClient_HeartbeatSucceeds(t *testing.T) {
 	var expectedMsg = "test heartbeat message"
 	var period = 100 * time.Millisecond
 	var deadline = 200 * time.Millisecond
-	heartbeat := newHeartbeat(client, period, deadline)
+	heartbeat := newHeartbeat(logger, client, period, deadline)
 	heartbeat.setMessage(expectedMsg)
 	// Add a channel to the handler to retrieve the "pong" message.
 	pongMsgCh := make(chan string)
@@ -1121,7 +1123,8 @@ func TestWebSocketClient_HeartbeatSucceeds(t *testing.T) {
 }
 
 func TestLateStreamCreation(t *testing.T) {
-	c := newWSStreamCreator(nil)
+	logger, _ := ktesting.NewTestContext(t)
+	c := newWSStreamCreator(logger, nil)
 	c.closeAllStreamReaders(nil)
 	if err := c.setStream(0, nil); err == nil {
 		t.Fatal("expected error adding stream after closeAllStreamReaders")
@@ -1129,8 +1132,10 @@ func TestLateStreamCreation(t *testing.T) {
 }
 
 func TestWebSocketClient_StreamsAndExpectedErrors(t *testing.T) {
+	logger, _ := ktesting.NewTestContext(t)
+
 	// Validate Stream functions.
-	c := newWSStreamCreator(nil)
+	c := newWSStreamCreator(logger, nil)
 	headers := http.Header{}
 	headers.Set(v1.StreamType, v1.StreamTypeStdin)
 	s, err := c.CreateStream(headers)
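On the test side, ktesting supplies a logger wired to the Go test framework, which is what the updated tests above use. A minimal sketch of that pattern (the test name is hypothetical; it mirrors the updated TestV2ErrorStreamReading setup, assumes it lives in the remotecommand package itself because watchErrorStream is unexported, and relies on the decoder treating an empty error stream as success, as the existing tests do):

package remotecommand

import (
    "strings"
    "testing"

    "k8s.io/klog/v2/ktesting"
)

func TestWatchErrorStreamLogging(t *testing.T) {
    // ktesting.NewTestContext returns a logger that writes through t.Log,
    // plus a context that already carries that logger for context-based APIs.
    logger, _ := ktesting.NewTestContext(t)

    // Internal helpers now take the logger explicitly; an empty error stream
    // means the remote command finished without an error.
    ch := watchErrorStream(logger, strings.NewReader(""), &errorDecoderV2{})
    if err := <-ch; err != nil {
        t.Fatalf("unexpected error from empty error stream: %v", err)
    }
}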