Support cancelable SPDY executor stream

Mark remotecommand.Executor as deprecated and related modifications.

Handle crash when streamer.stream panics

Add a test to verify if stream is closed after connection being closed

Remove blank line and update waiting time to 1s to avoid test flakes in CI.

Refine the tests of StreamExecutor according to comments.

Remove the comment of context controlling the negotiation progress and misc.

Signed-off-by: arkbriar <arkbriar@gmail.com>
This commit is contained in:
arkbriar 2022-08-24 10:21:35 +08:00
parent 575031b68f
commit 42808c8343
7 changed files with 166 additions and 24 deletions

View File

@ -18,6 +18,7 @@ package tests
import ( import (
"bytes" "bytes"
"context"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -252,7 +253,7 @@ func TestStream(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("%s: unexpected error: %v", name, err) t.Fatalf("%s: unexpected error: %v", name, err)
} }
err = e.Stream(remoteclient.StreamOptions{ err = e.StreamWithContext(context.Background(), remoteclient.StreamOptions{
Stdin: streamIn, Stdin: streamIn,
Stdout: streamOut, Stdout: streamOut,
Stderr: streamErr, Stderr: streamErr,

View File

@ -17,6 +17,7 @@ limitations under the License.
package streaming package streaming
import ( import (
"context"
"crypto/tls" "crypto/tls"
"io" "io"
"net/http" "net/http"
@ -355,7 +356,7 @@ func runRemoteCommandTest(t *testing.T, commandType string) {
Stderr: stderrW, Stderr: stderrW,
Tty: false, Tty: false,
} }
require.NoError(t, exec.Stream(opts)) require.NoError(t, exec.StreamWithContext(context.Background(), opts))
}() }()
go func() { go func() {

View File

@ -17,6 +17,7 @@ limitations under the License.
package remotecommand package remotecommand
import ( import (
"context"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
@ -26,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/remotecommand" "k8s.io/apimachinery/pkg/util/remotecommand"
"k8s.io/apimachinery/pkg/util/runtime"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
spdy "k8s.io/client-go/transport/spdy" spdy "k8s.io/client-go/transport/spdy"
) )
@ -43,11 +45,16 @@ type StreamOptions struct {
// Executor is an interface for transporting shell-style streams. // Executor is an interface for transporting shell-style streams.
type Executor interface { type Executor interface {
// Stream initiates the transport of the standard shell streams. It will transport any // Deprecated: use StreamWithContext instead to avoid possible resource leaks.
// non-nil stream to a remote system, and return an error if a problem occurs. If tty // See https://github.com/kubernetes/kubernetes/pull/103177 for details.
// is set, the stderr stream is not used (raw TTY manages stdout and stderr over the
// stdout stream).
Stream(options StreamOptions) error Stream(options StreamOptions) error
// StreamWithContext initiates the transport of the standard shell streams. It will
// transport any non-nil stream to a remote system, and return an error if a problem
// occurs. If tty is set, the stderr stream is not used (raw TTY manages stdout and
// stderr over the stdout stream).
// The context controls the entire lifetime of stream execution.
StreamWithContext(ctx context.Context, options StreamOptions) error
} }
type streamCreator interface { type streamCreator interface {
@ -106,9 +113,14 @@ func NewSPDYExecutorForProtocols(transport http.RoundTripper, upgrader spdy.Upgr
// Stream opens a protocol streamer to the server and streams until a client closes // Stream opens a protocol streamer to the server and streams until a client closes
// the connection or the server disconnects. // the connection or the server disconnects.
func (e *streamExecutor) Stream(options StreamOptions) error { func (e *streamExecutor) Stream(options StreamOptions) error {
req, err := http.NewRequest(e.method, e.url.String(), nil) return e.StreamWithContext(context.Background(), options)
}
// newConnectionAndStream creates a new SPDY connection and a stream protocol handler upon it.
func (e *streamExecutor) newConnectionAndStream(ctx context.Context, options StreamOptions) (httpstream.Connection, streamProtocolHandler, error) {
req, err := http.NewRequestWithContext(ctx, e.method, e.url.String(), nil)
if err != nil { if err != nil {
return fmt.Errorf("error creating request: %v", err) return nil, nil, fmt.Errorf("error creating request: %v", err)
} }
conn, protocol, err := spdy.Negotiate( conn, protocol, err := spdy.Negotiate(
@ -118,9 +130,8 @@ func (e *streamExecutor) Stream(options StreamOptions) error {
e.protocols..., e.protocols...,
) )
if err != nil { if err != nil {
return err return nil, nil, err
} }
defer conn.Close()
var streamer streamProtocolHandler var streamer streamProtocolHandler
@ -138,5 +149,29 @@ func (e *streamExecutor) Stream(options StreamOptions) error {
streamer = newStreamProtocolV1(options) streamer = newStreamProtocolV1(options)
} }
return streamer.stream(conn) return conn, streamer, nil
}
// StreamWithContext opens a protocol streamer to the server and streams until a client closes
// the connection or the server disconnects or the context is done.
func (e *streamExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {
conn, streamer, err := e.newConnectionAndStream(ctx, options)
if err != nil {
return err
}
defer conn.Close()
errorChan := make(chan error, 1)
go func() {
defer runtime.HandleCrash()
defer close(errorChan)
errorChan <- streamer.stream(conn)
}()
select {
case err := <-errorChan:
return err
case <-ctx.Done():
return ctx.Err()
}
} }

View File

@ -17,9 +17,17 @@ limitations under the License.
package remotecommand package remotecommand
import ( import (
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"io" "io"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -28,12 +36,6 @@ import (
remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"
) )
type AttachFunc func(in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan TerminalSize) error type AttachFunc func(in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan TerminalSize) error
@ -50,6 +52,17 @@ type streamAndReply struct {
replySent <-chan struct{} replySent <-chan struct{}
} }
type fakeEmptyDataPty struct {
}
func (s *fakeEmptyDataPty) Read(p []byte) (int, error) {
return len(p), nil
}
func (s *fakeEmptyDataPty) Write(p []byte) (int, error) {
return len(p), nil
}
type fakeMassiveDataPty struct{} type fakeMassiveDataPty struct{}
func (s *fakeMassiveDataPty) Read(p []byte) (int, error) { func (s *fakeMassiveDataPty) Read(p []byte) (int, error) {
@ -107,6 +120,7 @@ func writeMassiveData(stdStream io.Writer) struct{} { // write to stdin or stdou
func TestSPDYExecutorStream(t *testing.T) { func TestSPDYExecutorStream(t *testing.T) {
tests := []struct { tests := []struct {
timeout time.Duration
name string name string
options StreamOptions options StreamOptions
expectError string expectError string
@ -130,12 +144,31 @@ func TestSPDYExecutorStream(t *testing.T) {
expectError: "", expectError: "",
attacher: fakeMassiveDataAttacher, attacher: fakeMassiveDataAttacher,
}, },
{
timeout: 500 * time.Millisecond,
name: "timeoutTest",
options: StreamOptions{
Stdin: &fakeMassiveDataPty{},
Stderr: &fakeMassiveDataPty{},
},
expectError: context.DeadlineExceeded.Error(),
attacher: fakeMassiveDataAttacher,
},
} }
for _, test := range tests { for _, test := range tests {
server := newTestHTTPServer(test.attacher, &test.options) server := newTestHTTPServer(test.attacher, &test.options)
err := attach2Server(server.URL, test.options) ctx, cancelFn := context.Background(), func() {}
if test.timeout > 0 {
ctx, cancelFn = context.WithTimeout(ctx, test.timeout)
}
err := func(ctx context.Context, cancel context.CancelFunc) error {
defer cancelFn()
return attach2Server(ctx, server.URL, test.options)
}(ctx, cancelFn)
gotError := "" gotError := ""
if err != nil { if err != nil {
gotError = err.Error() gotError = err.Error()
@ -170,16 +203,16 @@ func newTestHTTPServer(f AttachFunc, options *StreamOptions) *httptest.Server {
return server return server
} }
func attach2Server(rawURL string, options StreamOptions) error { func attach2Server(ctx context.Context, rawURL string, options StreamOptions) error {
uri, _ := url.Parse(rawURL) uri, _ := url.Parse(rawURL)
exec, err := NewSPDYExecutor(&rest.Config{Host: uri.Host}, "POST", uri) exec, err := NewSPDYExecutor(&rest.Config{Host: uri.Host}, "POST", uri)
if err != nil { if err != nil {
return err return err
} }
e := make(chan error) e := make(chan error, 1)
go func(e chan error) { go func(e chan error) {
e <- exec.Stream(options) e <- exec.StreamWithContext(ctx, options)
}(e) }(e)
select { select {
case err := <-e: case err := <-e:
@ -263,3 +296,74 @@ func v4WriteStatusFunc(stream io.Writer) func(status *apierrors.StatusError) err
return err return err
} }
} }
// writeDetector provides a helper method to block until the underlying writer written.
type writeDetector struct {
written chan bool
closed bool
io.Writer
}
func newWriterDetector(w io.Writer) *writeDetector {
return &writeDetector{
written: make(chan bool),
Writer: w,
}
}
func (w *writeDetector) BlockUntilWritten() {
<-w.written
}
func (w *writeDetector) Write(p []byte) (n int, err error) {
if !w.closed {
close(w.written)
w.closed = true
}
return w.Writer.Write(p)
}
// `Executor.StreamWithContext` starts a goroutine in the background to do the streaming
// and expects the deferred close of the connection leads to the exit of the goroutine on cancellation.
// This test verifies that works.
func TestStreamExitsAfterConnectionIsClosed(t *testing.T) {
writeDetector := newWriterDetector(&fakeEmptyDataPty{})
options := StreamOptions{
Stdin: &fakeEmptyDataPty{},
Stdout: writeDetector,
}
server := newTestHTTPServer(fakeMassiveDataAttacher, &options)
ctx, cancelFn := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancelFn()
uri, _ := url.Parse(server.URL)
exec, err := NewSPDYExecutor(&rest.Config{Host: uri.Host}, "POST", uri)
if err != nil {
t.Fatal(err)
}
streamExec := exec.(*streamExecutor)
conn, streamer, err := streamExec.newConnectionAndStream(ctx, options)
if err != nil {
t.Fatal(err)
}
errorChan := make(chan error)
go func() {
errorChan <- streamer.stream(conn)
}()
// Wait until stream goroutine starts.
writeDetector.BlockUntilWritten()
// Close the connection
conn.Close()
select {
case <-time.After(1 * time.Second):
t.Fatalf("expect stream to be closed after connection is closed.")
case <-errorChan:
return
}
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package attach package attach
import ( import (
"context"
"fmt" "fmt"
"io" "io"
"net/url" "net/url"
@ -159,7 +160,7 @@ func (*DefaultRemoteAttach) Attach(method string, url *url.URL, config *restclie
if err != nil { if err != nil {
return err return err
} }
return exec.Stream(remotecommand.StreamOptions{ return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
Stdin: stdin, Stdin: stdin,
Stdout: stdout, Stdout: stdout,
Stderr: stderr, Stderr: stderr,

View File

@ -122,7 +122,7 @@ func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restc
if err != nil { if err != nil {
return err return err
} }
return exec.Stream(remotecommand.StreamOptions{ return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
Stdin: stdin, Stdin: stdin,
Stdout: stdout, Stdout: stdout,
Stderr: stderr, Stderr: stderr,

View File

@ -143,7 +143,7 @@ func execute(method string, url *url.URL, config *restclient.Config, stdin io.Re
if err != nil { if err != nil {
return err return err
} }
return exec.Stream(remotecommand.StreamOptions{ return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
Stdin: stdin, Stdin: stdin,
Stdout: stdout, Stdout: stdout,
Stderr: stderr, Stderr: stderr,