From 12c7874c0d88e9099ab2a29915d26751f0d23c2a Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Fri, 7 Jul 2017 17:54:34 -0400 Subject: [PATCH 1/2] Prepare to introduce websockets for exec and portforward Refactor the code in remotecommand to better represent the structure of what is common between portforward and exec. --- pkg/client/tests/portfoward_test.go | 4 +- pkg/client/tests/remotecommand_test.go | 21 +- pkg/kubectl/cmd/BUILD | 1 - pkg/kubectl/cmd/attach.go | 14 +- pkg/kubectl/cmd/exec.go | 14 +- pkg/kubectl/cmd/portforward.go | 7 +- pkg/kubelet/server/streaming/server_test.go | 4 +- .../tools/remotecommand/remotecommand.go | 199 +++++++++--------- test/e2e/framework/BUILD | 1 - test/e2e/framework/exec_util.go | 12 +- 10 files changed, 151 insertions(+), 126 deletions(-) diff --git a/pkg/client/tests/portfoward_test.go b/pkg/client/tests/portfoward_test.go index 35d633f8008..d6a1644f159 100644 --- a/pkg/client/tests/portfoward_test.go +++ b/pkg/client/tests/portfoward_test.go @@ -132,7 +132,7 @@ func TestForwardPorts(t *testing.T) { server := httptest.NewServer(fakePortForwardServer(t, testName, test.serverSends, test.clientSends)) url, _ := url.Parse(server.URL) - exec, err := remotecommand.NewExecutor(&restclient.Config{}, "POST", url) + exec, err := remotecommand.NewSPDYExecutor(&restclient.Config{}, "POST", url) if err != nil { t.Fatal(err) } @@ -202,7 +202,7 @@ func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) { defer server.Close() url, _ := url.Parse(server.URL) - exec, err := remotecommand.NewExecutor(&restclient.Config{}, "POST", url) + exec, err := remotecommand.NewSPDYExecutor(&restclient.Config{}, "POST", url) if err != nil { t.Fatal(err) } diff --git a/pkg/client/tests/remotecommand_test.go b/pkg/client/tests/remotecommand_test.go index cd89b81e0fa..49d4258fbd3 100644 --- a/pkg/client/tests/remotecommand_test.go +++ b/pkg/client/tests/remotecommand_test.go @@ -255,7 +255,7 @@ func TestStream(t *testing.T) { conf := &restclient.Config{ Host: server.URL, } - e, err := remoteclient.NewExecutor(conf, "POST", req.URL()) + e, err := remoteclient.NewSPDYExecutor(conf, "POST", req.URL()) if err != nil { t.Errorf("%s: unexpected error: %v", name, err) continue @@ -352,7 +352,7 @@ func TestDial(t *testing.T) { called = true return rt } - exec, err := remoteclient.NewStreamExecutor(upgrader, testFn, "POST", &url.URL{Host: "something.com", Scheme: "https"}) + exec, err := newStreamExecutor(upgrader, testFn, "POST", &url.URL{Host: "something.com", Scheme: "https"}) if err != nil { t.Fatal(err) } @@ -368,3 +368,20 @@ func TestDial(t *testing.T) { } _ = protocol } + +// newStreamExecutor upgrades the request so that it supports multiplexed bidirectional +// streams. This method takes a stream upgrader and an optional function that is invoked +// to wrap the round tripper. This method may be used by clients that are lower level than +// Kubernetes clients or need to provide their own upgrade round tripper. +func newStreamExecutor(upgrader httpstream.UpgradeRoundTripper, fn func(http.RoundTripper) http.RoundTripper, method string, url *url.URL) (StreamExecutor, error) { + rt := http.RoundTripper(upgrader) + if fn != nil { + rt = fn(rt) + } + return &streamExecutor{ + upgrader: upgrader, + transport: rt, + method: method, + url: url, + }, nil +} diff --git a/pkg/kubectl/cmd/BUILD b/pkg/kubectl/cmd/BUILD index c2198c2973d..dd04a7aba00 100644 --- a/pkg/kubectl/cmd/BUILD +++ b/pkg/kubectl/cmd/BUILD @@ -132,7 +132,6 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/jsonmergepatch:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/mergepatch:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/remotecommand:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/validation:go_default_library", diff --git a/pkg/kubectl/cmd/attach.go b/pkg/kubectl/cmd/attach.go index 8917b275796..d6f53884120 100644 --- a/pkg/kubectl/cmd/attach.go +++ b/pkg/kubectl/cmd/attach.go @@ -28,7 +28,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" - remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" "k8s.io/kubernetes/pkg/api" @@ -97,17 +96,16 @@ type RemoteAttach interface { type DefaultRemoteAttach struct{} func (*DefaultRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error { - exec, err := remotecommand.NewExecutor(config, method, url) + exec, err := remotecommand.NewSPDYExecutor(config, method, url) if err != nil { return err } return exec.Stream(remotecommand.StreamOptions{ - SupportedProtocols: remotecommandconsts.SupportedStreamingProtocols, - Stdin: stdin, - Stdout: stdout, - Stderr: stderr, - Tty: tty, - TerminalSizeQueue: terminalSizeQueue, + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + Tty: tty, + TerminalSizeQueue: terminalSizeQueue, }) } diff --git a/pkg/kubectl/cmd/exec.go b/pkg/kubectl/cmd/exec.go index 3ca48d9a689..76522bdf453 100644 --- a/pkg/kubectl/cmd/exec.go +++ b/pkg/kubectl/cmd/exec.go @@ -25,7 +25,6 @@ import ( "github.com/spf13/cobra" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" "k8s.io/kubernetes/pkg/api" @@ -101,17 +100,16 @@ type RemoteExecutor interface { type DefaultRemoteExecutor struct{} func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error { - exec, err := remotecommand.NewExecutor(config, method, url) + exec, err := remotecommand.NewSPDYExecutor(config, method, url) if err != nil { return err } return exec.Stream(remotecommand.StreamOptions{ - SupportedProtocols: remotecommandconsts.SupportedStreamingProtocols, - Stdin: stdin, - Stdout: stdout, - Stderr: stderr, - Tty: tty, - TerminalSizeQueue: terminalSizeQueue, + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + Tty: tty, + TerminalSizeQueue: terminalSizeQueue, }) } diff --git a/pkg/kubectl/cmd/portforward.go b/pkg/kubectl/cmd/portforward.go index 8e9e15e9736..d53246e86c0 100644 --- a/pkg/kubectl/cmd/portforward.go +++ b/pkg/kubectl/cmd/portforward.go @@ -19,6 +19,7 @@ package cmd import ( "fmt" "io" + "net/http" "net/url" "os" "os/signal" @@ -102,7 +103,11 @@ type defaultPortForwarder struct { } func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error { - dialer, err := remotecommand.NewExecutor(opts.Config, method, url) + transport, upgrader, err := remotecommand.SPDYRoundTripperFor(opts.Config) + if err != nil { + return err + } + dialer, err := remotecommand.NewSPDYDialer(upgrader, &http.Client{Transport: transport}, method, url) if err != nil { return err } diff --git a/pkg/kubelet/server/streaming/server_test.go b/pkg/kubelet/server/streaming/server_test.go index 806ba01c986..b6527dfb0c5 100644 --- a/pkg/kubelet/server/streaming/server_test.go +++ b/pkg/kubelet/server/streaming/server_test.go @@ -237,7 +237,7 @@ func TestServePortForward(t *testing.T) { reqURL, err := url.Parse(resp.Url) require.NoError(t, err) - exec, err := remotecommand.NewExecutor(&restclient.Config{}, "POST", reqURL) + exec, err := remotecommand.NewSPDYExecutor(&restclient.Config{}, "POST", reqURL) require.NoError(t, err) streamConn, _, err := exec.Dial(kubeletportforward.ProtocolV1Name) require.NoError(t, err) @@ -297,7 +297,7 @@ func runRemoteCommandTest(t *testing.T, commandType string) { go func() { defer wg.Done() - exec, err := remotecommand.NewExecutor(&restclient.Config{}, "POST", reqURL) + exec, err := remotecommand.NewSPDYExecutor(&restclient.Config{}, "POST", reqURL) require.NoError(t, err) opts := remotecommand.StreamOptions{ diff --git a/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go b/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go index a90fab1fe45..b3d6ad0af74 100644 --- a/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go +++ b/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go @@ -35,12 +35,11 @@ import ( // protocols, input/output streams, if the client is requesting a TTY, and a terminal size queue to // support terminal resizing. type StreamOptions struct { - SupportedProtocols []string - Stdin io.Reader - Stdout io.Writer - Stderr io.Writer - Tty bool - TerminalSizeQueue TerminalSizeQueue + Stdin io.Reader + Stdout io.Writer + Stderr io.Writer + Tty bool + TerminalSizeQueue TerminalSizeQueue } // Executor is an interface for transporting shell-style streams. @@ -52,93 +51,10 @@ type Executor interface { Stream(options StreamOptions) error } -// StreamExecutor supports the ability to dial an httpstream connection and the ability to -// run a command line stream protocol over that dialer. -type StreamExecutor interface { - Executor - httpstream.Dialer -} - -// streamExecutor handles transporting standard shell streams over an httpstream connection. -type streamExecutor struct { - upgrader httpstream.UpgradeRoundTripper - transport http.RoundTripper - - method string - url *url.URL -} - -// NewExecutor connects to the provided server and upgrades the connection to -// multiplexed bidirectional streams. The current implementation uses SPDY, -// but this could be replaced with HTTP/2 once it's available, or something else. -// TODO: the common code between this and portforward could be abstracted. -func NewExecutor(config *restclient.Config, method string, url *url.URL) (StreamExecutor, error) { - tlsConfig, err := restclient.TLSConfigFor(config) - if err != nil { - return nil, err - } - - upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig, true) - wrapper, err := restclient.HTTPWrappersForConfig(config, upgradeRoundTripper) - if err != nil { - return nil, err - } - - return &streamExecutor{ - upgrader: upgradeRoundTripper, - transport: wrapper, - method: method, - url: url, - }, nil -} - -// NewStreamExecutor upgrades the request so that it supports multiplexed bidirectional -// streams. This method takes a stream upgrader and an optional function that is invoked -// to wrap the round tripper. This method may be used by clients that are lower level than -// Kubernetes clients or need to provide their own upgrade round tripper. -func NewStreamExecutor(upgrader httpstream.UpgradeRoundTripper, fn func(http.RoundTripper) http.RoundTripper, method string, url *url.URL) (StreamExecutor, error) { - rt := http.RoundTripper(upgrader) - if fn != nil { - rt = fn(rt) - } - return &streamExecutor{ - upgrader: upgrader, - transport: rt, - method: method, - url: url, - }, nil -} - -// Dial opens a connection to a remote server and attempts to negotiate a SPDY -// connection. Upon success, it returns the connection and the protocol -// selected by the server. -func (e *streamExecutor) Dial(protocols ...string) (httpstream.Connection, string, error) { - rt := transport.DebugWrappers(e.transport) - - // TODO the client probably shouldn't be created here, as it doesn't allow - // flexibility to allow callers to configure it. - client := &http.Client{Transport: rt} - - req, err := http.NewRequest(e.method, e.url.String(), nil) - if err != nil { - return nil, "", fmt.Errorf("error creating request: %v", err) - } - for i := range protocols { - req.Header.Add(httpstream.HeaderProtocolVersion, protocols[i]) - } - - resp, err := client.Do(req) - if err != nil { - return nil, "", fmt.Errorf("error sending request: %v", err) - } - defer resp.Body.Close() - - conn, err := e.upgrader.NewConnection(resp) - if err != nil { - return nil, "", err - } - - return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil +// SPDYUpgrader validates a response from the server after a SPDY upgrade. +type SPDYUpgrader interface { + // NewConnection validates the response and creates a new Connection. + NewConnection(resp *http.Response) (httpstream.Connection, error) } type streamCreator interface { @@ -149,10 +65,105 @@ type streamProtocolHandler interface { stream(conn streamCreator) error } +// streamExecutor handles transporting standard shell streams over an httpstream connection. +type streamExecutor struct { + upgrader SPDYUpgrader + transport http.RoundTripper + + method string + url *url.URL +} + +// NewSPDYExecutor connects to the provided server and upgrades the connection to +// multiplexed bidirectional streams. +func NewSPDYExecutor(config *restclient.Config, method string, url *url.URL) (Executor, error) { + wrapper, upgradeRoundTripper, err := SPDYRoundTripperFor(config) + if err != nil { + return nil, err + } + wrapper = transport.DebugWrappers(wrapper) + return &streamExecutor{ + upgrader: upgradeRoundTripper, + transport: wrapper, + method: method, + url: url, + }, nil +} + +type spdyDialer struct { + client *http.Client + upgrader SPDYUpgrader + method string + url *url.URL +} + +func NewSPDYDialer(upgrader SPDYUpgrader, client *http.Client, method string, url *url.URL) (httpstream.Dialer, error) { + return &spdyDialer{ + client: client, + upgrader: upgrader, + method: method, + url: url, + }, nil +} + +func (d *spdyDialer) Dial(protocols ...string) (httpstream.Connection, string, error) { + req, err := http.NewRequest(d.method, d.url.String(), nil) + if err != nil { + return nil, "", fmt.Errorf("error creating request: %v", err) + } + return NegotiateSPDYConnection(d.upgrader, d.client, req, protocols...) +} + +// SPDYRoundTripperFor returns a round tripper to use with SPDY. +func SPDYRoundTripperFor(config *restclient.Config) (http.RoundTripper, SPDYUpgrader, error) { + tlsConfig, err := restclient.TLSConfigFor(config) + if err != nil { + return nil, nil, err + } + upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig, true) + wrapper, err := restclient.HTTPWrappersForConfig(config, upgradeRoundTripper) + if err != nil { + return nil, nil, err + } + return wrapper, upgradeRoundTripper, nil +} + +// NegotiateSPDYConnection opens a connection to a remote server and attempts to negotiate +// a SPDY connection. Upon success, it returns the connection and the protocol selected by +// the server. The client transport must use the upgradeRoundTripper - see SPDYRoundTripperFor. +func NegotiateSPDYConnection(upgrader SPDYUpgrader, client *http.Client, req *http.Request, protocols ...string) (httpstream.Connection, string, error) { + for i := range protocols { + req.Header.Add(httpstream.HeaderProtocolVersion, protocols[i]) + } + resp, err := client.Do(req) + if err != nil { + return nil, "", fmt.Errorf("error sending request: %v", err) + } + defer resp.Body.Close() + conn, err := upgrader.NewConnection(resp) + if err != nil { + return nil, "", err + } + return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil +} + // Stream opens a protocol streamer to the server and streams until a client closes // the connection or the server disconnects. func (e *streamExecutor) Stream(options StreamOptions) error { - conn, protocol, err := e.Dial(options.SupportedProtocols...) + req, err := http.NewRequest(e.method, e.url.String(), nil) + if err != nil { + return fmt.Errorf("error creating request: %v", err) + } + + conn, protocol, err := NegotiateSPDYConnection( + e.upgrader, + &http.Client{Transport: e.transport}, + req, + remotecommand.StreamProtocolV4Name, + remotecommand.StreamProtocolV3Name, + remotecommand.StreamProtocolV2Name, + remotecommand.StreamProtocolV1Name, + ) if err != nil { return err } diff --git a/test/e2e/framework/BUILD b/test/e2e/framework/BUILD index e1b7222a209..a39dbe20254 100644 --- a/test/e2e/framework/BUILD +++ b/test/e2e/framework/BUILD @@ -123,7 +123,6 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/rand:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/remotecommand:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library", diff --git a/test/e2e/framework/exec_util.go b/test/e2e/framework/exec_util.go index 686f7cf718e..57779f4d466 100644 --- a/test/e2e/framework/exec_util.go +++ b/test/e2e/framework/exec_util.go @@ -24,7 +24,6 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - remocommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" "k8s.io/kubernetes/pkg/api" @@ -135,15 +134,14 @@ func (f *Framework) ExecShellInPodWithFullOutput(podName string, cmd string) (st } func execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { - exec, err := remotecommand.NewExecutor(config, method, url) + exec, err := remotecommand.NewSPDYExecutor(config, method, url) if err != nil { return err } return exec.Stream(remotecommand.StreamOptions{ - SupportedProtocols: remocommandconsts.SupportedStreamingProtocols, - Stdin: stdin, - Stdout: stdout, - Stderr: stderr, - Tty: tty, + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + Tty: tty, }) } From cf026a3314fd3513a2c4239c6f4edbbbbcb96a77 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Fri, 7 Jul 2017 18:22:39 -0400 Subject: [PATCH 2/2] Move SPDY specific code into its own package --- hack/.linted_packages | 1 + pkg/client/tests/BUILD | 1 + pkg/client/tests/portfoward_test.go | 18 ++-- pkg/client/tests/remotecommand_test.go | 50 +++------- pkg/kubectl/cmd/BUILD | 1 + pkg/kubectl/cmd/portforward.go | 9 +- pkg/kubelet/server/streaming/BUILD | 2 +- pkg/kubelet/server/streaming/server_test.go | 16 ++-- .../client-go/tools/remotecommand/BUILD | 2 +- .../tools/remotecommand/remotecommand.go | 95 +++++-------------- .../src/k8s.io/client-go/transport/spdy/BUILD | 19 ++++ .../k8s.io/client-go/transport/spdy/spdy.go | 94 ++++++++++++++++++ 12 files changed, 173 insertions(+), 135 deletions(-) create mode 100644 staging/src/k8s.io/client-go/transport/spdy/BUILD create mode 100644 staging/src/k8s.io/client-go/transport/spdy/spdy.go diff --git a/hack/.linted_packages b/hack/.linted_packages index e5d5fe82d88..32ff8e3e1ef 100644 --- a/hack/.linted_packages +++ b/hack/.linted_packages @@ -446,6 +446,7 @@ staging/src/k8s.io/client-go/rest/watch staging/src/k8s.io/client-go/tools/auth staging/src/k8s.io/client-go/tools/metrics staging/src/k8s.io/client-go/tools/remotecommand +staging/src/k8s.io/client-go/transport/spdy staging/src/k8s.io/client-go/util/cert staging/src/k8s.io/client-go/util/homedir staging/src/k8s.io/client-go/util/workqueue diff --git a/pkg/client/tests/BUILD b/pkg/client/tests/BUILD index 70e2af35c29..555019ffdee 100644 --- a/pkg/client/tests/BUILD +++ b/pkg/client/tests/BUILD @@ -40,6 +40,7 @@ go_test( "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/portforward:go_default_library", "//vendor/k8s.io/client-go/tools/remotecommand:go_default_library", + "//vendor/k8s.io/client-go/transport/spdy:go_default_library", "//vendor/k8s.io/client-go/util/testing:go_default_library", ], ) diff --git a/pkg/client/tests/portfoward_test.go b/pkg/client/tests/portfoward_test.go index d6a1644f159..d6122606deb 100644 --- a/pkg/client/tests/portfoward_test.go +++ b/pkg/client/tests/portfoward_test.go @@ -33,7 +33,7 @@ import ( "k8s.io/apimachinery/pkg/types" restclient "k8s.io/client-go/rest" . "k8s.io/client-go/tools/portforward" - "k8s.io/client-go/tools/remotecommand" + "k8s.io/client-go/transport/spdy" "k8s.io/kubernetes/pkg/kubelet/server/portforward" ) @@ -131,16 +131,17 @@ func TestForwardPorts(t *testing.T) { for testName, test := range tests { server := httptest.NewServer(fakePortForwardServer(t, testName, test.serverSends, test.clientSends)) - url, _ := url.Parse(server.URL) - exec, err := remotecommand.NewSPDYExecutor(&restclient.Config{}, "POST", url) + transport, upgrader, err := spdy.RoundTripperFor(&restclient.Config{}) if err != nil { t.Fatal(err) } + url, _ := url.Parse(server.URL) + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url) stopChan := make(chan struct{}, 1) readyChan := make(chan struct{}) - pf, err := New(exec, test.ports, stopChan, readyChan, os.Stdout, os.Stderr) + pf, err := New(dialer, test.ports, stopChan, readyChan, os.Stdout, os.Stderr) if err != nil { t.Fatalf("%s: unexpected error calling New: %v", testName, err) } @@ -201,17 +202,18 @@ func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) { server := httptest.NewServer(fakePortForwardServer(t, "allBindsFailed", nil, nil)) defer server.Close() - url, _ := url.Parse(server.URL) - exec, err := remotecommand.NewSPDYExecutor(&restclient.Config{}, "POST", url) + transport, upgrader, err := spdy.RoundTripperFor(&restclient.Config{}) if err != nil { t.Fatal(err) } + url, _ := url.Parse(server.URL) + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url) stopChan1 := make(chan struct{}, 1) defer close(stopChan1) readyChan1 := make(chan struct{}) - pf1, err := New(exec, []string{"5555"}, stopChan1, readyChan1, os.Stdout, os.Stderr) + pf1, err := New(dialer, []string{"5555"}, stopChan1, readyChan1, os.Stdout, os.Stderr) if err != nil { t.Fatalf("error creating pf1: %v", err) } @@ -220,7 +222,7 @@ func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) { stopChan2 := make(chan struct{}, 1) readyChan2 := make(chan struct{}) - pf2, err := New(exec, []string{"5555"}, stopChan2, readyChan2, os.Stdout, os.Stderr) + pf2, err := New(dialer, []string{"5555"}, stopChan2, readyChan2, os.Stdout, os.Stderr) if err != nil { t.Fatalf("error creating pf2: %v", err) } diff --git a/pkg/client/tests/remotecommand_test.go b/pkg/client/tests/remotecommand_test.go index 49d4258fbd3..7c2a0e816fc 100644 --- a/pkg/client/tests/remotecommand_test.go +++ b/pkg/client/tests/remotecommand_test.go @@ -37,6 +37,7 @@ import ( remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" restclient "k8s.io/client-go/rest" remoteclient "k8s.io/client-go/tools/remotecommand" + "k8s.io/client-go/transport/spdy" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" @@ -255,17 +256,16 @@ func TestStream(t *testing.T) { conf := &restclient.Config{ Host: server.URL, } - e, err := remoteclient.NewSPDYExecutor(conf, "POST", req.URL()) + e, err := remoteclient.NewSPDYExecutorForProtocols(conf, "POST", req.URL(), testCase.ClientProtocols...) if err != nil { t.Errorf("%s: unexpected error: %v", name, err) continue } err = e.Stream(remoteclient.StreamOptions{ - SupportedProtocols: testCase.ClientProtocols, - Stdin: streamIn, - Stdout: streamOut, - Stderr: streamErr, - Tty: testCase.Tty, + Stdin: streamIn, + Stdout: streamOut, + Stderr: streamErr, + Tty: testCase.Tty, }) hasErr := err != nil @@ -311,11 +311,13 @@ type fakeUpgrader struct { conn httpstream.Connection err, connErr error checkResponse bool + called bool t *testing.T } func (u *fakeUpgrader) RoundTrip(req *http.Request) (*http.Response, error) { + u.called = true u.req = req return u.resp, u.err } @@ -344,44 +346,16 @@ func TestDial(t *testing.T) { Body: ioutil.NopCloser(&bytes.Buffer{}), }, } - var called bool - testFn := func(rt http.RoundTripper) http.RoundTripper { - if rt != upgrader { - t.Fatalf("unexpected round tripper: %#v", rt) - } - called = true - return rt - } - exec, err := newStreamExecutor(upgrader, testFn, "POST", &url.URL{Host: "something.com", Scheme: "https"}) - if err != nil { - t.Fatal(err) - } - conn, protocol, err := exec.Dial("protocol1") + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: upgrader}, "POST", &url.URL{Host: "something.com", Scheme: "https"}) + conn, protocol, err := dialer.Dial("protocol1") if err != nil { t.Fatal(err) } if conn != upgrader.conn { t.Errorf("unexpected connection: %#v", conn) } - if !called { - t.Errorf("wrapper not called") + if !upgrader.called { + t.Errorf("request not called") } _ = protocol } - -// newStreamExecutor upgrades the request so that it supports multiplexed bidirectional -// streams. This method takes a stream upgrader and an optional function that is invoked -// to wrap the round tripper. This method may be used by clients that are lower level than -// Kubernetes clients or need to provide their own upgrade round tripper. -func newStreamExecutor(upgrader httpstream.UpgradeRoundTripper, fn func(http.RoundTripper) http.RoundTripper, method string, url *url.URL) (StreamExecutor, error) { - rt := http.RoundTripper(upgrader) - if fn != nil { - rt = fn(rt) - } - return &streamExecutor{ - upgrader: upgrader, - transport: rt, - method: method, - url: url, - }, nil -} diff --git a/pkg/kubectl/cmd/BUILD b/pkg/kubectl/cmd/BUILD index dd04a7aba00..1cc2c09cbbc 100644 --- a/pkg/kubectl/cmd/BUILD +++ b/pkg/kubectl/cmd/BUILD @@ -145,6 +145,7 @@ go_library( "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library", "//vendor/k8s.io/client-go/tools/portforward:go_default_library", "//vendor/k8s.io/client-go/tools/remotecommand:go_default_library", + "//vendor/k8s.io/client-go/transport/spdy:go_default_library", ], ) diff --git a/pkg/kubectl/cmd/portforward.go b/pkg/kubectl/cmd/portforward.go index d53246e86c0..bcf3e694f4b 100644 --- a/pkg/kubectl/cmd/portforward.go +++ b/pkg/kubectl/cmd/portforward.go @@ -29,7 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/portforward" - "k8s.io/client-go/tools/remotecommand" + "k8s.io/client-go/transport/spdy" "k8s.io/kubernetes/pkg/api" coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" "k8s.io/kubernetes/pkg/kubectl/cmd/templates" @@ -103,14 +103,11 @@ type defaultPortForwarder struct { } func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error { - transport, upgrader, err := remotecommand.SPDYRoundTripperFor(opts.Config) - if err != nil { - return err - } - dialer, err := remotecommand.NewSPDYDialer(upgrader, &http.Client{Transport: transport}, method, url) + transport, upgrader, err := spdy.RoundTripperFor(opts.Config) if err != nil { return err } + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url) fw, err := portforward.New(dialer, opts.Ports, opts.StopChannel, opts.ReadyChannel, f.cmdOut, f.cmdErr) if err != nil { return err diff --git a/pkg/kubelet/server/streaming/BUILD b/pkg/kubelet/server/streaming/BUILD index 5fe94b594c1..c2d62d4c3a9 100644 --- a/pkg/kubelet/server/streaming/BUILD +++ b/pkg/kubelet/server/streaming/BUILD @@ -45,9 +45,9 @@ go_test( "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/remotecommand:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/remotecommand:go_default_library", + "//vendor/k8s.io/client-go/transport/spdy:go_default_library", ], ) diff --git a/pkg/kubelet/server/streaming/server_test.go b/pkg/kubelet/server/streaming/server_test.go index b6527dfb0c5..f368e268f29 100644 --- a/pkg/kubelet/server/streaming/server_test.go +++ b/pkg/kubelet/server/streaming/server_test.go @@ -30,9 +30,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" + "k8s.io/client-go/transport/spdy" "k8s.io/kubernetes/pkg/api" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" kubeletportforward "k8s.io/kubernetes/pkg/kubelet/server/portforward" @@ -237,9 +237,10 @@ func TestServePortForward(t *testing.T) { reqURL, err := url.Parse(resp.Url) require.NoError(t, err) - exec, err := remotecommand.NewSPDYExecutor(&restclient.Config{}, "POST", reqURL) + transport, upgrader, err := spdy.RoundTripperFor(&restclient.Config{}) require.NoError(t, err) - streamConn, _, err := exec.Dial(kubeletportforward.ProtocolV1Name) + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", reqURL) + streamConn, _, err := dialer.Dial(kubeletportforward.ProtocolV1Name) require.NoError(t, err) defer streamConn.Close() @@ -301,11 +302,10 @@ func runRemoteCommandTest(t *testing.T, commandType string) { require.NoError(t, err) opts := remotecommand.StreamOptions{ - SupportedProtocols: remotecommandconsts.SupportedStreamingProtocols, - Stdin: stdinR, - Stdout: stdoutW, - Stderr: stderrW, - Tty: false, + Stdin: stdinR, + Stdout: stdoutW, + Stderr: stderrW, + Tty: false, } require.NoError(t, exec.Stream(opts)) }() diff --git a/staging/src/k8s.io/client-go/tools/remotecommand/BUILD b/staging/src/k8s.io/client-go/tools/remotecommand/BUILD index 66fc8010402..1435e31e279 100644 --- a/staging/src/k8s.io/client-go/tools/remotecommand/BUILD +++ b/staging/src/k8s.io/client-go/tools/remotecommand/BUILD @@ -41,11 +41,11 @@ go_library( "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/httpstream:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/remotecommand:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/transport:go_default_library", + "//vendor/k8s.io/client-go/transport/spdy:go_default_library", "//vendor/k8s.io/client-go/util/exec:go_default_library", ], ) diff --git a/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go b/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go index b3d6ad0af74..bcbe9fcd4f0 100644 --- a/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go +++ b/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go @@ -25,10 +25,10 @@ import ( "github.com/golang/glog" "k8s.io/apimachinery/pkg/util/httpstream" - "k8s.io/apimachinery/pkg/util/httpstream/spdy" "k8s.io/apimachinery/pkg/util/remotecommand" restclient "k8s.io/client-go/rest" "k8s.io/client-go/transport" + spdy "k8s.io/client-go/transport/spdy" ) // StreamOptions holds information pertaining to the current streaming session: supported stream @@ -51,12 +51,6 @@ type Executor interface { Stream(options StreamOptions) error } -// SPDYUpgrader validates a response from the server after a SPDY upgrade. -type SPDYUpgrader interface { - // NewConnection validates the response and creates a new Connection. - NewConnection(resp *http.Response) (httpstream.Connection, error) -} - type streamCreator interface { CreateStream(headers http.Header) (httpstream.Stream, error) } @@ -67,17 +61,31 @@ type streamProtocolHandler interface { // streamExecutor handles transporting standard shell streams over an httpstream connection. type streamExecutor struct { - upgrader SPDYUpgrader + upgrader spdy.Upgrader transport http.RoundTripper - method string - url *url.URL + method string + url *url.URL + protocols []string } // NewSPDYExecutor connects to the provided server and upgrades the connection to // multiplexed bidirectional streams. func NewSPDYExecutor(config *restclient.Config, method string, url *url.URL) (Executor, error) { - wrapper, upgradeRoundTripper, err := SPDYRoundTripperFor(config) + return NewSPDYExecutorForProtocols( + config, method, url, + remotecommand.StreamProtocolV4Name, + remotecommand.StreamProtocolV3Name, + remotecommand.StreamProtocolV2Name, + remotecommand.StreamProtocolV1Name, + ) +} + +// NewSPDYExecutorForProtocols connects to the provided server and upgrades the connection to +// multiplexed bidirectional streams using only the provided protocols. Exposed for testing, most +// callers should use NewSPDYExecutor. +func NewSPDYExecutorForProtocols(config *restclient.Config, method string, url *url.URL, protocols ...string) (Executor, error) { + wrapper, upgradeRoundTripper, err := spdy.RoundTripperFor(config) if err != nil { return nil, err } @@ -87,66 +95,10 @@ func NewSPDYExecutor(config *restclient.Config, method string, url *url.URL) (Ex transport: wrapper, method: method, url: url, + protocols: protocols, }, nil } -type spdyDialer struct { - client *http.Client - upgrader SPDYUpgrader - method string - url *url.URL -} - -func NewSPDYDialer(upgrader SPDYUpgrader, client *http.Client, method string, url *url.URL) (httpstream.Dialer, error) { - return &spdyDialer{ - client: client, - upgrader: upgrader, - method: method, - url: url, - }, nil -} - -func (d *spdyDialer) Dial(protocols ...string) (httpstream.Connection, string, error) { - req, err := http.NewRequest(d.method, d.url.String(), nil) - if err != nil { - return nil, "", fmt.Errorf("error creating request: %v", err) - } - return NegotiateSPDYConnection(d.upgrader, d.client, req, protocols...) -} - -// SPDYRoundTripperFor returns a round tripper to use with SPDY. -func SPDYRoundTripperFor(config *restclient.Config) (http.RoundTripper, SPDYUpgrader, error) { - tlsConfig, err := restclient.TLSConfigFor(config) - if err != nil { - return nil, nil, err - } - upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig, true) - wrapper, err := restclient.HTTPWrappersForConfig(config, upgradeRoundTripper) - if err != nil { - return nil, nil, err - } - return wrapper, upgradeRoundTripper, nil -} - -// NegotiateSPDYConnection opens a connection to a remote server and attempts to negotiate -// a SPDY connection. Upon success, it returns the connection and the protocol selected by -// the server. The client transport must use the upgradeRoundTripper - see SPDYRoundTripperFor. -func NegotiateSPDYConnection(upgrader SPDYUpgrader, client *http.Client, req *http.Request, protocols ...string) (httpstream.Connection, string, error) { - for i := range protocols { - req.Header.Add(httpstream.HeaderProtocolVersion, protocols[i]) - } - resp, err := client.Do(req) - if err != nil { - return nil, "", fmt.Errorf("error sending request: %v", err) - } - defer resp.Body.Close() - conn, err := upgrader.NewConnection(resp) - if err != nil { - return nil, "", err - } - return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil -} - // Stream opens a protocol streamer to the server and streams until a client closes // the connection or the server disconnects. func (e *streamExecutor) Stream(options StreamOptions) error { @@ -155,14 +107,11 @@ func (e *streamExecutor) Stream(options StreamOptions) error { return fmt.Errorf("error creating request: %v", err) } - conn, protocol, err := NegotiateSPDYConnection( + conn, protocol, err := spdy.Negotiate( e.upgrader, &http.Client{Transport: e.transport}, req, - remotecommand.StreamProtocolV4Name, - remotecommand.StreamProtocolV3Name, - remotecommand.StreamProtocolV2Name, - remotecommand.StreamProtocolV1Name, + e.protocols..., ) if err != nil { return err diff --git a/staging/src/k8s.io/client-go/transport/spdy/BUILD b/staging/src/k8s.io/client-go/transport/spdy/BUILD new file mode 100644 index 00000000000..184f58fbe20 --- /dev/null +++ b/staging/src/k8s.io/client-go/transport/spdy/BUILD @@ -0,0 +1,19 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = ["spdy.go"], + tags = ["automanaged"], + deps = [ + "//vendor/k8s.io/apimachinery/pkg/util/httpstream:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy:go_default_library", + "//vendor/k8s.io/client-go/rest:go_default_library", + ], +) diff --git a/staging/src/k8s.io/client-go/transport/spdy/spdy.go b/staging/src/k8s.io/client-go/transport/spdy/spdy.go new file mode 100644 index 00000000000..e0eb468ba36 --- /dev/null +++ b/staging/src/k8s.io/client-go/transport/spdy/spdy.go @@ -0,0 +1,94 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package spdy + +import ( + "fmt" + "net/http" + "net/url" + + "k8s.io/apimachinery/pkg/util/httpstream" + "k8s.io/apimachinery/pkg/util/httpstream/spdy" + restclient "k8s.io/client-go/rest" +) + +// Upgrader validates a response from the server after a SPDY upgrade. +type Upgrader interface { + // NewConnection validates the response and creates a new Connection. + NewConnection(resp *http.Response) (httpstream.Connection, error) +} + +// RoundTripperFor returns a round tripper and upgrader to use with SPDY. +func RoundTripperFor(config *restclient.Config) (http.RoundTripper, Upgrader, error) { + tlsConfig, err := restclient.TLSConfigFor(config) + if err != nil { + return nil, nil, err + } + upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig, true) + wrapper, err := restclient.HTTPWrappersForConfig(config, upgradeRoundTripper) + if err != nil { + return nil, nil, err + } + return wrapper, upgradeRoundTripper, nil +} + +// dialer implements the httpstream.Dialer interface. +type dialer struct { + client *http.Client + upgrader Upgrader + method string + url *url.URL +} + +var _ httpstream.Dialer = &dialer{} + +// NewDialer will create a dialer that connects to the provided URL and upgrades the connection to SPDY. +func NewDialer(upgrader Upgrader, client *http.Client, method string, url *url.URL) httpstream.Dialer { + return &dialer{ + client: client, + upgrader: upgrader, + method: method, + url: url, + } +} + +func (d *dialer) Dial(protocols ...string) (httpstream.Connection, string, error) { + req, err := http.NewRequest(d.method, d.url.String(), nil) + if err != nil { + return nil, "", fmt.Errorf("error creating request: %v", err) + } + return Negotiate(d.upgrader, d.client, req, protocols...) +} + +// Negotiate opens a connection to a remote server and attempts to negotiate +// a SPDY connection. Upon success, it returns the connection and the protocol selected by +// the server. The client transport must use the upgradeRoundTripper - see RoundTripperFor. +func Negotiate(upgrader Upgrader, client *http.Client, req *http.Request, protocols ...string) (httpstream.Connection, string, error) { + for i := range protocols { + req.Header.Add(httpstream.HeaderProtocolVersion, protocols[i]) + } + resp, err := client.Do(req) + if err != nil { + return nil, "", fmt.Errorf("error sending request: %v", err) + } + defer resp.Body.Close() + conn, err := upgrader.NewConnection(resp) + if err != nil { + return nil, "", err + } + return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil +}