From 3f1b18fbba80ba871d4e1494fbc44a551c99354b Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sat, 26 Sep 2015 20:00:39 -0400 Subject: [PATCH] Refactor exec to make attach useful without a client.Config The current executor structure is too dependent on client.Request and client.Config. In order to do an attach from the server, it needs to be possible to create an Executor from crypto/tls#TLSConfig and to bypassing having a client.Request. Changes: * remotecommand.spdyExecutor - handles upgrading a request to SPDY and getting a connection * remotecommand.NewAttach / New - moved to exec / portforward / attach since they handle requests * Remove request.Upgrade() - it's too coupled to SPDY, and can live with the spdyExecutor * Add request.VersionedParams(runtime.Object, runtime.ObjectConvertor) to handle object -> query transform --- .../unversioned/portforward/portforward.go | 27 +-- .../portforward/portforward_test.go | 49 +++-- .../remotecommand/remotecommand.go | 200 ++++++++---------- .../remotecommand/remotecommand_test.go | 83 +++++++- pkg/client/unversioned/request.go | 63 +++--- pkg/client/unversioned/request_test.go | 99 ++------- pkg/kubectl/cmd/attach.go | 14 +- pkg/kubectl/cmd/attach_test.go | 17 +- pkg/kubectl/cmd/exec.go | 22 +- pkg/kubectl/cmd/exec_test.go | 17 +- pkg/kubectl/cmd/portforward.go | 14 +- pkg/kubectl/cmd/portforward_test.go | 27 ++- pkg/util/httpstream/httpstream.go | 5 + 13 files changed, 331 insertions(+), 306 deletions(-) diff --git a/pkg/client/unversioned/portforward/portforward.go b/pkg/client/unversioned/portforward/portforward.go index 925c5de4026..36916bdcf49 100644 --- a/pkg/client/unversioned/portforward/portforward.go +++ b/pkg/client/unversioned/portforward/portforward.go @@ -29,33 +29,19 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" - client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/httpstream" - "k8s.io/kubernetes/pkg/util/httpstream/spdy" ) -type upgrader interface { - upgrade(*client.Request, *client.Config) (httpstream.Connection, error) -} - -type defaultUpgrader struct{} - -func (u *defaultUpgrader) upgrade(req *client.Request, config *client.Config) (httpstream.Connection, error) { - return req.Upgrade(config, spdy.NewRoundTripper) -} - // PortForwarder knows how to listen for local connections and forward them to // a remote pod via an upgraded HTTP request. type PortForwarder struct { - req *client.Request - config *client.Config ports []ForwardedPort stopChan <-chan struct{} + dialer httpstream.Dialer streamConn httpstream.Connection listeners []io.Closer - upgrader upgrader Ready chan struct{} requestIDLock sync.Mutex requestID int @@ -120,7 +106,7 @@ func parsePorts(ports []string) ([]ForwardedPort, error) { } // New creates a new PortForwarder. -func New(req *client.Request, config *client.Config, ports []string, stopChan <-chan struct{}) (*PortForwarder, error) { +func New(dialer httpstream.Dialer, ports []string, stopChan <-chan struct{}) (*PortForwarder, error) { if len(ports) == 0 { return nil, errors.New("You must specify at least 1 port") } @@ -128,10 +114,8 @@ func New(req *client.Request, config *client.Config, ports []string, stopChan <- if err != nil { return nil, err } - return &PortForwarder{ - req: req, - config: config, + dialer: dialer, ports: parsedPorts, stopChan: stopChan, Ready: make(chan struct{}), @@ -143,11 +127,8 @@ func New(req *client.Request, config *client.Config, ports []string, stopChan <- func (pf *PortForwarder) ForwardPorts() error { defer pf.Close() - if pf.upgrader == nil { - pf.upgrader = &defaultUpgrader{} - } var err error - pf.streamConn, err = pf.upgrader.upgrade(pf.req, pf.config) + pf.streamConn, err = pf.dialer.Dial() if err != nil { return fmt.Errorf("error upgrading connection: %s", err) } diff --git a/pkg/client/unversioned/portforward/portforward_test.go b/pkg/client/unversioned/portforward/portforward_test.go index 0bd2ed88ade..f208a9e8814 100644 --- a/pkg/client/unversioned/portforward/portforward_test.go +++ b/pkg/client/unversioned/portforward/portforward_test.go @@ -31,10 +31,23 @@ import ( "time" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/client/unversioned/remotecommand" "k8s.io/kubernetes/pkg/kubelet" "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util/httpstream" ) +type fakeDialer struct { + dialed bool + conn httpstream.Connection + err error +} + +func (d *fakeDialer) Dial() (httpstream.Connection, error) { + d.dialed = true + return d.conn, d.err +} + func TestParsePortsAndNew(t *testing.T) { tests := []struct { input []string @@ -71,10 +84,9 @@ func TestParsePortsAndNew(t *testing.T) { t.Fatalf("%d: parsePorts: error expected=%t, got %t: %s", i, e, a, err) } - expectedRequest := &client.Request{} - expectedConfig := &client.Config{} + dialer := &fakeDialer{} expectedStopChan := make(chan struct{}) - pf, err := New(expectedRequest, expectedConfig, test.input, expectedStopChan) + pf, err := New(dialer, test.input, expectedStopChan) haveError = err != nil if e, a := test.expectNewError, haveError; e != a { t.Fatalf("%d: New: error expected=%t, got %t: %s", i, e, a, err) @@ -93,11 +105,8 @@ func TestParsePortsAndNew(t *testing.T) { } } - if e, a := expectedRequest, pf.req; e != a { - t.Fatalf("%d: req: expected %#v, got %#v", i, e, a) - } - if e, a := expectedConfig, pf.config; e != a { - t.Fatalf("%d: config: expected %#v, got %#v", i, e, a) + if dialer.dialed { + t.Fatalf("%d: expected not dialed", i) } if e, a := test.expected, pf.ports; !reflect.DeepEqual(e, a) { t.Fatalf("%d: ports: expected %#v, got %#v", i, e, a) @@ -293,17 +302,16 @@ func TestForwardPorts(t *testing.T) { for testName, test := range tests { server := httptest.NewServer(fakePortForwardServer(t, testName, test.serverSends, test.clientSends)) - url, _ := url.ParseRequestURI(server.URL) - c := client.NewRESTClient(url, "x", nil, -1, -1) - req := c.Post().Resource("testing") - conf := &client.Config{ - Host: server.URL, + url, _ := url.Parse(server.URL) + exec, err := remotecommand.NewExecutor(&client.Config{}, "POST", url) + if err != nil { + t.Fatal(err) } stopChan := make(chan struct{}, 1) - pf, err := New(req, conf, test.ports, stopChan) + pf, err := New(exec, test.ports, stopChan) if err != nil { t.Fatalf("%s: unexpected error calling New: %v", testName, err) } @@ -363,18 +371,17 @@ func TestForwardPorts(t *testing.T) { func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) { server := httptest.NewServer(fakePortForwardServer(t, "allBindsFailed", nil, nil)) defer server.Close() - url, _ := url.ParseRequestURI(server.URL) - c := client.NewRESTClient(url, "x", nil, -1, -1) - req := c.Post().Resource("testing") - conf := &client.Config{ - Host: server.URL, + url, _ := url.Parse(server.URL) + exec, err := remotecommand.NewExecutor(&client.Config{}, "POST", url) + if err != nil { + t.Fatal(err) } stopChan1 := make(chan struct{}, 1) defer close(stopChan1) - pf1, err := New(req, conf, []string{"5555"}, stopChan1) + pf1, err := New(exec, []string{"5555"}, stopChan1) if err != nil { t.Fatalf("error creating pf1: %v", err) } @@ -382,7 +389,7 @@ func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) { <-pf1.Ready stopChan2 := make(chan struct{}, 1) - pf2, err := New(&client.Request{}, &client.Config{}, []string{"5555"}, stopChan2) + pf2, err := New(exec, []string{"5555"}, stopChan2) if err != nil { t.Fatalf("error creating pf2: %v", err) } diff --git a/pkg/client/unversioned/remotecommand/remotecommand.go b/pkg/client/unversioned/remotecommand/remotecommand.go index e63af79aa0a..505e3fc9c1b 100644 --- a/pkg/client/unversioned/remotecommand/remotecommand.go +++ b/pkg/client/unversioned/remotecommand/remotecommand.go @@ -21,141 +21,127 @@ import ( "io" "io/ioutil" "net/http" + "net/url" "sync" "k8s.io/kubernetes/pkg/api" client "k8s.io/kubernetes/pkg/client/unversioned" - "k8s.io/kubernetes/pkg/conversion/queryparams" - "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/httpstream" "k8s.io/kubernetes/pkg/util/httpstream/spdy" ) -type upgrader interface { - upgrade(*client.Request, *client.Config) (httpstream.Connection, error) +// Executor is an interface for transporting shell-style streams. +type Executor interface { + // Stream initiates the transport of the standard shell streams. It will transport any + // non-nil stream to a remote system, and return an error if a problem occurs. If tty + // is set, the stderr stream is not used (raw TTY manages stdout and stderr over the + // stdout stream). + Stream(stdin io.Reader, stdout, stderr io.Writer, tty bool) error } -type defaultUpgrader struct{} - -func (u *defaultUpgrader) upgrade(req *client.Request, config *client.Config) (httpstream.Connection, error) { - return req.Upgrade(config, spdy.NewRoundTripper) +// StreamExecutor supports the ability to dial an httpstream connection and the ability to +// run a command line stream protocol over that dialer. +type StreamExecutor interface { + Executor + httpstream.Dialer } -type Streamer struct { - req *client.Request - config *client.Config - stdin io.Reader - stdout io.Writer - stderr io.Writer - tty bool +// streamExecutor handles transporting standard shell streams over an httpstream connection. +type streamExecutor struct { + upgrader httpstream.UpgradeRoundTripper + transport http.RoundTripper - upgrader upgrader + method string + url *url.URL } -// Executor executes a command on a pod container -type Executor struct { - Streamer - command []string -} - -// New creates a new RemoteCommandExecutor -func New(req *client.Request, config *client.Config, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) *Executor { - return &Executor{ - command: command, - Streamer: Streamer{ - req: req, - config: config, - stdin: stdin, - stdout: stdout, - stderr: stderr, - tty: tty, - }, - } -} - -type Attach struct { - Streamer -} - -// NewAttach creates a new RemoteAttach -func NewAttach(req *client.Request, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) *Attach { - return &Attach{ - Streamer: Streamer{ - req: req, - config: config, - stdin: stdin, - stdout: stdout, - stderr: stderr, - tty: tty, - }, - } -} - -// Execute sends a remote command execution request, upgrading the -// connection and creating streams to represent stdin/stdout/stderr. Data is -// copied between these streams and the supplied stdin/stdout/stderr parameters. -func (e *Attach) Execute() error { - opts := api.PodAttachOptions{ - Stdin: (e.stdin != nil), - Stdout: (e.stdout != nil), - Stderr: (!e.tty && e.stderr != nil), - TTY: e.tty, - } - - if err := e.setupRequestParameters(&opts); err != nil { - return err - } - - return e.doStream() -} - -// Execute sends a remote command execution request, upgrading the -// connection and creating streams to represent stdin/stdout/stderr. Data is -// copied between these streams and the supplied stdin/stdout/stderr parameters. -func (e *Executor) Execute() error { - opts := api.PodExecOptions{ - Stdin: (e.stdin != nil), - Stdout: (e.stdout != nil), - Stderr: (!e.tty && e.stderr != nil), - TTY: e.tty, - Command: e.command, - } - - if err := e.setupRequestParameters(&opts); err != nil { - return err - } - - return e.doStream() -} - -func (e *Streamer) setupRequestParameters(obj runtime.Object) error { - versioned, err := api.Scheme.ConvertToVersion(obj, e.config.Version) +// NewExecutor connects to the provided server and upgrades the connection to +// multiplexed bidirectional streams. The current implementation uses SPDY, +// but this could be replaced with HTTP/2 once it's available, or something else. +// TODO: the common code between this and portforward could be abstracted. +func NewExecutor(config *client.Config, method string, url *url.URL) (StreamExecutor, error) { + tlsConfig, err := client.TLSConfigFor(config) if err != nil { - return err + return nil, err } - params, err := queryparams.Convert(versioned) + + upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig) + wrapper, err := client.HTTPWrappersForConfig(config, upgradeRoundTripper) if err != nil { - return err + return nil, err } - for k, v := range params { - for _, vv := range v { - e.req.Param(k, vv) - } - } - return nil + + return &streamExecutor{ + upgrader: upgradeRoundTripper, + transport: wrapper, + method: method, + url: url, + }, nil } -func (e *Streamer) doStream() error { - if e.upgrader == nil { - e.upgrader = &defaultUpgrader{} +// NewStreamExecutor upgrades the request so that it supports multiplexed bidirectional +// streams. This method takes a stream upgrader and an optional function that is invoked +// to wrap the round tripper. This method may be used by clients that are lower level than +// Kubernetes clients or need to provide their own upgrade round tripper. +func NewStreamExecutor(upgrader httpstream.UpgradeRoundTripper, fn func(http.RoundTripper) http.RoundTripper, method string, url *url.URL) (StreamExecutor, error) { + var rt http.RoundTripper = upgrader + if fn != nil { + rt = fn(rt) } - conn, err := e.upgrader.upgrade(e.req, e.config) + return &streamExecutor{ + upgrader: upgrader, + transport: rt, + method: method, + url: url, + }, nil +} + +// Dial opens a connection to a remote server and attempts to negotiate a SPDY connection. +func (e *streamExecutor) Dial() (httpstream.Connection, error) { + client := &http.Client{Transport: e.transport} + + req, err := http.NewRequest(e.method, e.url.String(), nil) + if err != nil { + return nil, fmt.Errorf("error creating request: %s", err) + } + + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("error sending request: %s", err) + } + defer resp.Body.Close() + + // TODO: handle protocol selection in the future + return e.upgrader.NewConnection(resp) +} + +// Stream opens a protocol streamer to the server and streams until a client closes +// the connection or the server disconnects. +func (e *streamExecutor) Stream(stdin io.Reader, stdout, stderr io.Writer, tty bool) error { + conn, err := e.Dial() if err != nil { return err } defer conn.Close() + // TODO: negotiate protocols + streamer := &streamProtocol{ + stdin: stdin, + stdout: stdout, + stderr: stderr, + tty: tty, + } + return streamer.stream(conn) +} +type streamProtocol struct { + stdin io.Reader + stdout io.Writer + stderr io.Writer + tty bool +} + +func (e *streamProtocol) stream(conn httpstream.Connection) error { headers := http.Header{} // set up error stream diff --git a/pkg/client/unversioned/remotecommand/remotecommand_test.go b/pkg/client/unversioned/remotecommand/remotecommand_test.go index 9870ee259bc..56cb3372bfb 100644 --- a/pkg/client/unversioned/remotecommand/remotecommand_test.go +++ b/pkg/client/unversioned/remotecommand/remotecommand_test.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "io" + "io/ioutil" "net/http" "net/http/httptest" "net/url" @@ -183,12 +184,17 @@ func TestRequestExecuteRemoteCommand(t *testing.T) { url, _ := url.ParseRequestURI(server.URL) c := client.NewRESTClient(url, "x", nil, -1, -1) req := c.Post().Resource("testing") - + req.Param("command", "ls") + req.Param("command", "/") conf := &client.Config{ Host: server.URL, } - e := New(req, conf, []string{"ls", "/"}, strings.NewReader(strings.Repeat(testCase.Stdin, testCase.MessageCount)), localOut, localErr, testCase.Tty) - err := e.Execute() + e, err := NewExecutor(conf, "POST", req.URL()) + if err != nil { + t.Errorf("%d: unexpected error: %v", i, err) + continue + } + err = e.Stream(strings.NewReader(strings.Repeat(testCase.Stdin, testCase.MessageCount)), localOut, localErr, testCase.Tty) hasErr := err != nil if len(testCase.Error) > 0 { @@ -263,8 +269,12 @@ func TestRequestAttachRemoteCommand(t *testing.T) { conf := &client.Config{ Host: server.URL, } - e := NewAttach(req, conf, strings.NewReader(testCase.Stdin), localOut, localErr, testCase.Tty) - err := e.Execute() + e, err := NewExecutor(conf, "POST", req.URL()) + if err != nil { + t.Errorf("%d: unexpected error: %v", i, err) + continue + } + err = e.Stream(strings.NewReader(testCase.Stdin), localOut, localErr, testCase.Tty) hasErr := err != nil if len(testCase.Error) > 0 { @@ -301,3 +311,66 @@ func TestRequestAttachRemoteCommand(t *testing.T) { server.Close() } } + +type fakeUpgrader struct { + req *http.Request + resp *http.Response + conn httpstream.Connection + err, connErr error + checkResponse bool + + t *testing.T +} + +func (u *fakeUpgrader) RoundTrip(req *http.Request) (*http.Response, error) { + u.req = req + return u.resp, u.err +} + +func (u *fakeUpgrader) NewConnection(resp *http.Response) (httpstream.Connection, error) { + if u.checkResponse && u.resp != resp { + u.t.Errorf("response objects passed did not match: %#v", resp) + } + return u.conn, u.connErr +} + +type fakeConnection struct { + httpstream.Connection +} + +// Dial is the common functionality between any stream based upgrader, regardless of protocol. +// This method ensures that someone can use a generic stream executor without being dependent +// on the core Kube client config behavior. +func TestDial(t *testing.T) { + upgrader := &fakeUpgrader{ + t: t, + checkResponse: true, + conn: &fakeConnection{}, + resp: &http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(&bytes.Buffer{}), + }, + } + var called bool + testFn := func(rt http.RoundTripper) http.RoundTripper { + if rt != upgrader { + t.Fatalf("unexpected round tripper: %#v", rt) + } + called = true + return rt + } + exec, err := NewStreamExecutor(upgrader, testFn, "POST", &url.URL{Host: "something.com", Scheme: "https"}) + if err != nil { + t.Fatal(err) + } + conn, err := exec.Dial() + if err != nil { + t.Fatal(err) + } + if conn != upgrader.conn { + t.Errorf("unexpected connection: %#v", conn) + } + if !called { + t.Errorf("wrapper not called") + } +} diff --git a/pkg/client/unversioned/request.go b/pkg/client/unversioned/request.go index afe491dcbf4..67df03ca8ac 100644 --- a/pkg/client/unversioned/request.go +++ b/pkg/client/unversioned/request.go @@ -18,7 +18,6 @@ package unversioned import ( "bytes" - "crypto/tls" "fmt" "io" "io/ioutil" @@ -35,11 +34,11 @@ import ( "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/metrics" + "k8s.io/kubernetes/pkg/conversion/queryparams" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" - "k8s.io/kubernetes/pkg/util/httpstream" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/watch" watchjson "k8s.io/kubernetes/pkg/watch/json" @@ -406,6 +405,31 @@ func (r *Request) Param(paramName, s string) *Request { return r.setParam(paramName, s) } +// VersionedParams will take the provided object, serialize it to a map[string][]string using the +// implicit RESTClient API version and the provided object convertor, and then add those as parameters +// to the request. Use this to provide versioned query parameters from client libraries. +func (r *Request) VersionedParams(obj runtime.Object, convertor runtime.ObjectConvertor) *Request { + if r.err != nil { + return r + } + versioned, err := convertor.ConvertToVersion(obj, r.apiVersion) + if err != nil { + r.err = err + return r + } + params, err := queryparams.Convert(versioned) + if err != nil { + r.err = err + return r + } + for k, v := range params { + for _, vv := range v { + r.setParam(k, vv) + } + } + return r +} + func (r *Request) setParam(paramName, value string) *Request { if specialParams.Has(paramName) { r.err = fmt.Errorf("must set %v through the corresponding function, not directly.", paramName) @@ -613,41 +637,6 @@ func (r *Request) Stream() (io.ReadCloser, error) { } } -// Upgrade upgrades the request so that it supports multiplexed bidirectional -// streams. The current implementation uses SPDY, but this could be replaced -// with HTTP/2 once it's available, or something else. -func (r *Request) Upgrade(config *Config, newRoundTripperFunc func(*tls.Config) httpstream.UpgradeRoundTripper) (httpstream.Connection, error) { - if r.err != nil { - return nil, r.err - } - - tlsConfig, err := TLSConfigFor(config) - if err != nil { - return nil, err - } - - upgradeRoundTripper := newRoundTripperFunc(tlsConfig) - wrapper, err := HTTPWrappersForConfig(config, upgradeRoundTripper) - if err != nil { - return nil, err - } - - r.client = &http.Client{Transport: wrapper} - - req, err := http.NewRequest(r.verb, r.URL().String(), nil) - if err != nil { - return nil, fmt.Errorf("Error creating request: %s", err) - } - - resp, err := r.client.Do(req) - if err != nil { - return nil, fmt.Errorf("Error sending request: %s", err) - } - defer resp.Body.Close() - - return upgradeRoundTripper.NewConnection(resp) -} - // request connects to the server and invokes the provided function when a server response is // received. It handles retry behavior and up front validation of requests. It wil invoke // fn at most once. It will return an error if a problem occurred prior to connecting to the diff --git a/pkg/client/unversioned/request_test.go b/pkg/client/unversioned/request_test.go index 56998dfcf5a..c73c5123307 100644 --- a/pkg/client/unversioned/request_test.go +++ b/pkg/client/unversioned/request_test.go @@ -18,7 +18,6 @@ package unversioned import ( "bytes" - "crypto/tls" "encoding/base64" "errors" "io" @@ -158,6 +157,22 @@ func TestRequestParam(t *testing.T) { } } +func TestRequestVersionedParams(t *testing.T) { + r := (&Request{}).Param("foo", "a") + if !api.Semantic.DeepDerivative(r.params, url.Values{"foo": []string{"a"}}) { + t.Errorf("should have set a param: %#v", r) + } + r.VersionedParams(&api.PodLogOptions{Follow: true, Container: "bar"}, api.Scheme) + + if !api.Semantic.DeepDerivative(r.params, url.Values{ + "foo": []string{"a"}, + "container": []string{"bar"}, + "follow": []string{"1"}, + }) { + t.Errorf("should have set a param: %#v", r) + } +} + func TestRequestURI(t *testing.T) { r := (&Request{}).Param("foo", "a") r.Prefix("other") @@ -595,88 +610,6 @@ func (f *fakeUpgradeRoundTripper) NewConnection(resp *http.Response) (httpstream return f.conn, nil } -func TestRequestUpgrade(t *testing.T) { - uri, _ := url.Parse("http://localhost/") - testCases := []struct { - Request *Request - Config *Config - RoundTripper *fakeUpgradeRoundTripper - Err bool - AuthBasicHeader bool - AuthBearerHeader bool - }{ - { - Request: &Request{err: errors.New("bail")}, - Err: true, - }, - { - Request: &Request{}, - Config: &Config{ - TLSClientConfig: TLSClientConfig{ - CAFile: "foo", - }, - Insecure: true, - }, - Err: true, - }, - { - Request: &Request{}, - Config: &Config{ - Username: "u", - Password: "p", - BearerToken: "b", - }, - Err: true, - }, - { - Request: NewRequest(nil, "", uri, testapi.Default.Version(), testapi.Default.Codec()), - Config: &Config{ - Username: "u", - Password: "p", - }, - AuthBasicHeader: true, - Err: false, - }, - { - Request: NewRequest(nil, "", uri, testapi.Default.Version(), testapi.Default.Codec()), - Config: &Config{ - BearerToken: "b", - }, - AuthBearerHeader: true, - Err: false, - }, - } - for i, testCase := range testCases { - r := testCase.Request - rt := &fakeUpgradeRoundTripper{} - expectedConn := &fakeUpgradeConnection{} - conn, err := r.Upgrade(testCase.Config, func(config *tls.Config) httpstream.UpgradeRoundTripper { - rt.conn = expectedConn - return rt - }) - _ = conn - hasErr := err != nil - if hasErr != testCase.Err { - t.Errorf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, r.err) - } - if testCase.Err { - continue - } - - if testCase.AuthBasicHeader && !strings.Contains(rt.req.Header.Get("Authorization"), "Basic") { - t.Errorf("%d: expected basic auth header, got: %s", i, rt.req.Header.Get("Authorization")) - } - - if testCase.AuthBearerHeader && !strings.Contains(rt.req.Header.Get("Authorization"), "Bearer") { - t.Errorf("%d: expected bearer auth header, got: %s", i, rt.req.Header.Get("Authorization")) - } - - if e, a := expectedConn, conn; e != a { - t.Errorf("%d: conn: expected %#v, got %#v", i, e, a) - } - } -} - func TestRequestDo(t *testing.T) { testCases := []struct { Request *Request diff --git a/pkg/kubectl/cmd/attach.go b/pkg/kubectl/cmd/attach.go index b56fdb5a409..6dcd579ccc9 100644 --- a/pkg/kubectl/cmd/attach.go +++ b/pkg/kubectl/cmd/attach.go @@ -19,6 +19,7 @@ package cmd import ( "fmt" "io" + "net/url" "os" "os/signal" "syscall" @@ -72,15 +73,18 @@ func NewCmdAttach(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer) // RemoteAttach defines the interface accepted by the Attach command - provided for test stubbing type RemoteAttach interface { - Attach(req *client.Request, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error + Attach(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error } // DefaultRemoteAttach is the standard implementation of attaching type DefaultRemoteAttach struct{} -func (*DefaultRemoteAttach) Attach(req *client.Request, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { - attach := remotecommand.NewAttach(req, config, stdin, stdout, stderr, tty) - return attach.Execute() +func (*DefaultRemoteAttach) Attach(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { + exec, err := remotecommand.NewExecutor(config, method, url) + if err != nil { + return err + } + return exec.Stream(stdin, stdout, stderr, tty) } // AttachOptions declare the arguments accepted by the Exec command @@ -201,7 +205,7 @@ func (p *AttachOptions) Run() error { SubResource("attach"). Param("container", p.GetContainerName(pod)) - return p.Attach.Attach(req, p.Config, stdin, p.Out, p.Err, tty) + return p.Attach.Attach("POST", req.URL(), p.Config, stdin, p.Out, p.Err, tty) } // GetContainerName returns the name of the container to attach to, with a fallback. diff --git a/pkg/kubectl/cmd/attach_test.go b/pkg/kubectl/cmd/attach_test.go index 6c66d1946d5..acf44470fd9 100644 --- a/pkg/kubectl/cmd/attach_test.go +++ b/pkg/kubectl/cmd/attach_test.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "net/http" + "net/url" "testing" "github.com/spf13/cobra" @@ -32,12 +33,14 @@ import ( ) type fakeRemoteAttach struct { - req *client.Request + method string + url *url.URL attachErr error } -func (f *fakeRemoteAttach) Attach(req *client.Request, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { - f.req = req +func (f *fakeRemoteAttach) Attach(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { + f.method = method + f.url = url return f.attachErr } @@ -173,10 +176,16 @@ func TestAttach(t *testing.T) { t.Errorf("%s: Unexpected error: %v", test.name, err) continue } - if !test.attachErr && ex.req.URL().Path != test.attachPath { + if test.attachErr { + continue + } + if ex.url.Path != test.attachPath { t.Errorf("%s: Did not get expected path for exec request", test.name) continue } + if ex.method != "POST" { + t.Errorf("%s: Did not get method for attach request: %s", test.name, ex.method) + } } } diff --git a/pkg/kubectl/cmd/exec.go b/pkg/kubectl/cmd/exec.go index 9d921ed41cf..a0c0c88924b 100644 --- a/pkg/kubectl/cmd/exec.go +++ b/pkg/kubectl/cmd/exec.go @@ -19,6 +19,7 @@ package cmd import ( "fmt" "io" + "net/url" "os" "os/signal" "syscall" @@ -73,15 +74,18 @@ func NewCmdExec(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer) * // RemoteExecutor defines the interface accepted by the Exec command - provided for test stubbing type RemoteExecutor interface { - Execute(req *client.Request, config *client.Config, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error + Execute(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error } // DefaultRemoteExecutor is the standard implementation of remote command execution type DefaultRemoteExecutor struct{} -func (*DefaultRemoteExecutor) Execute(req *client.Request, config *client.Config, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { - executor := remotecommand.New(req, config, command, stdin, stdout, stderr, tty) - return executor.Execute() +func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { + exec, err := remotecommand.NewExecutor(config, method, url) + if err != nil { + return err + } + return exec.Stream(stdin, stdout, stderr, tty) } // ExecOptions declare the arguments accepted by the Exec command @@ -220,6 +224,14 @@ func (p *ExecOptions) Run() error { Namespace(pod.Namespace). SubResource("exec"). Param("container", containerName) + req.VersionedParams(&api.PodExecOptions{ + Container: containerName, + Command: p.Command, + Stdin: stdin != nil, + Stdout: p.Out != nil, + Stderr: p.Err != nil, + TTY: tty, + }, api.Scheme) - return p.Executor.Execute(req, p.Config, p.Command, stdin, p.Out, p.Err, tty) + return p.Executor.Execute("POST", req.URL(), p.Config, stdin, p.Out, p.Err, tty) } diff --git a/pkg/kubectl/cmd/exec_test.go b/pkg/kubectl/cmd/exec_test.go index b9a5ace0035..dd7b85d2ffe 100644 --- a/pkg/kubectl/cmd/exec_test.go +++ b/pkg/kubectl/cmd/exec_test.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "net/http" + "net/url" "reflect" "testing" @@ -33,12 +34,14 @@ import ( ) type fakeRemoteExecutor struct { - req *client.Request + method string + url *url.URL execErr error } -func (f *fakeRemoteExecutor) Execute(req *client.Request, config *client.Config, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { - f.req = req +func (f *fakeRemoteExecutor) Execute(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { + f.method = method + f.url = url return f.execErr } @@ -198,10 +201,16 @@ func TestExec(t *testing.T) { t.Errorf("%s: Unexpected error: %v", test.name, err) continue } - if !test.execErr && ex.req.URL().Path != test.execPath { + if test.execErr { + continue + } + if ex.url.Path != test.execPath { t.Errorf("%s: Did not get expected path for exec request", test.name) continue } + if ex.method != "POST" { + t.Errorf("%s: Did not get method for exec request: %s", test.name, ex.method) + } } } diff --git a/pkg/kubectl/cmd/portforward.go b/pkg/kubectl/cmd/portforward.go index 75b11603a8f..942a1c5009e 100644 --- a/pkg/kubectl/cmd/portforward.go +++ b/pkg/kubectl/cmd/portforward.go @@ -17,6 +17,7 @@ limitations under the License. package cmd import ( + "net/url" "os" "os/signal" @@ -25,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/api" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/portforward" + "k8s.io/kubernetes/pkg/client/unversioned/remotecommand" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" ) @@ -60,13 +62,17 @@ func NewCmdPortForward(f *cmdutil.Factory) *cobra.Command { } type portForwarder interface { - ForwardPorts(req *client.Request, config *client.Config, ports []string, stopChan <-chan struct{}) error + ForwardPorts(method string, url *url.URL, config *client.Config, ports []string, stopChan <-chan struct{}) error } type defaultPortForwarder struct{} -func (*defaultPortForwarder) ForwardPorts(req *client.Request, config *client.Config, ports []string, stopChan <-chan struct{}) error { - fw, err := portforward.New(req, config, ports, stopChan) +func (*defaultPortForwarder) ForwardPorts(method string, url *url.URL, config *client.Config, ports []string, stopChan <-chan struct{}) error { + dialer, err := remotecommand.NewExecutor(config, method, url) + if err != nil { + return err + } + fw, err := portforward.New(dialer, ports, stopChan) if err != nil { return err } @@ -130,5 +136,5 @@ func RunPortForward(f *cmdutil.Factory, cmd *cobra.Command, args []string, fw po Name(pod.Name). SubResource("portforward") - return fw.ForwardPorts(req, config, args, stopCh) + return fw.ForwardPorts("POST", req.URL(), config, args, stopCh) } diff --git a/pkg/kubectl/cmd/portforward_test.go b/pkg/kubectl/cmd/portforward_test.go index 231f123f37e..7092eb7aac8 100644 --- a/pkg/kubectl/cmd/portforward_test.go +++ b/pkg/kubectl/cmd/portforward_test.go @@ -19,6 +19,7 @@ package cmd import ( "fmt" "net/http" + "net/url" "testing" "github.com/spf13/cobra" @@ -30,12 +31,14 @@ import ( ) type fakePortForwarder struct { - req *client.Request - pfErr error + method string + url *url.URL + pfErr error } -func (f *fakePortForwarder) ForwardPorts(req *client.Request, config *client.Config, ports []string, stopChan <-chan struct{}) error { - f.req = req +func (f *fakePortForwarder) ForwardPorts(method string, url *url.URL, config *client.Config, ports []string, stopChan <-chan struct{}) error { + f.method = method + f.url = url return f.pfErr } @@ -92,12 +95,20 @@ func TestPortForward(t *testing.T) { if test.pfErr && err != ff.pfErr { t.Errorf("%s: Unexpected exec error: %v", test.name, err) } - if !test.pfErr && ff.req.URL().Path != test.pfPath { - t.Errorf("%s: Did not get expected path for portforward request", test.name) - } if !test.pfErr && err != nil { t.Errorf("%s: Unexpected error: %v", test.name, err) } + if test.pfErr { + continue + } + + if ff.url.Path != test.pfPath { + t.Errorf("%s: Did not get expected path for portforward request", test.name) + } + if ff.method != "POST" { + t.Errorf("%s: Did not get method for attach request: %s", test.name, ff.method) + } + } } @@ -154,7 +165,7 @@ func TestPortForwardWithPFlag(t *testing.T) { if test.pfErr && err != ff.pfErr { t.Errorf("%s: Unexpected exec error: %v", test.name, err) } - if !test.pfErr && ff.req.URL().Path != test.pfPath { + if !test.pfErr && ff.url.Path != test.pfPath { t.Errorf("%s: Did not get expected path for portforward request", test.name) } if !test.pfErr && err != nil { diff --git a/pkg/util/httpstream/httpstream.go b/pkg/util/httpstream/httpstream.go index b61af6224d5..fff6840f279 100644 --- a/pkg/util/httpstream/httpstream.go +++ b/pkg/util/httpstream/httpstream.go @@ -37,6 +37,11 @@ type NewStreamHandler func(Stream) error // performs no other logic. func NoOpNewStreamHandler(stream Stream) error { return nil } +// Dialer knows how to open a streaming connection to a server. +type Dialer interface { + Dial() (Connection, error) +} + // UpgradeRoundTripper is a type of http.RoundTripper that is able to upgrade // HTTP requests to support multiplexed bidirectional streams. After RoundTrip() // is invoked, if the upgrade is successful, clients may retrieve the upgraded