From 5e0a53136c082fe052585e4fab25a5b96e68afc7 Mon Sep 17 00:00:00 2001 From: arkbriar Date: Wed, 24 Aug 2022 10:21:35 +0800 Subject: [PATCH 1/3] Support cancelable SPDY executor stream Mark remotecommand.Executor as deprecated and related modifications. Handle crash when streamer.stream panics Add a test to verify if stream is closed after connection being closed Remove blank line and update waiting time to 1s to avoid test flakes in CI. Refine the tests of StreamExecutor according to comments. Remove the comment of context controlling the negotiation progress and misc. Signed-off-by: arkbriar Kubernetes-commit: 42808c8343671e6783ba4c901dcd619bed648c3d --- tools/remotecommand/remotecommand.go | 53 +++++++-- tools/remotecommand/remotecommand_test.go | 124 ++++++++++++++++++++-- 2 files changed, 158 insertions(+), 19 deletions(-) diff --git a/tools/remotecommand/remotecommand.go b/tools/remotecommand/remotecommand.go index cb39faf7..4e22a970 100644 --- a/tools/remotecommand/remotecommand.go +++ b/tools/remotecommand/remotecommand.go @@ -17,6 +17,7 @@ limitations under the License. package remotecommand import ( + "context" "fmt" "io" "net/http" @@ -26,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/apimachinery/pkg/util/remotecommand" + "k8s.io/apimachinery/pkg/util/runtime" restclient "k8s.io/client-go/rest" spdy "k8s.io/client-go/transport/spdy" ) @@ -43,11 +45,16 @@ type StreamOptions struct { // Executor is an interface for transporting shell-style streams. type Executor interface { - // Stream initiates the transport of the standard shell streams. It will transport any - // non-nil stream to a remote system, and return an error if a problem occurs. If tty - // is set, the stderr stream is not used (raw TTY manages stdout and stderr over the - // stdout stream). + // Deprecated: use StreamWithContext instead to avoid possible resource leaks. + // See https://github.com/kubernetes/kubernetes/pull/103177 for details. Stream(options StreamOptions) error + + // StreamWithContext initiates the transport of the standard shell streams. It will + // transport any non-nil stream to a remote system, and return an error if a problem + // occurs. If tty is set, the stderr stream is not used (raw TTY manages stdout and + // stderr over the stdout stream). + // The context controls the entire lifetime of stream execution. + StreamWithContext(ctx context.Context, options StreamOptions) error } type streamCreator interface { @@ -106,9 +113,14 @@ func NewSPDYExecutorForProtocols(transport http.RoundTripper, upgrader spdy.Upgr // Stream opens a protocol streamer to the server and streams until a client closes // the connection or the server disconnects. func (e *streamExecutor) Stream(options StreamOptions) error { - req, err := http.NewRequest(e.method, e.url.String(), nil) + return e.StreamWithContext(context.Background(), options) +} + +// newConnectionAndStream creates a new SPDY connection and a stream protocol handler upon it. +func (e *streamExecutor) newConnectionAndStream(ctx context.Context, options StreamOptions) (httpstream.Connection, streamProtocolHandler, error) { + req, err := http.NewRequestWithContext(ctx, e.method, e.url.String(), nil) if err != nil { - return fmt.Errorf("error creating request: %v", err) + return nil, nil, fmt.Errorf("error creating request: %v", err) } conn, protocol, err := spdy.Negotiate( @@ -118,9 +130,8 @@ func (e *streamExecutor) Stream(options StreamOptions) error { e.protocols..., ) if err != nil { - return err + return nil, nil, err } - defer conn.Close() var streamer streamProtocolHandler @@ -138,5 +149,29 @@ func (e *streamExecutor) Stream(options StreamOptions) error { streamer = newStreamProtocolV1(options) } - return streamer.stream(conn) + return conn, streamer, nil +} + +// StreamWithContext opens a protocol streamer to the server and streams until a client closes +// the connection or the server disconnects or the context is done. +func (e *streamExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error { + conn, streamer, err := e.newConnectionAndStream(ctx, options) + if err != nil { + return err + } + defer conn.Close() + + errorChan := make(chan error, 1) + go func() { + defer runtime.HandleCrash() + defer close(errorChan) + errorChan <- streamer.stream(conn) + }() + + select { + case err := <-errorChan: + return err + case <-ctx.Done(): + return ctx.Err() + } } diff --git a/tools/remotecommand/remotecommand_test.go b/tools/remotecommand/remotecommand_test.go index 7eec4565..2ebdd3ac 100644 --- a/tools/remotecommand/remotecommand_test.go +++ b/tools/remotecommand/remotecommand_test.go @@ -17,9 +17,17 @@ limitations under the License. package remotecommand import ( + "context" "encoding/json" "errors" "io" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + "time" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -28,12 +36,6 @@ import ( remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/rest" - "net/http" - "net/http/httptest" - "net/url" - "strings" - "testing" - "time" ) type AttachFunc func(in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan TerminalSize) error @@ -50,6 +52,17 @@ type streamAndReply struct { replySent <-chan struct{} } +type fakeEmptyDataPty struct { +} + +func (s *fakeEmptyDataPty) Read(p []byte) (int, error) { + return len(p), nil +} + +func (s *fakeEmptyDataPty) Write(p []byte) (int, error) { + return len(p), nil +} + type fakeMassiveDataPty struct{} func (s *fakeMassiveDataPty) Read(p []byte) (int, error) { @@ -107,6 +120,7 @@ func writeMassiveData(stdStream io.Writer) struct{} { // write to stdin or stdou func TestSPDYExecutorStream(t *testing.T) { tests := []struct { + timeout time.Duration name string options StreamOptions expectError string @@ -130,12 +144,31 @@ func TestSPDYExecutorStream(t *testing.T) { expectError: "", attacher: fakeMassiveDataAttacher, }, + { + timeout: 500 * time.Millisecond, + name: "timeoutTest", + options: StreamOptions{ + Stdin: &fakeMassiveDataPty{}, + Stderr: &fakeMassiveDataPty{}, + }, + expectError: context.DeadlineExceeded.Error(), + attacher: fakeMassiveDataAttacher, + }, } for _, test := range tests { server := newTestHTTPServer(test.attacher, &test.options) - err := attach2Server(server.URL, test.options) + ctx, cancelFn := context.Background(), func() {} + if test.timeout > 0 { + ctx, cancelFn = context.WithTimeout(ctx, test.timeout) + } + + err := func(ctx context.Context, cancel context.CancelFunc) error { + defer cancelFn() + return attach2Server(ctx, server.URL, test.options) + }(ctx, cancelFn) + gotError := "" if err != nil { gotError = err.Error() @@ -170,16 +203,16 @@ func newTestHTTPServer(f AttachFunc, options *StreamOptions) *httptest.Server { return server } -func attach2Server(rawURL string, options StreamOptions) error { +func attach2Server(ctx context.Context, rawURL string, options StreamOptions) error { uri, _ := url.Parse(rawURL) exec, err := NewSPDYExecutor(&rest.Config{Host: uri.Host}, "POST", uri) if err != nil { return err } - e := make(chan error) + e := make(chan error, 1) go func(e chan error) { - e <- exec.Stream(options) + e <- exec.StreamWithContext(ctx, options) }(e) select { case err := <-e: @@ -263,3 +296,74 @@ func v4WriteStatusFunc(stream io.Writer) func(status *apierrors.StatusError) err return err } } + +// writeDetector provides a helper method to block until the underlying writer written. +type writeDetector struct { + written chan bool + closed bool + io.Writer +} + +func newWriterDetector(w io.Writer) *writeDetector { + return &writeDetector{ + written: make(chan bool), + Writer: w, + } +} + +func (w *writeDetector) BlockUntilWritten() { + <-w.written +} + +func (w *writeDetector) Write(p []byte) (n int, err error) { + if !w.closed { + close(w.written) + w.closed = true + } + return w.Writer.Write(p) +} + +// `Executor.StreamWithContext` starts a goroutine in the background to do the streaming +// and expects the deferred close of the connection leads to the exit of the goroutine on cancellation. +// This test verifies that works. +func TestStreamExitsAfterConnectionIsClosed(t *testing.T) { + writeDetector := newWriterDetector(&fakeEmptyDataPty{}) + options := StreamOptions{ + Stdin: &fakeEmptyDataPty{}, + Stdout: writeDetector, + } + server := newTestHTTPServer(fakeMassiveDataAttacher, &options) + + ctx, cancelFn := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancelFn() + + uri, _ := url.Parse(server.URL) + exec, err := NewSPDYExecutor(&rest.Config{Host: uri.Host}, "POST", uri) + if err != nil { + t.Fatal(err) + } + streamExec := exec.(*streamExecutor) + + conn, streamer, err := streamExec.newConnectionAndStream(ctx, options) + if err != nil { + t.Fatal(err) + } + + errorChan := make(chan error) + go func() { + errorChan <- streamer.stream(conn) + }() + + // Wait until stream goroutine starts. + writeDetector.BlockUntilWritten() + + // Close the connection + conn.Close() + + select { + case <-time.After(1 * time.Second): + t.Fatalf("expect stream to be closed after connection is closed.") + case <-errorChan: + return + } +} From 2362c7b16225585d3f0dc1671f3a9cdfe2290982 Mon Sep 17 00:00:00 2001 From: arkbriar Date: Mon, 10 Oct 2022 11:24:40 +0800 Subject: [PATCH 2/3] use subtests and defer in TestSPDYExecutorStream Signed-off-by: arkbriar Kubernetes-commit: 86e5d069ecece8591ad693b50bcea49000f6df26 --- tools/remotecommand/remotecommand_test.go | 36 +++++++++++------------ 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/tools/remotecommand/remotecommand_test.go b/tools/remotecommand/remotecommand_test.go index 2ebdd3ac..9144a145 100644 --- a/tools/remotecommand/remotecommand_test.go +++ b/tools/remotecommand/remotecommand_test.go @@ -157,29 +157,27 @@ func TestSPDYExecutorStream(t *testing.T) { } for _, test := range tests { - server := newTestHTTPServer(test.attacher, &test.options) + t.Run(test.name, func(t *testing.T) { + server := newTestHTTPServer(test.attacher, &test.options) + defer server.Close() - ctx, cancelFn := context.Background(), func() {} - if test.timeout > 0 { - ctx, cancelFn = context.WithTimeout(ctx, test.timeout) - } + ctx, cancel := context.Background(), func() {} + if test.timeout > 0 { + ctx, cancel = context.WithTimeout(ctx, test.timeout) + } + defer cancel() - err := func(ctx context.Context, cancel context.CancelFunc) error { - defer cancelFn() - return attach2Server(ctx, server.URL, test.options) - }(ctx, cancelFn) + err := attach2Server(ctx, server.URL, test.options) - gotError := "" - if err != nil { - gotError = err.Error() - } - if test.expectError != gotError { - t.Errorf("%s: expected [%v], got [%v]", test.name, test.expectError, gotError) - } - - server.Close() + gotError := "" + if err != nil { + gotError = err.Error() + } + if test.expectError != gotError { + t.Errorf("%s: expected [%v], got [%v]", test.name, test.expectError, gotError) + } + }) } - } func newTestHTTPServer(f AttachFunc, options *StreamOptions) *httptest.Server { From 0563decd0ab8f2ce99b7eadef2e4b81557784cfb Mon Sep 17 00:00:00 2001 From: arkbriar Date: Wed, 2 Nov 2022 11:36:22 +0800 Subject: [PATCH 3/3] Propagate the panic with a channel Signed-off-by: arkbriar Kubernetes-commit: b7e6c23e9f73e4cc0209e94fe95c5e2809998bf6 --- tools/remotecommand/remotecommand.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/tools/remotecommand/remotecommand.go b/tools/remotecommand/remotecommand.go index 4e22a970..662a3cb4 100644 --- a/tools/remotecommand/remotecommand.go +++ b/tools/remotecommand/remotecommand.go @@ -27,9 +27,8 @@ import ( "k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/apimachinery/pkg/util/remotecommand" - "k8s.io/apimachinery/pkg/util/runtime" restclient "k8s.io/client-go/rest" - spdy "k8s.io/client-go/transport/spdy" + "k8s.io/client-go/transport/spdy" ) // StreamOptions holds information pertaining to the current streaming session: @@ -161,14 +160,20 @@ func (e *streamExecutor) StreamWithContext(ctx context.Context, options StreamOp } defer conn.Close() + panicChan := make(chan any, 1) errorChan := make(chan error, 1) go func() { - defer runtime.HandleCrash() - defer close(errorChan) + defer func() { + if p := recover(); p != nil { + panicChan <- p + } + }() errorChan <- streamer.stream(conn) }() select { + case p := <-panicChan: + panic(p) case err := <-errorChan: return err case <-ctx.Done():