diff --git a/hack/.linted_packages b/hack/.linted_packages index e5d5fe82d88..32ff8e3e1ef 100644 --- a/hack/.linted_packages +++ b/hack/.linted_packages @@ -446,6 +446,7 @@ staging/src/k8s.io/client-go/rest/watch staging/src/k8s.io/client-go/tools/auth staging/src/k8s.io/client-go/tools/metrics staging/src/k8s.io/client-go/tools/remotecommand +staging/src/k8s.io/client-go/transport/spdy staging/src/k8s.io/client-go/util/cert staging/src/k8s.io/client-go/util/homedir staging/src/k8s.io/client-go/util/workqueue diff --git a/pkg/client/tests/BUILD b/pkg/client/tests/BUILD index 70e2af35c29..555019ffdee 100644 --- a/pkg/client/tests/BUILD +++ b/pkg/client/tests/BUILD @@ -40,6 +40,7 @@ go_test( "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/portforward:go_default_library", "//vendor/k8s.io/client-go/tools/remotecommand:go_default_library", + "//vendor/k8s.io/client-go/transport/spdy:go_default_library", "//vendor/k8s.io/client-go/util/testing:go_default_library", ], ) diff --git a/pkg/client/tests/portfoward_test.go b/pkg/client/tests/portfoward_test.go index d6a1644f159..d6122606deb 100644 --- a/pkg/client/tests/portfoward_test.go +++ b/pkg/client/tests/portfoward_test.go @@ -33,7 +33,7 @@ import ( "k8s.io/apimachinery/pkg/types" restclient "k8s.io/client-go/rest" . "k8s.io/client-go/tools/portforward" - "k8s.io/client-go/tools/remotecommand" + "k8s.io/client-go/transport/spdy" "k8s.io/kubernetes/pkg/kubelet/server/portforward" ) @@ -131,16 +131,17 @@ func TestForwardPorts(t *testing.T) { for testName, test := range tests { server := httptest.NewServer(fakePortForwardServer(t, testName, test.serverSends, test.clientSends)) - url, _ := url.Parse(server.URL) - exec, err := remotecommand.NewSPDYExecutor(&restclient.Config{}, "POST", url) + transport, upgrader, err := spdy.RoundTripperFor(&restclient.Config{}) if err != nil { t.Fatal(err) } + url, _ := url.Parse(server.URL) + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url) stopChan := make(chan struct{}, 1) readyChan := make(chan struct{}) - pf, err := New(exec, test.ports, stopChan, readyChan, os.Stdout, os.Stderr) + pf, err := New(dialer, test.ports, stopChan, readyChan, os.Stdout, os.Stderr) if err != nil { t.Fatalf("%s: unexpected error calling New: %v", testName, err) } @@ -201,17 +202,18 @@ func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) { server := httptest.NewServer(fakePortForwardServer(t, "allBindsFailed", nil, nil)) defer server.Close() - url, _ := url.Parse(server.URL) - exec, err := remotecommand.NewSPDYExecutor(&restclient.Config{}, "POST", url) + transport, upgrader, err := spdy.RoundTripperFor(&restclient.Config{}) if err != nil { t.Fatal(err) } + url, _ := url.Parse(server.URL) + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url) stopChan1 := make(chan struct{}, 1) defer close(stopChan1) readyChan1 := make(chan struct{}) - pf1, err := New(exec, []string{"5555"}, stopChan1, readyChan1, os.Stdout, os.Stderr) + pf1, err := New(dialer, []string{"5555"}, stopChan1, readyChan1, os.Stdout, os.Stderr) if err != nil { t.Fatalf("error creating pf1: %v", err) } @@ -220,7 +222,7 @@ func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) { stopChan2 := make(chan struct{}, 1) readyChan2 := make(chan struct{}) - pf2, err := New(exec, []string{"5555"}, stopChan2, readyChan2, os.Stdout, os.Stderr) + pf2, err := New(dialer, []string{"5555"}, stopChan2, readyChan2, os.Stdout, os.Stderr) if err != nil { t.Fatalf("error creating pf2: %v", err) } diff --git a/pkg/client/tests/remotecommand_test.go b/pkg/client/tests/remotecommand_test.go index 49d4258fbd3..7c2a0e816fc 100644 --- a/pkg/client/tests/remotecommand_test.go +++ b/pkg/client/tests/remotecommand_test.go @@ -37,6 +37,7 @@ import ( remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" restclient "k8s.io/client-go/rest" remoteclient "k8s.io/client-go/tools/remotecommand" + "k8s.io/client-go/transport/spdy" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" @@ -255,17 +256,16 @@ func TestStream(t *testing.T) { conf := &restclient.Config{ Host: server.URL, } - e, err := remoteclient.NewSPDYExecutor(conf, "POST", req.URL()) + e, err := remoteclient.NewSPDYExecutorForProtocols(conf, "POST", req.URL(), testCase.ClientProtocols...) if err != nil { t.Errorf("%s: unexpected error: %v", name, err) continue } err = e.Stream(remoteclient.StreamOptions{ - SupportedProtocols: testCase.ClientProtocols, - Stdin: streamIn, - Stdout: streamOut, - Stderr: streamErr, - Tty: testCase.Tty, + Stdin: streamIn, + Stdout: streamOut, + Stderr: streamErr, + Tty: testCase.Tty, }) hasErr := err != nil @@ -311,11 +311,13 @@ type fakeUpgrader struct { conn httpstream.Connection err, connErr error checkResponse bool + called bool t *testing.T } func (u *fakeUpgrader) RoundTrip(req *http.Request) (*http.Response, error) { + u.called = true u.req = req return u.resp, u.err } @@ -344,44 +346,16 @@ func TestDial(t *testing.T) { Body: ioutil.NopCloser(&bytes.Buffer{}), }, } - var called bool - testFn := func(rt http.RoundTripper) http.RoundTripper { - if rt != upgrader { - t.Fatalf("unexpected round tripper: %#v", rt) - } - called = true - return rt - } - exec, err := newStreamExecutor(upgrader, testFn, "POST", &url.URL{Host: "something.com", Scheme: "https"}) - if err != nil { - t.Fatal(err) - } - conn, protocol, err := exec.Dial("protocol1") + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: upgrader}, "POST", &url.URL{Host: "something.com", Scheme: "https"}) + conn, protocol, err := dialer.Dial("protocol1") if err != nil { t.Fatal(err) } if conn != upgrader.conn { t.Errorf("unexpected connection: %#v", conn) } - if !called { - t.Errorf("wrapper not called") + if !upgrader.called { + t.Errorf("request not called") } _ = protocol } - -// newStreamExecutor upgrades the request so that it supports multiplexed bidirectional -// streams. This method takes a stream upgrader and an optional function that is invoked -// to wrap the round tripper. This method may be used by clients that are lower level than -// Kubernetes clients or need to provide their own upgrade round tripper. -func newStreamExecutor(upgrader httpstream.UpgradeRoundTripper, fn func(http.RoundTripper) http.RoundTripper, method string, url *url.URL) (StreamExecutor, error) { - rt := http.RoundTripper(upgrader) - if fn != nil { - rt = fn(rt) - } - return &streamExecutor{ - upgrader: upgrader, - transport: rt, - method: method, - url: url, - }, nil -} diff --git a/pkg/kubectl/cmd/BUILD b/pkg/kubectl/cmd/BUILD index dd04a7aba00..1cc2c09cbbc 100644 --- a/pkg/kubectl/cmd/BUILD +++ b/pkg/kubectl/cmd/BUILD @@ -145,6 +145,7 @@ go_library( "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library", "//vendor/k8s.io/client-go/tools/portforward:go_default_library", "//vendor/k8s.io/client-go/tools/remotecommand:go_default_library", + "//vendor/k8s.io/client-go/transport/spdy:go_default_library", ], ) diff --git a/pkg/kubectl/cmd/portforward.go b/pkg/kubectl/cmd/portforward.go index d53246e86c0..bcf3e694f4b 100644 --- a/pkg/kubectl/cmd/portforward.go +++ b/pkg/kubectl/cmd/portforward.go @@ -29,7 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/portforward" - "k8s.io/client-go/tools/remotecommand" + "k8s.io/client-go/transport/spdy" "k8s.io/kubernetes/pkg/api" coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" "k8s.io/kubernetes/pkg/kubectl/cmd/templates" @@ -103,14 +103,11 @@ type defaultPortForwarder struct { } func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error { - transport, upgrader, err := remotecommand.SPDYRoundTripperFor(opts.Config) - if err != nil { - return err - } - dialer, err := remotecommand.NewSPDYDialer(upgrader, &http.Client{Transport: transport}, method, url) + transport, upgrader, err := spdy.RoundTripperFor(opts.Config) if err != nil { return err } + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url) fw, err := portforward.New(dialer, opts.Ports, opts.StopChannel, opts.ReadyChannel, f.cmdOut, f.cmdErr) if err != nil { return err diff --git a/pkg/kubelet/server/streaming/BUILD b/pkg/kubelet/server/streaming/BUILD index 5fe94b594c1..c2d62d4c3a9 100644 --- a/pkg/kubelet/server/streaming/BUILD +++ b/pkg/kubelet/server/streaming/BUILD @@ -45,9 +45,9 @@ go_test( "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/remotecommand:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/remotecommand:go_default_library", + "//vendor/k8s.io/client-go/transport/spdy:go_default_library", ], ) diff --git a/pkg/kubelet/server/streaming/server_test.go b/pkg/kubelet/server/streaming/server_test.go index b6527dfb0c5..f368e268f29 100644 --- a/pkg/kubelet/server/streaming/server_test.go +++ b/pkg/kubelet/server/streaming/server_test.go @@ -30,9 +30,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" + "k8s.io/client-go/transport/spdy" "k8s.io/kubernetes/pkg/api" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" kubeletportforward "k8s.io/kubernetes/pkg/kubelet/server/portforward" @@ -237,9 +237,10 @@ func TestServePortForward(t *testing.T) { reqURL, err := url.Parse(resp.Url) require.NoError(t, err) - exec, err := remotecommand.NewSPDYExecutor(&restclient.Config{}, "POST", reqURL) + transport, upgrader, err := spdy.RoundTripperFor(&restclient.Config{}) require.NoError(t, err) - streamConn, _, err := exec.Dial(kubeletportforward.ProtocolV1Name) + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", reqURL) + streamConn, _, err := dialer.Dial(kubeletportforward.ProtocolV1Name) require.NoError(t, err) defer streamConn.Close() @@ -301,11 +302,10 @@ func runRemoteCommandTest(t *testing.T, commandType string) { require.NoError(t, err) opts := remotecommand.StreamOptions{ - SupportedProtocols: remotecommandconsts.SupportedStreamingProtocols, - Stdin: stdinR, - Stdout: stdoutW, - Stderr: stderrW, - Tty: false, + Stdin: stdinR, + Stdout: stdoutW, + Stderr: stderrW, + Tty: false, } require.NoError(t, exec.Stream(opts)) }() diff --git a/staging/src/k8s.io/client-go/tools/remotecommand/BUILD b/staging/src/k8s.io/client-go/tools/remotecommand/BUILD index 66fc8010402..1435e31e279 100644 --- a/staging/src/k8s.io/client-go/tools/remotecommand/BUILD +++ b/staging/src/k8s.io/client-go/tools/remotecommand/BUILD @@ -41,11 +41,11 @@ go_library( "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/httpstream:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/remotecommand:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/transport:go_default_library", + "//vendor/k8s.io/client-go/transport/spdy:go_default_library", "//vendor/k8s.io/client-go/util/exec:go_default_library", ], ) diff --git a/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go b/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go index b3d6ad0af74..bcbe9fcd4f0 100644 --- a/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go +++ b/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go @@ -25,10 +25,10 @@ import ( "github.com/golang/glog" "k8s.io/apimachinery/pkg/util/httpstream" - "k8s.io/apimachinery/pkg/util/httpstream/spdy" "k8s.io/apimachinery/pkg/util/remotecommand" restclient "k8s.io/client-go/rest" "k8s.io/client-go/transport" + spdy "k8s.io/client-go/transport/spdy" ) // StreamOptions holds information pertaining to the current streaming session: supported stream @@ -51,12 +51,6 @@ type Executor interface { Stream(options StreamOptions) error } -// SPDYUpgrader validates a response from the server after a SPDY upgrade. -type SPDYUpgrader interface { - // NewConnection validates the response and creates a new Connection. - NewConnection(resp *http.Response) (httpstream.Connection, error) -} - type streamCreator interface { CreateStream(headers http.Header) (httpstream.Stream, error) } @@ -67,17 +61,31 @@ type streamProtocolHandler interface { // streamExecutor handles transporting standard shell streams over an httpstream connection. type streamExecutor struct { - upgrader SPDYUpgrader + upgrader spdy.Upgrader transport http.RoundTripper - method string - url *url.URL + method string + url *url.URL + protocols []string } // NewSPDYExecutor connects to the provided server and upgrades the connection to // multiplexed bidirectional streams. func NewSPDYExecutor(config *restclient.Config, method string, url *url.URL) (Executor, error) { - wrapper, upgradeRoundTripper, err := SPDYRoundTripperFor(config) + return NewSPDYExecutorForProtocols( + config, method, url, + remotecommand.StreamProtocolV4Name, + remotecommand.StreamProtocolV3Name, + remotecommand.StreamProtocolV2Name, + remotecommand.StreamProtocolV1Name, + ) +} + +// NewSPDYExecutorForProtocols connects to the provided server and upgrades the connection to +// multiplexed bidirectional streams using only the provided protocols. Exposed for testing, most +// callers should use NewSPDYExecutor. +func NewSPDYExecutorForProtocols(config *restclient.Config, method string, url *url.URL, protocols ...string) (Executor, error) { + wrapper, upgradeRoundTripper, err := spdy.RoundTripperFor(config) if err != nil { return nil, err } @@ -87,66 +95,10 @@ func NewSPDYExecutor(config *restclient.Config, method string, url *url.URL) (Ex transport: wrapper, method: method, url: url, + protocols: protocols, }, nil } -type spdyDialer struct { - client *http.Client - upgrader SPDYUpgrader - method string - url *url.URL -} - -func NewSPDYDialer(upgrader SPDYUpgrader, client *http.Client, method string, url *url.URL) (httpstream.Dialer, error) { - return &spdyDialer{ - client: client, - upgrader: upgrader, - method: method, - url: url, - }, nil -} - -func (d *spdyDialer) Dial(protocols ...string) (httpstream.Connection, string, error) { - req, err := http.NewRequest(d.method, d.url.String(), nil) - if err != nil { - return nil, "", fmt.Errorf("error creating request: %v", err) - } - return NegotiateSPDYConnection(d.upgrader, d.client, req, protocols...) -} - -// SPDYRoundTripperFor returns a round tripper to use with SPDY. -func SPDYRoundTripperFor(config *restclient.Config) (http.RoundTripper, SPDYUpgrader, error) { - tlsConfig, err := restclient.TLSConfigFor(config) - if err != nil { - return nil, nil, err - } - upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig, true) - wrapper, err := restclient.HTTPWrappersForConfig(config, upgradeRoundTripper) - if err != nil { - return nil, nil, err - } - return wrapper, upgradeRoundTripper, nil -} - -// NegotiateSPDYConnection opens a connection to a remote server and attempts to negotiate -// a SPDY connection. Upon success, it returns the connection and the protocol selected by -// the server. The client transport must use the upgradeRoundTripper - see SPDYRoundTripperFor. -func NegotiateSPDYConnection(upgrader SPDYUpgrader, client *http.Client, req *http.Request, protocols ...string) (httpstream.Connection, string, error) { - for i := range protocols { - req.Header.Add(httpstream.HeaderProtocolVersion, protocols[i]) - } - resp, err := client.Do(req) - if err != nil { - return nil, "", fmt.Errorf("error sending request: %v", err) - } - defer resp.Body.Close() - conn, err := upgrader.NewConnection(resp) - if err != nil { - return nil, "", err - } - return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil -} - // Stream opens a protocol streamer to the server and streams until a client closes // the connection or the server disconnects. func (e *streamExecutor) Stream(options StreamOptions) error { @@ -155,14 +107,11 @@ func (e *streamExecutor) Stream(options StreamOptions) error { return fmt.Errorf("error creating request: %v", err) } - conn, protocol, err := NegotiateSPDYConnection( + conn, protocol, err := spdy.Negotiate( e.upgrader, &http.Client{Transport: e.transport}, req, - remotecommand.StreamProtocolV4Name, - remotecommand.StreamProtocolV3Name, - remotecommand.StreamProtocolV2Name, - remotecommand.StreamProtocolV1Name, + e.protocols..., ) if err != nil { return err diff --git a/staging/src/k8s.io/client-go/transport/spdy/BUILD b/staging/src/k8s.io/client-go/transport/spdy/BUILD new file mode 100644 index 00000000000..184f58fbe20 --- /dev/null +++ b/staging/src/k8s.io/client-go/transport/spdy/BUILD @@ -0,0 +1,19 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = ["spdy.go"], + tags = ["automanaged"], + deps = [ + "//vendor/k8s.io/apimachinery/pkg/util/httpstream:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy:go_default_library", + "//vendor/k8s.io/client-go/rest:go_default_library", + ], +) diff --git a/staging/src/k8s.io/client-go/transport/spdy/spdy.go b/staging/src/k8s.io/client-go/transport/spdy/spdy.go new file mode 100644 index 00000000000..e0eb468ba36 --- /dev/null +++ b/staging/src/k8s.io/client-go/transport/spdy/spdy.go @@ -0,0 +1,94 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package spdy + +import ( + "fmt" + "net/http" + "net/url" + + "k8s.io/apimachinery/pkg/util/httpstream" + "k8s.io/apimachinery/pkg/util/httpstream/spdy" + restclient "k8s.io/client-go/rest" +) + +// Upgrader validates a response from the server after a SPDY upgrade. +type Upgrader interface { + // NewConnection validates the response and creates a new Connection. + NewConnection(resp *http.Response) (httpstream.Connection, error) +} + +// RoundTripperFor returns a round tripper and upgrader to use with SPDY. +func RoundTripperFor(config *restclient.Config) (http.RoundTripper, Upgrader, error) { + tlsConfig, err := restclient.TLSConfigFor(config) + if err != nil { + return nil, nil, err + } + upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig, true) + wrapper, err := restclient.HTTPWrappersForConfig(config, upgradeRoundTripper) + if err != nil { + return nil, nil, err + } + return wrapper, upgradeRoundTripper, nil +} + +// dialer implements the httpstream.Dialer interface. +type dialer struct { + client *http.Client + upgrader Upgrader + method string + url *url.URL +} + +var _ httpstream.Dialer = &dialer{} + +// NewDialer will create a dialer that connects to the provided URL and upgrades the connection to SPDY. +func NewDialer(upgrader Upgrader, client *http.Client, method string, url *url.URL) httpstream.Dialer { + return &dialer{ + client: client, + upgrader: upgrader, + method: method, + url: url, + } +} + +func (d *dialer) Dial(protocols ...string) (httpstream.Connection, string, error) { + req, err := http.NewRequest(d.method, d.url.String(), nil) + if err != nil { + return nil, "", fmt.Errorf("error creating request: %v", err) + } + return Negotiate(d.upgrader, d.client, req, protocols...) +} + +// Negotiate opens a connection to a remote server and attempts to negotiate +// a SPDY connection. Upon success, it returns the connection and the protocol selected by +// the server. The client transport must use the upgradeRoundTripper - see RoundTripperFor. +func Negotiate(upgrader Upgrader, client *http.Client, req *http.Request, protocols ...string) (httpstream.Connection, string, error) { + for i := range protocols { + req.Header.Add(httpstream.HeaderProtocolVersion, protocols[i]) + } + resp, err := client.Do(req) + if err != nil { + return nil, "", fmt.Errorf("error sending request: %v", err) + } + defer resp.Body.Close() + conn, err := upgrader.NewConnection(resp) + if err != nil { + return nil, "", err + } + return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil +}