diff --git a/pkg/client/tests/portfoward_test.go b/pkg/client/tests/portfoward_test.go index 35d633f8008..d6a1644f159 100644 --- a/pkg/client/tests/portfoward_test.go +++ b/pkg/client/tests/portfoward_test.go @@ -132,7 +132,7 @@ func TestForwardPorts(t *testing.T) { server := httptest.NewServer(fakePortForwardServer(t, testName, test.serverSends, test.clientSends)) url, _ := url.Parse(server.URL) - exec, err := remotecommand.NewExecutor(&restclient.Config{}, "POST", url) + exec, err := remotecommand.NewSPDYExecutor(&restclient.Config{}, "POST", url) if err != nil { t.Fatal(err) } @@ -202,7 +202,7 @@ func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) { defer server.Close() url, _ := url.Parse(server.URL) - exec, err := remotecommand.NewExecutor(&restclient.Config{}, "POST", url) + exec, err := remotecommand.NewSPDYExecutor(&restclient.Config{}, "POST", url) if err != nil { t.Fatal(err) } diff --git a/pkg/client/tests/remotecommand_test.go b/pkg/client/tests/remotecommand_test.go index cd89b81e0fa..49d4258fbd3 100644 --- a/pkg/client/tests/remotecommand_test.go +++ b/pkg/client/tests/remotecommand_test.go @@ -255,7 +255,7 @@ func TestStream(t *testing.T) { conf := &restclient.Config{ Host: server.URL, } - e, err := remoteclient.NewExecutor(conf, "POST", req.URL()) + e, err := remoteclient.NewSPDYExecutor(conf, "POST", req.URL()) if err != nil { t.Errorf("%s: unexpected error: %v", name, err) continue @@ -352,7 +352,7 @@ func TestDial(t *testing.T) { called = true return rt } - exec, err := remoteclient.NewStreamExecutor(upgrader, testFn, "POST", &url.URL{Host: "something.com", Scheme: "https"}) + exec, err := newStreamExecutor(upgrader, testFn, "POST", &url.URL{Host: "something.com", Scheme: "https"}) if err != nil { t.Fatal(err) } @@ -368,3 +368,20 @@ func TestDial(t *testing.T) { } _ = protocol } + +// newStreamExecutor upgrades the request so that it supports multiplexed bidirectional +// streams. This method takes a stream upgrader and an optional function that is invoked +// to wrap the round tripper. This method may be used by clients that are lower level than +// Kubernetes clients or need to provide their own upgrade round tripper. +func newStreamExecutor(upgrader httpstream.UpgradeRoundTripper, fn func(http.RoundTripper) http.RoundTripper, method string, url *url.URL) (StreamExecutor, error) { + rt := http.RoundTripper(upgrader) + if fn != nil { + rt = fn(rt) + } + return &streamExecutor{ + upgrader: upgrader, + transport: rt, + method: method, + url: url, + }, nil +} diff --git a/pkg/kubectl/cmd/BUILD b/pkg/kubectl/cmd/BUILD index c2198c2973d..dd04a7aba00 100644 --- a/pkg/kubectl/cmd/BUILD +++ b/pkg/kubectl/cmd/BUILD @@ -132,7 +132,6 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/jsonmergepatch:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/mergepatch:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/remotecommand:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/validation:go_default_library", diff --git a/pkg/kubectl/cmd/attach.go b/pkg/kubectl/cmd/attach.go index 8917b275796..d6f53884120 100644 --- a/pkg/kubectl/cmd/attach.go +++ b/pkg/kubectl/cmd/attach.go @@ -28,7 +28,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" - remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" "k8s.io/kubernetes/pkg/api" @@ -97,17 +96,16 @@ type RemoteAttach interface { type DefaultRemoteAttach struct{} func (*DefaultRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error { - exec, err := remotecommand.NewExecutor(config, method, url) + exec, err := remotecommand.NewSPDYExecutor(config, method, url) if err != nil { return err } return exec.Stream(remotecommand.StreamOptions{ - SupportedProtocols: remotecommandconsts.SupportedStreamingProtocols, - Stdin: stdin, - Stdout: stdout, - Stderr: stderr, - Tty: tty, - TerminalSizeQueue: terminalSizeQueue, + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + Tty: tty, + TerminalSizeQueue: terminalSizeQueue, }) } diff --git a/pkg/kubectl/cmd/exec.go b/pkg/kubectl/cmd/exec.go index 3ca48d9a689..76522bdf453 100644 --- a/pkg/kubectl/cmd/exec.go +++ b/pkg/kubectl/cmd/exec.go @@ -25,7 +25,6 @@ import ( "github.com/spf13/cobra" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" "k8s.io/kubernetes/pkg/api" @@ -101,17 +100,16 @@ type RemoteExecutor interface { type DefaultRemoteExecutor struct{} func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error { - exec, err := remotecommand.NewExecutor(config, method, url) + exec, err := remotecommand.NewSPDYExecutor(config, method, url) if err != nil { return err } return exec.Stream(remotecommand.StreamOptions{ - SupportedProtocols: remotecommandconsts.SupportedStreamingProtocols, - Stdin: stdin, - Stdout: stdout, - Stderr: stderr, - Tty: tty, - TerminalSizeQueue: terminalSizeQueue, + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + Tty: tty, + TerminalSizeQueue: terminalSizeQueue, }) } diff --git a/pkg/kubectl/cmd/portforward.go b/pkg/kubectl/cmd/portforward.go index 8e9e15e9736..d53246e86c0 100644 --- a/pkg/kubectl/cmd/portforward.go +++ b/pkg/kubectl/cmd/portforward.go @@ -19,6 +19,7 @@ package cmd import ( "fmt" "io" + "net/http" "net/url" "os" "os/signal" @@ -102,7 +103,11 @@ type defaultPortForwarder struct { } func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error { - dialer, err := remotecommand.NewExecutor(opts.Config, method, url) + transport, upgrader, err := remotecommand.SPDYRoundTripperFor(opts.Config) + if err != nil { + return err + } + dialer, err := remotecommand.NewSPDYDialer(upgrader, &http.Client{Transport: transport}, method, url) if err != nil { return err } diff --git a/pkg/kubelet/server/streaming/server_test.go b/pkg/kubelet/server/streaming/server_test.go index 806ba01c986..b6527dfb0c5 100644 --- a/pkg/kubelet/server/streaming/server_test.go +++ b/pkg/kubelet/server/streaming/server_test.go @@ -237,7 +237,7 @@ func TestServePortForward(t *testing.T) { reqURL, err := url.Parse(resp.Url) require.NoError(t, err) - exec, err := remotecommand.NewExecutor(&restclient.Config{}, "POST", reqURL) + exec, err := remotecommand.NewSPDYExecutor(&restclient.Config{}, "POST", reqURL) require.NoError(t, err) streamConn, _, err := exec.Dial(kubeletportforward.ProtocolV1Name) require.NoError(t, err) @@ -297,7 +297,7 @@ func runRemoteCommandTest(t *testing.T, commandType string) { go func() { defer wg.Done() - exec, err := remotecommand.NewExecutor(&restclient.Config{}, "POST", reqURL) + exec, err := remotecommand.NewSPDYExecutor(&restclient.Config{}, "POST", reqURL) require.NoError(t, err) opts := remotecommand.StreamOptions{ diff --git a/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go b/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go index a90fab1fe45..b3d6ad0af74 100644 --- a/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go +++ b/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go @@ -35,12 +35,11 @@ import ( // protocols, input/output streams, if the client is requesting a TTY, and a terminal size queue to // support terminal resizing. type StreamOptions struct { - SupportedProtocols []string - Stdin io.Reader - Stdout io.Writer - Stderr io.Writer - Tty bool - TerminalSizeQueue TerminalSizeQueue + Stdin io.Reader + Stdout io.Writer + Stderr io.Writer + Tty bool + TerminalSizeQueue TerminalSizeQueue } // Executor is an interface for transporting shell-style streams. @@ -52,93 +51,10 @@ type Executor interface { Stream(options StreamOptions) error } -// StreamExecutor supports the ability to dial an httpstream connection and the ability to -// run a command line stream protocol over that dialer. -type StreamExecutor interface { - Executor - httpstream.Dialer -} - -// streamExecutor handles transporting standard shell streams over an httpstream connection. -type streamExecutor struct { - upgrader httpstream.UpgradeRoundTripper - transport http.RoundTripper - - method string - url *url.URL -} - -// NewExecutor connects to the provided server and upgrades the connection to -// multiplexed bidirectional streams. The current implementation uses SPDY, -// but this could be replaced with HTTP/2 once it's available, or something else. -// TODO: the common code between this and portforward could be abstracted. -func NewExecutor(config *restclient.Config, method string, url *url.URL) (StreamExecutor, error) { - tlsConfig, err := restclient.TLSConfigFor(config) - if err != nil { - return nil, err - } - - upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig, true) - wrapper, err := restclient.HTTPWrappersForConfig(config, upgradeRoundTripper) - if err != nil { - return nil, err - } - - return &streamExecutor{ - upgrader: upgradeRoundTripper, - transport: wrapper, - method: method, - url: url, - }, nil -} - -// NewStreamExecutor upgrades the request so that it supports multiplexed bidirectional -// streams. This method takes a stream upgrader and an optional function that is invoked -// to wrap the round tripper. This method may be used by clients that are lower level than -// Kubernetes clients or need to provide their own upgrade round tripper. -func NewStreamExecutor(upgrader httpstream.UpgradeRoundTripper, fn func(http.RoundTripper) http.RoundTripper, method string, url *url.URL) (StreamExecutor, error) { - rt := http.RoundTripper(upgrader) - if fn != nil { - rt = fn(rt) - } - return &streamExecutor{ - upgrader: upgrader, - transport: rt, - method: method, - url: url, - }, nil -} - -// Dial opens a connection to a remote server and attempts to negotiate a SPDY -// connection. Upon success, it returns the connection and the protocol -// selected by the server. -func (e *streamExecutor) Dial(protocols ...string) (httpstream.Connection, string, error) { - rt := transport.DebugWrappers(e.transport) - - // TODO the client probably shouldn't be created here, as it doesn't allow - // flexibility to allow callers to configure it. - client := &http.Client{Transport: rt} - - req, err := http.NewRequest(e.method, e.url.String(), nil) - if err != nil { - return nil, "", fmt.Errorf("error creating request: %v", err) - } - for i := range protocols { - req.Header.Add(httpstream.HeaderProtocolVersion, protocols[i]) - } - - resp, err := client.Do(req) - if err != nil { - return nil, "", fmt.Errorf("error sending request: %v", err) - } - defer resp.Body.Close() - - conn, err := e.upgrader.NewConnection(resp) - if err != nil { - return nil, "", err - } - - return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil +// SPDYUpgrader validates a response from the server after a SPDY upgrade. +type SPDYUpgrader interface { + // NewConnection validates the response and creates a new Connection. + NewConnection(resp *http.Response) (httpstream.Connection, error) } type streamCreator interface { @@ -149,10 +65,105 @@ type streamProtocolHandler interface { stream(conn streamCreator) error } +// streamExecutor handles transporting standard shell streams over an httpstream connection. +type streamExecutor struct { + upgrader SPDYUpgrader + transport http.RoundTripper + + method string + url *url.URL +} + +// NewSPDYExecutor connects to the provided server and upgrades the connection to +// multiplexed bidirectional streams. +func NewSPDYExecutor(config *restclient.Config, method string, url *url.URL) (Executor, error) { + wrapper, upgradeRoundTripper, err := SPDYRoundTripperFor(config) + if err != nil { + return nil, err + } + wrapper = transport.DebugWrappers(wrapper) + return &streamExecutor{ + upgrader: upgradeRoundTripper, + transport: wrapper, + method: method, + url: url, + }, nil +} + +type spdyDialer struct { + client *http.Client + upgrader SPDYUpgrader + method string + url *url.URL +} + +func NewSPDYDialer(upgrader SPDYUpgrader, client *http.Client, method string, url *url.URL) (httpstream.Dialer, error) { + return &spdyDialer{ + client: client, + upgrader: upgrader, + method: method, + url: url, + }, nil +} + +func (d *spdyDialer) Dial(protocols ...string) (httpstream.Connection, string, error) { + req, err := http.NewRequest(d.method, d.url.String(), nil) + if err != nil { + return nil, "", fmt.Errorf("error creating request: %v", err) + } + return NegotiateSPDYConnection(d.upgrader, d.client, req, protocols...) +} + +// SPDYRoundTripperFor returns a round tripper to use with SPDY. +func SPDYRoundTripperFor(config *restclient.Config) (http.RoundTripper, SPDYUpgrader, error) { + tlsConfig, err := restclient.TLSConfigFor(config) + if err != nil { + return nil, nil, err + } + upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig, true) + wrapper, err := restclient.HTTPWrappersForConfig(config, upgradeRoundTripper) + if err != nil { + return nil, nil, err + } + return wrapper, upgradeRoundTripper, nil +} + +// NegotiateSPDYConnection opens a connection to a remote server and attempts to negotiate +// a SPDY connection. Upon success, it returns the connection and the protocol selected by +// the server. The client transport must use the upgradeRoundTripper - see SPDYRoundTripperFor. +func NegotiateSPDYConnection(upgrader SPDYUpgrader, client *http.Client, req *http.Request, protocols ...string) (httpstream.Connection, string, error) { + for i := range protocols { + req.Header.Add(httpstream.HeaderProtocolVersion, protocols[i]) + } + resp, err := client.Do(req) + if err != nil { + return nil, "", fmt.Errorf("error sending request: %v", err) + } + defer resp.Body.Close() + conn, err := upgrader.NewConnection(resp) + if err != nil { + return nil, "", err + } + return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil +} + // Stream opens a protocol streamer to the server and streams until a client closes // the connection or the server disconnects. func (e *streamExecutor) Stream(options StreamOptions) error { - conn, protocol, err := e.Dial(options.SupportedProtocols...) + req, err := http.NewRequest(e.method, e.url.String(), nil) + if err != nil { + return fmt.Errorf("error creating request: %v", err) + } + + conn, protocol, err := NegotiateSPDYConnection( + e.upgrader, + &http.Client{Transport: e.transport}, + req, + remotecommand.StreamProtocolV4Name, + remotecommand.StreamProtocolV3Name, + remotecommand.StreamProtocolV2Name, + remotecommand.StreamProtocolV1Name, + ) if err != nil { return err } diff --git a/test/e2e/framework/BUILD b/test/e2e/framework/BUILD index e1b7222a209..a39dbe20254 100644 --- a/test/e2e/framework/BUILD +++ b/test/e2e/framework/BUILD @@ -123,7 +123,6 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/rand:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/remotecommand:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library", diff --git a/test/e2e/framework/exec_util.go b/test/e2e/framework/exec_util.go index 686f7cf718e..57779f4d466 100644 --- a/test/e2e/framework/exec_util.go +++ b/test/e2e/framework/exec_util.go @@ -24,7 +24,6 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - remocommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" "k8s.io/kubernetes/pkg/api" @@ -135,15 +134,14 @@ func (f *Framework) ExecShellInPodWithFullOutput(podName string, cmd string) (st } func execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { - exec, err := remotecommand.NewExecutor(config, method, url) + exec, err := remotecommand.NewSPDYExecutor(config, method, url) if err != nil { return err } return exec.Stream(remotecommand.StreamOptions{ - SupportedProtocols: remocommandconsts.SupportedStreamingProtocols, - Stdin: stdin, - Stdout: stdout, - Stderr: stderr, - Tty: tty, + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + Tty: tty, }) }