From 42808c8343671e6783ba4c901dcd619bed648c3d Mon Sep 17 00:00:00 2001 From: arkbriar Date: Wed, 24 Aug 2022 10:21:35 +0800 Subject: [PATCH 1/3] Support cancelable SPDY executor stream Mark remotecommand.Executor as deprecated and related modifications. Handle crash when streamer.stream panics Add a test to verify if stream is closed after connection being closed Remove blank line and update waiting time to 1s to avoid test flakes in CI. Refine the tests of StreamExecutor according to comments. Remove the comment of context controlling the negotiation progress and misc. Signed-off-by: arkbriar --- pkg/client/tests/remotecommand_test.go | 3 +- pkg/kubelet/cri/streaming/server_test.go | 3 +- .../tools/remotecommand/remotecommand.go | 53 ++++++-- .../tools/remotecommand/remotecommand_test.go | 124 ++++++++++++++++-- .../k8s.io/kubectl/pkg/cmd/attach/attach.go | 3 +- .../src/k8s.io/kubectl/pkg/cmd/exec/exec.go | 2 +- test/e2e/framework/pod/exec_util.go | 2 +- 7 files changed, 166 insertions(+), 24 deletions(-) diff --git a/pkg/client/tests/remotecommand_test.go b/pkg/client/tests/remotecommand_test.go index 910cfce38f8..0e15b2fa9a9 100644 --- a/pkg/client/tests/remotecommand_test.go +++ b/pkg/client/tests/remotecommand_test.go @@ -18,6 +18,7 @@ package tests import ( "bytes" + "context" "errors" "fmt" "io" @@ -252,7 +253,7 @@ func TestStream(t *testing.T) { if err != nil { t.Fatalf("%s: unexpected error: %v", name, err) } - err = e.Stream(remoteclient.StreamOptions{ + err = e.StreamWithContext(context.Background(), remoteclient.StreamOptions{ Stdin: streamIn, Stdout: streamOut, Stderr: streamErr, diff --git a/pkg/kubelet/cri/streaming/server_test.go b/pkg/kubelet/cri/streaming/server_test.go index 156c45d8626..9b88c3fdf07 100644 --- a/pkg/kubelet/cri/streaming/server_test.go +++ b/pkg/kubelet/cri/streaming/server_test.go @@ -17,6 +17,7 @@ limitations under the License. package streaming import ( + "context" "crypto/tls" "io" "net/http" @@ -355,7 +356,7 @@ func runRemoteCommandTest(t *testing.T, commandType string) { Stderr: stderrW, Tty: false, } - require.NoError(t, exec.Stream(opts)) + require.NoError(t, exec.StreamWithContext(context.Background(), opts)) }() go func() { diff --git a/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go b/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go index cb39faf7f1a..4e22a970c35 100644 --- a/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go +++ b/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go @@ -17,6 +17,7 @@ limitations under the License. package remotecommand import ( + "context" "fmt" "io" "net/http" @@ -26,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/apimachinery/pkg/util/remotecommand" + "k8s.io/apimachinery/pkg/util/runtime" restclient "k8s.io/client-go/rest" spdy "k8s.io/client-go/transport/spdy" ) @@ -43,11 +45,16 @@ type StreamOptions struct { // Executor is an interface for transporting shell-style streams. type Executor interface { - // Stream initiates the transport of the standard shell streams. It will transport any - // non-nil stream to a remote system, and return an error if a problem occurs. If tty - // is set, the stderr stream is not used (raw TTY manages stdout and stderr over the - // stdout stream). + // Deprecated: use StreamWithContext instead to avoid possible resource leaks. + // See https://github.com/kubernetes/kubernetes/pull/103177 for details. Stream(options StreamOptions) error + + // StreamWithContext initiates the transport of the standard shell streams. It will + // transport any non-nil stream to a remote system, and return an error if a problem + // occurs. If tty is set, the stderr stream is not used (raw TTY manages stdout and + // stderr over the stdout stream). + // The context controls the entire lifetime of stream execution. + StreamWithContext(ctx context.Context, options StreamOptions) error } type streamCreator interface { @@ -106,9 +113,14 @@ func NewSPDYExecutorForProtocols(transport http.RoundTripper, upgrader spdy.Upgr // Stream opens a protocol streamer to the server and streams until a client closes // the connection or the server disconnects. func (e *streamExecutor) Stream(options StreamOptions) error { - req, err := http.NewRequest(e.method, e.url.String(), nil) + return e.StreamWithContext(context.Background(), options) +} + +// newConnectionAndStream creates a new SPDY connection and a stream protocol handler upon it. +func (e *streamExecutor) newConnectionAndStream(ctx context.Context, options StreamOptions) (httpstream.Connection, streamProtocolHandler, error) { + req, err := http.NewRequestWithContext(ctx, e.method, e.url.String(), nil) if err != nil { - return fmt.Errorf("error creating request: %v", err) + return nil, nil, fmt.Errorf("error creating request: %v", err) } conn, protocol, err := spdy.Negotiate( @@ -118,9 +130,8 @@ func (e *streamExecutor) Stream(options StreamOptions) error { e.protocols..., ) if err != nil { - return err + return nil, nil, err } - defer conn.Close() var streamer streamProtocolHandler @@ -138,5 +149,29 @@ func (e *streamExecutor) Stream(options StreamOptions) error { streamer = newStreamProtocolV1(options) } - return streamer.stream(conn) + return conn, streamer, nil +} + +// StreamWithContext opens a protocol streamer to the server and streams until a client closes +// the connection or the server disconnects or the context is done. +func (e *streamExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error { + conn, streamer, err := e.newConnectionAndStream(ctx, options) + if err != nil { + return err + } + defer conn.Close() + + errorChan := make(chan error, 1) + go func() { + defer runtime.HandleCrash() + defer close(errorChan) + errorChan <- streamer.stream(conn) + }() + + select { + case err := <-errorChan: + return err + case <-ctx.Done(): + return ctx.Err() + } } diff --git a/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand_test.go b/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand_test.go index 7eec4565ed1..2ebdd3ace51 100644 --- a/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand_test.go +++ b/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand_test.go @@ -17,9 +17,17 @@ limitations under the License. package remotecommand import ( + "context" "encoding/json" "errors" "io" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + "time" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -28,12 +36,6 @@ import ( remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/rest" - "net/http" - "net/http/httptest" - "net/url" - "strings" - "testing" - "time" ) type AttachFunc func(in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan TerminalSize) error @@ -50,6 +52,17 @@ type streamAndReply struct { replySent <-chan struct{} } +type fakeEmptyDataPty struct { +} + +func (s *fakeEmptyDataPty) Read(p []byte) (int, error) { + return len(p), nil +} + +func (s *fakeEmptyDataPty) Write(p []byte) (int, error) { + return len(p), nil +} + type fakeMassiveDataPty struct{} func (s *fakeMassiveDataPty) Read(p []byte) (int, error) { @@ -107,6 +120,7 @@ func writeMassiveData(stdStream io.Writer) struct{} { // write to stdin or stdou func TestSPDYExecutorStream(t *testing.T) { tests := []struct { + timeout time.Duration name string options StreamOptions expectError string @@ -130,12 +144,31 @@ func TestSPDYExecutorStream(t *testing.T) { expectError: "", attacher: fakeMassiveDataAttacher, }, + { + timeout: 500 * time.Millisecond, + name: "timeoutTest", + options: StreamOptions{ + Stdin: &fakeMassiveDataPty{}, + Stderr: &fakeMassiveDataPty{}, + }, + expectError: context.DeadlineExceeded.Error(), + attacher: fakeMassiveDataAttacher, + }, } for _, test := range tests { server := newTestHTTPServer(test.attacher, &test.options) - err := attach2Server(server.URL, test.options) + ctx, cancelFn := context.Background(), func() {} + if test.timeout > 0 { + ctx, cancelFn = context.WithTimeout(ctx, test.timeout) + } + + err := func(ctx context.Context, cancel context.CancelFunc) error { + defer cancelFn() + return attach2Server(ctx, server.URL, test.options) + }(ctx, cancelFn) + gotError := "" if err != nil { gotError = err.Error() @@ -170,16 +203,16 @@ func newTestHTTPServer(f AttachFunc, options *StreamOptions) *httptest.Server { return server } -func attach2Server(rawURL string, options StreamOptions) error { +func attach2Server(ctx context.Context, rawURL string, options StreamOptions) error { uri, _ := url.Parse(rawURL) exec, err := NewSPDYExecutor(&rest.Config{Host: uri.Host}, "POST", uri) if err != nil { return err } - e := make(chan error) + e := make(chan error, 1) go func(e chan error) { - e <- exec.Stream(options) + e <- exec.StreamWithContext(ctx, options) }(e) select { case err := <-e: @@ -263,3 +296,74 @@ func v4WriteStatusFunc(stream io.Writer) func(status *apierrors.StatusError) err return err } } + +// writeDetector provides a helper method to block until the underlying writer written. +type writeDetector struct { + written chan bool + closed bool + io.Writer +} + +func newWriterDetector(w io.Writer) *writeDetector { + return &writeDetector{ + written: make(chan bool), + Writer: w, + } +} + +func (w *writeDetector) BlockUntilWritten() { + <-w.written +} + +func (w *writeDetector) Write(p []byte) (n int, err error) { + if !w.closed { + close(w.written) + w.closed = true + } + return w.Writer.Write(p) +} + +// `Executor.StreamWithContext` starts a goroutine in the background to do the streaming +// and expects the deferred close of the connection leads to the exit of the goroutine on cancellation. +// This test verifies that works. +func TestStreamExitsAfterConnectionIsClosed(t *testing.T) { + writeDetector := newWriterDetector(&fakeEmptyDataPty{}) + options := StreamOptions{ + Stdin: &fakeEmptyDataPty{}, + Stdout: writeDetector, + } + server := newTestHTTPServer(fakeMassiveDataAttacher, &options) + + ctx, cancelFn := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancelFn() + + uri, _ := url.Parse(server.URL) + exec, err := NewSPDYExecutor(&rest.Config{Host: uri.Host}, "POST", uri) + if err != nil { + t.Fatal(err) + } + streamExec := exec.(*streamExecutor) + + conn, streamer, err := streamExec.newConnectionAndStream(ctx, options) + if err != nil { + t.Fatal(err) + } + + errorChan := make(chan error) + go func() { + errorChan <- streamer.stream(conn) + }() + + // Wait until stream goroutine starts. + writeDetector.BlockUntilWritten() + + // Close the connection + conn.Close() + + select { + case <-time.After(1 * time.Second): + t.Fatalf("expect stream to be closed after connection is closed.") + case <-errorChan: + return + } +} diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/attach/attach.go b/staging/src/k8s.io/kubectl/pkg/cmd/attach/attach.go index 668524ab659..e403b5208f5 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/attach/attach.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/attach/attach.go @@ -17,6 +17,7 @@ limitations under the License. package attach import ( + "context" "fmt" "io" "net/url" @@ -159,7 +160,7 @@ func (*DefaultRemoteAttach) Attach(method string, url *url.URL, config *restclie if err != nil { return err } - return exec.Stream(remotecommand.StreamOptions{ + return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{ Stdin: stdin, Stdout: stdout, Stderr: stderr, diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/exec/exec.go b/staging/src/k8s.io/kubectl/pkg/cmd/exec/exec.go index f3be195db98..ff7a3750d76 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/exec/exec.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/exec/exec.go @@ -122,7 +122,7 @@ func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restc if err != nil { return err } - return exec.Stream(remotecommand.StreamOptions{ + return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{ Stdin: stdin, Stdout: stdout, Stderr: stderr, diff --git a/test/e2e/framework/pod/exec_util.go b/test/e2e/framework/pod/exec_util.go index 1e3438c2b6d..a88aee2d7dd 100644 --- a/test/e2e/framework/pod/exec_util.go +++ b/test/e2e/framework/pod/exec_util.go @@ -143,7 +143,7 @@ func execute(method string, url *url.URL, config *restclient.Config, stdin io.Re if err != nil { return err } - return exec.Stream(remotecommand.StreamOptions{ + return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{ Stdin: stdin, Stdout: stdout, Stderr: stderr, From 86e5d069ecece8591ad693b50bcea49000f6df26 Mon Sep 17 00:00:00 2001 From: arkbriar Date: Mon, 10 Oct 2022 11:24:40 +0800 Subject: [PATCH 2/3] use subtests and defer in TestSPDYExecutorStream Signed-off-by: arkbriar --- .../tools/remotecommand/remotecommand_test.go | 36 +++++++++---------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand_test.go b/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand_test.go index 2ebdd3ace51..9144a14526c 100644 --- a/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand_test.go +++ b/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand_test.go @@ -157,29 +157,27 @@ func TestSPDYExecutorStream(t *testing.T) { } for _, test := range tests { - server := newTestHTTPServer(test.attacher, &test.options) + t.Run(test.name, func(t *testing.T) { + server := newTestHTTPServer(test.attacher, &test.options) + defer server.Close() - ctx, cancelFn := context.Background(), func() {} - if test.timeout > 0 { - ctx, cancelFn = context.WithTimeout(ctx, test.timeout) - } + ctx, cancel := context.Background(), func() {} + if test.timeout > 0 { + ctx, cancel = context.WithTimeout(ctx, test.timeout) + } + defer cancel() - err := func(ctx context.Context, cancel context.CancelFunc) error { - defer cancelFn() - return attach2Server(ctx, server.URL, test.options) - }(ctx, cancelFn) + err := attach2Server(ctx, server.URL, test.options) - gotError := "" - if err != nil { - gotError = err.Error() - } - if test.expectError != gotError { - t.Errorf("%s: expected [%v], got [%v]", test.name, test.expectError, gotError) - } - - server.Close() + gotError := "" + if err != nil { + gotError = err.Error() + } + if test.expectError != gotError { + t.Errorf("%s: expected [%v], got [%v]", test.name, test.expectError, gotError) + } + }) } - } func newTestHTTPServer(f AttachFunc, options *StreamOptions) *httptest.Server { From b7e6c23e9f73e4cc0209e94fe95c5e2809998bf6 Mon Sep 17 00:00:00 2001 From: arkbriar Date: Wed, 2 Nov 2022 11:36:22 +0800 Subject: [PATCH 3/3] Propagate the panic with a channel Signed-off-by: arkbriar --- .../client-go/tools/remotecommand/remotecommand.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go b/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go index 4e22a970c35..662a3cb4ac7 100644 --- a/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go +++ b/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go @@ -27,9 +27,8 @@ import ( "k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/apimachinery/pkg/util/remotecommand" - "k8s.io/apimachinery/pkg/util/runtime" restclient "k8s.io/client-go/rest" - spdy "k8s.io/client-go/transport/spdy" + "k8s.io/client-go/transport/spdy" ) // StreamOptions holds information pertaining to the current streaming session: @@ -161,14 +160,20 @@ func (e *streamExecutor) StreamWithContext(ctx context.Context, options StreamOp } defer conn.Close() + panicChan := make(chan any, 1) errorChan := make(chan error, 1) go func() { - defer runtime.HandleCrash() - defer close(errorChan) + defer func() { + if p := recover(); p != nil { + panicChan <- p + } + }() errorChan <- streamer.stream(conn) }() select { + case p := <-panicChan: + panic(p) case err := <-errorChan: return err case <-ctx.Done():