diff --git a/pkg/client/unversioned/portforward/portforward.go b/pkg/client/unversioned/portforward/portforward.go index 925c5de4026..36916bdcf49 100644 --- a/pkg/client/unversioned/portforward/portforward.go +++ b/pkg/client/unversioned/portforward/portforward.go @@ -29,33 +29,19 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" - client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/httpstream" - "k8s.io/kubernetes/pkg/util/httpstream/spdy" ) -type upgrader interface { - upgrade(*client.Request, *client.Config) (httpstream.Connection, error) -} - -type defaultUpgrader struct{} - -func (u *defaultUpgrader) upgrade(req *client.Request, config *client.Config) (httpstream.Connection, error) { - return req.Upgrade(config, spdy.NewRoundTripper) -} - // PortForwarder knows how to listen for local connections and forward them to // a remote pod via an upgraded HTTP request. type PortForwarder struct { - req *client.Request - config *client.Config ports []ForwardedPort stopChan <-chan struct{} + dialer httpstream.Dialer streamConn httpstream.Connection listeners []io.Closer - upgrader upgrader Ready chan struct{} requestIDLock sync.Mutex requestID int @@ -120,7 +106,7 @@ func parsePorts(ports []string) ([]ForwardedPort, error) { } // New creates a new PortForwarder. -func New(req *client.Request, config *client.Config, ports []string, stopChan <-chan struct{}) (*PortForwarder, error) { +func New(dialer httpstream.Dialer, ports []string, stopChan <-chan struct{}) (*PortForwarder, error) { if len(ports) == 0 { return nil, errors.New("You must specify at least 1 port") } @@ -128,10 +114,8 @@ func New(req *client.Request, config *client.Config, ports []string, stopChan <- if err != nil { return nil, err } - return &PortForwarder{ - req: req, - config: config, + dialer: dialer, ports: parsedPorts, stopChan: stopChan, Ready: make(chan struct{}), @@ -143,11 +127,8 @@ func New(req *client.Request, config *client.Config, ports []string, stopChan <- func (pf *PortForwarder) ForwardPorts() error { defer pf.Close() - if pf.upgrader == nil { - pf.upgrader = &defaultUpgrader{} - } var err error - pf.streamConn, err = pf.upgrader.upgrade(pf.req, pf.config) + pf.streamConn, err = pf.dialer.Dial() if err != nil { return fmt.Errorf("error upgrading connection: %s", err) } diff --git a/pkg/client/unversioned/portforward/portforward_test.go b/pkg/client/unversioned/portforward/portforward_test.go index 0bd2ed88ade..f208a9e8814 100644 --- a/pkg/client/unversioned/portforward/portforward_test.go +++ b/pkg/client/unversioned/portforward/portforward_test.go @@ -31,10 +31,23 @@ import ( "time" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/client/unversioned/remotecommand" "k8s.io/kubernetes/pkg/kubelet" "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util/httpstream" ) +type fakeDialer struct { + dialed bool + conn httpstream.Connection + err error +} + +func (d *fakeDialer) Dial() (httpstream.Connection, error) { + d.dialed = true + return d.conn, d.err +} + func TestParsePortsAndNew(t *testing.T) { tests := []struct { input []string @@ -71,10 +84,9 @@ func TestParsePortsAndNew(t *testing.T) { t.Fatalf("%d: parsePorts: error expected=%t, got %t: %s", i, e, a, err) } - expectedRequest := &client.Request{} - expectedConfig := &client.Config{} + dialer := &fakeDialer{} expectedStopChan := make(chan struct{}) - pf, err := New(expectedRequest, expectedConfig, test.input, expectedStopChan) + pf, err := New(dialer, test.input, expectedStopChan) haveError = err != nil if e, a := test.expectNewError, haveError; e != a { t.Fatalf("%d: New: error expected=%t, got %t: %s", i, e, a, err) @@ -93,11 +105,8 @@ func TestParsePortsAndNew(t *testing.T) { } } - if e, a := expectedRequest, pf.req; e != a { - t.Fatalf("%d: req: expected %#v, got %#v", i, e, a) - } - if e, a := expectedConfig, pf.config; e != a { - t.Fatalf("%d: config: expected %#v, got %#v", i, e, a) + if dialer.dialed { + t.Fatalf("%d: expected not dialed", i) } if e, a := test.expected, pf.ports; !reflect.DeepEqual(e, a) { t.Fatalf("%d: ports: expected %#v, got %#v", i, e, a) @@ -293,17 +302,16 @@ func TestForwardPorts(t *testing.T) { for testName, test := range tests { server := httptest.NewServer(fakePortForwardServer(t, testName, test.serverSends, test.clientSends)) - url, _ := url.ParseRequestURI(server.URL) - c := client.NewRESTClient(url, "x", nil, -1, -1) - req := c.Post().Resource("testing") - conf := &client.Config{ - Host: server.URL, + url, _ := url.Parse(server.URL) + exec, err := remotecommand.NewExecutor(&client.Config{}, "POST", url) + if err != nil { + t.Fatal(err) } stopChan := make(chan struct{}, 1) - pf, err := New(req, conf, test.ports, stopChan) + pf, err := New(exec, test.ports, stopChan) if err != nil { t.Fatalf("%s: unexpected error calling New: %v", testName, err) } @@ -363,18 +371,17 @@ func TestForwardPorts(t *testing.T) { func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) { server := httptest.NewServer(fakePortForwardServer(t, "allBindsFailed", nil, nil)) defer server.Close() - url, _ := url.ParseRequestURI(server.URL) - c := client.NewRESTClient(url, "x", nil, -1, -1) - req := c.Post().Resource("testing") - conf := &client.Config{ - Host: server.URL, + url, _ := url.Parse(server.URL) + exec, err := remotecommand.NewExecutor(&client.Config{}, "POST", url) + if err != nil { + t.Fatal(err) } stopChan1 := make(chan struct{}, 1) defer close(stopChan1) - pf1, err := New(req, conf, []string{"5555"}, stopChan1) + pf1, err := New(exec, []string{"5555"}, stopChan1) if err != nil { t.Fatalf("error creating pf1: %v", err) } @@ -382,7 +389,7 @@ func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) { <-pf1.Ready stopChan2 := make(chan struct{}, 1) - pf2, err := New(&client.Request{}, &client.Config{}, []string{"5555"}, stopChan2) + pf2, err := New(exec, []string{"5555"}, stopChan2) if err != nil { t.Fatalf("error creating pf2: %v", err) } diff --git a/pkg/client/unversioned/remotecommand/remotecommand.go b/pkg/client/unversioned/remotecommand/remotecommand.go index e63af79aa0a..505e3fc9c1b 100644 --- a/pkg/client/unversioned/remotecommand/remotecommand.go +++ b/pkg/client/unversioned/remotecommand/remotecommand.go @@ -21,141 +21,127 @@ import ( "io" "io/ioutil" "net/http" + "net/url" "sync" "k8s.io/kubernetes/pkg/api" client "k8s.io/kubernetes/pkg/client/unversioned" - "k8s.io/kubernetes/pkg/conversion/queryparams" - "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/httpstream" "k8s.io/kubernetes/pkg/util/httpstream/spdy" ) -type upgrader interface { - upgrade(*client.Request, *client.Config) (httpstream.Connection, error) +// Executor is an interface for transporting shell-style streams. +type Executor interface { + // Stream initiates the transport of the standard shell streams. It will transport any + // non-nil stream to a remote system, and return an error if a problem occurs. If tty + // is set, the stderr stream is not used (raw TTY manages stdout and stderr over the + // stdout stream). + Stream(stdin io.Reader, stdout, stderr io.Writer, tty bool) error } -type defaultUpgrader struct{} - -func (u *defaultUpgrader) upgrade(req *client.Request, config *client.Config) (httpstream.Connection, error) { - return req.Upgrade(config, spdy.NewRoundTripper) +// StreamExecutor supports the ability to dial an httpstream connection and the ability to +// run a command line stream protocol over that dialer. +type StreamExecutor interface { + Executor + httpstream.Dialer } -type Streamer struct { - req *client.Request - config *client.Config - stdin io.Reader - stdout io.Writer - stderr io.Writer - tty bool +// streamExecutor handles transporting standard shell streams over an httpstream connection. +type streamExecutor struct { + upgrader httpstream.UpgradeRoundTripper + transport http.RoundTripper - upgrader upgrader + method string + url *url.URL } -// Executor executes a command on a pod container -type Executor struct { - Streamer - command []string -} - -// New creates a new RemoteCommandExecutor -func New(req *client.Request, config *client.Config, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) *Executor { - return &Executor{ - command: command, - Streamer: Streamer{ - req: req, - config: config, - stdin: stdin, - stdout: stdout, - stderr: stderr, - tty: tty, - }, - } -} - -type Attach struct { - Streamer -} - -// NewAttach creates a new RemoteAttach -func NewAttach(req *client.Request, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) *Attach { - return &Attach{ - Streamer: Streamer{ - req: req, - config: config, - stdin: stdin, - stdout: stdout, - stderr: stderr, - tty: tty, - }, - } -} - -// Execute sends a remote command execution request, upgrading the -// connection and creating streams to represent stdin/stdout/stderr. Data is -// copied between these streams and the supplied stdin/stdout/stderr parameters. -func (e *Attach) Execute() error { - opts := api.PodAttachOptions{ - Stdin: (e.stdin != nil), - Stdout: (e.stdout != nil), - Stderr: (!e.tty && e.stderr != nil), - TTY: e.tty, - } - - if err := e.setupRequestParameters(&opts); err != nil { - return err - } - - return e.doStream() -} - -// Execute sends a remote command execution request, upgrading the -// connection and creating streams to represent stdin/stdout/stderr. Data is -// copied between these streams and the supplied stdin/stdout/stderr parameters. -func (e *Executor) Execute() error { - opts := api.PodExecOptions{ - Stdin: (e.stdin != nil), - Stdout: (e.stdout != nil), - Stderr: (!e.tty && e.stderr != nil), - TTY: e.tty, - Command: e.command, - } - - if err := e.setupRequestParameters(&opts); err != nil { - return err - } - - return e.doStream() -} - -func (e *Streamer) setupRequestParameters(obj runtime.Object) error { - versioned, err := api.Scheme.ConvertToVersion(obj, e.config.Version) +// NewExecutor connects to the provided server and upgrades the connection to +// multiplexed bidirectional streams. The current implementation uses SPDY, +// but this could be replaced with HTTP/2 once it's available, or something else. +// TODO: the common code between this and portforward could be abstracted. +func NewExecutor(config *client.Config, method string, url *url.URL) (StreamExecutor, error) { + tlsConfig, err := client.TLSConfigFor(config) if err != nil { - return err + return nil, err } - params, err := queryparams.Convert(versioned) + + upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig) + wrapper, err := client.HTTPWrappersForConfig(config, upgradeRoundTripper) if err != nil { - return err + return nil, err } - for k, v := range params { - for _, vv := range v { - e.req.Param(k, vv) - } - } - return nil + + return &streamExecutor{ + upgrader: upgradeRoundTripper, + transport: wrapper, + method: method, + url: url, + }, nil } -func (e *Streamer) doStream() error { - if e.upgrader == nil { - e.upgrader = &defaultUpgrader{} +// NewStreamExecutor upgrades the request so that it supports multiplexed bidirectional +// streams. This method takes a stream upgrader and an optional function that is invoked +// to wrap the round tripper. This method may be used by clients that are lower level than +// Kubernetes clients or need to provide their own upgrade round tripper. +func NewStreamExecutor(upgrader httpstream.UpgradeRoundTripper, fn func(http.RoundTripper) http.RoundTripper, method string, url *url.URL) (StreamExecutor, error) { + var rt http.RoundTripper = upgrader + if fn != nil { + rt = fn(rt) } - conn, err := e.upgrader.upgrade(e.req, e.config) + return &streamExecutor{ + upgrader: upgrader, + transport: rt, + method: method, + url: url, + }, nil +} + +// Dial opens a connection to a remote server and attempts to negotiate a SPDY connection. +func (e *streamExecutor) Dial() (httpstream.Connection, error) { + client := &http.Client{Transport: e.transport} + + req, err := http.NewRequest(e.method, e.url.String(), nil) + if err != nil { + return nil, fmt.Errorf("error creating request: %s", err) + } + + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("error sending request: %s", err) + } + defer resp.Body.Close() + + // TODO: handle protocol selection in the future + return e.upgrader.NewConnection(resp) +} + +// Stream opens a protocol streamer to the server and streams until a client closes +// the connection or the server disconnects. +func (e *streamExecutor) Stream(stdin io.Reader, stdout, stderr io.Writer, tty bool) error { + conn, err := e.Dial() if err != nil { return err } defer conn.Close() + // TODO: negotiate protocols + streamer := &streamProtocol{ + stdin: stdin, + stdout: stdout, + stderr: stderr, + tty: tty, + } + return streamer.stream(conn) +} +type streamProtocol struct { + stdin io.Reader + stdout io.Writer + stderr io.Writer + tty bool +} + +func (e *streamProtocol) stream(conn httpstream.Connection) error { headers := http.Header{} // set up error stream diff --git a/pkg/client/unversioned/remotecommand/remotecommand_test.go b/pkg/client/unversioned/remotecommand/remotecommand_test.go index 9870ee259bc..56cb3372bfb 100644 --- a/pkg/client/unversioned/remotecommand/remotecommand_test.go +++ b/pkg/client/unversioned/remotecommand/remotecommand_test.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "io" + "io/ioutil" "net/http" "net/http/httptest" "net/url" @@ -183,12 +184,17 @@ func TestRequestExecuteRemoteCommand(t *testing.T) { url, _ := url.ParseRequestURI(server.URL) c := client.NewRESTClient(url, "x", nil, -1, -1) req := c.Post().Resource("testing") - + req.Param("command", "ls") + req.Param("command", "/") conf := &client.Config{ Host: server.URL, } - e := New(req, conf, []string{"ls", "/"}, strings.NewReader(strings.Repeat(testCase.Stdin, testCase.MessageCount)), localOut, localErr, testCase.Tty) - err := e.Execute() + e, err := NewExecutor(conf, "POST", req.URL()) + if err != nil { + t.Errorf("%d: unexpected error: %v", i, err) + continue + } + err = e.Stream(strings.NewReader(strings.Repeat(testCase.Stdin, testCase.MessageCount)), localOut, localErr, testCase.Tty) hasErr := err != nil if len(testCase.Error) > 0 { @@ -263,8 +269,12 @@ func TestRequestAttachRemoteCommand(t *testing.T) { conf := &client.Config{ Host: server.URL, } - e := NewAttach(req, conf, strings.NewReader(testCase.Stdin), localOut, localErr, testCase.Tty) - err := e.Execute() + e, err := NewExecutor(conf, "POST", req.URL()) + if err != nil { + t.Errorf("%d: unexpected error: %v", i, err) + continue + } + err = e.Stream(strings.NewReader(testCase.Stdin), localOut, localErr, testCase.Tty) hasErr := err != nil if len(testCase.Error) > 0 { @@ -301,3 +311,66 @@ func TestRequestAttachRemoteCommand(t *testing.T) { server.Close() } } + +type fakeUpgrader struct { + req *http.Request + resp *http.Response + conn httpstream.Connection + err, connErr error + checkResponse bool + + t *testing.T +} + +func (u *fakeUpgrader) RoundTrip(req *http.Request) (*http.Response, error) { + u.req = req + return u.resp, u.err +} + +func (u *fakeUpgrader) NewConnection(resp *http.Response) (httpstream.Connection, error) { + if u.checkResponse && u.resp != resp { + u.t.Errorf("response objects passed did not match: %#v", resp) + } + return u.conn, u.connErr +} + +type fakeConnection struct { + httpstream.Connection +} + +// Dial is the common functionality between any stream based upgrader, regardless of protocol. +// This method ensures that someone can use a generic stream executor without being dependent +// on the core Kube client config behavior. +func TestDial(t *testing.T) { + upgrader := &fakeUpgrader{ + t: t, + checkResponse: true, + conn: &fakeConnection{}, + resp: &http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(&bytes.Buffer{}), + }, + } + var called bool + testFn := func(rt http.RoundTripper) http.RoundTripper { + if rt != upgrader { + t.Fatalf("unexpected round tripper: %#v", rt) + } + called = true + return rt + } + exec, err := NewStreamExecutor(upgrader, testFn, "POST", &url.URL{Host: "something.com", Scheme: "https"}) + if err != nil { + t.Fatal(err) + } + conn, err := exec.Dial() + if err != nil { + t.Fatal(err) + } + if conn != upgrader.conn { + t.Errorf("unexpected connection: %#v", conn) + } + if !called { + t.Errorf("wrapper not called") + } +} diff --git a/pkg/client/unversioned/request.go b/pkg/client/unversioned/request.go index afe491dcbf4..67df03ca8ac 100644 --- a/pkg/client/unversioned/request.go +++ b/pkg/client/unversioned/request.go @@ -18,7 +18,6 @@ package unversioned import ( "bytes" - "crypto/tls" "fmt" "io" "io/ioutil" @@ -35,11 +34,11 @@ import ( "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/metrics" + "k8s.io/kubernetes/pkg/conversion/queryparams" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" - "k8s.io/kubernetes/pkg/util/httpstream" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/watch" watchjson "k8s.io/kubernetes/pkg/watch/json" @@ -406,6 +405,31 @@ func (r *Request) Param(paramName, s string) *Request { return r.setParam(paramName, s) } +// VersionedParams will take the provided object, serialize it to a map[string][]string using the +// implicit RESTClient API version and the provided object convertor, and then add those as parameters +// to the request. Use this to provide versioned query parameters from client libraries. +func (r *Request) VersionedParams(obj runtime.Object, convertor runtime.ObjectConvertor) *Request { + if r.err != nil { + return r + } + versioned, err := convertor.ConvertToVersion(obj, r.apiVersion) + if err != nil { + r.err = err + return r + } + params, err := queryparams.Convert(versioned) + if err != nil { + r.err = err + return r + } + for k, v := range params { + for _, vv := range v { + r.setParam(k, vv) + } + } + return r +} + func (r *Request) setParam(paramName, value string) *Request { if specialParams.Has(paramName) { r.err = fmt.Errorf("must set %v through the corresponding function, not directly.", paramName) @@ -613,41 +637,6 @@ func (r *Request) Stream() (io.ReadCloser, error) { } } -// Upgrade upgrades the request so that it supports multiplexed bidirectional -// streams. The current implementation uses SPDY, but this could be replaced -// with HTTP/2 once it's available, or something else. -func (r *Request) Upgrade(config *Config, newRoundTripperFunc func(*tls.Config) httpstream.UpgradeRoundTripper) (httpstream.Connection, error) { - if r.err != nil { - return nil, r.err - } - - tlsConfig, err := TLSConfigFor(config) - if err != nil { - return nil, err - } - - upgradeRoundTripper := newRoundTripperFunc(tlsConfig) - wrapper, err := HTTPWrappersForConfig(config, upgradeRoundTripper) - if err != nil { - return nil, err - } - - r.client = &http.Client{Transport: wrapper} - - req, err := http.NewRequest(r.verb, r.URL().String(), nil) - if err != nil { - return nil, fmt.Errorf("Error creating request: %s", err) - } - - resp, err := r.client.Do(req) - if err != nil { - return nil, fmt.Errorf("Error sending request: %s", err) - } - defer resp.Body.Close() - - return upgradeRoundTripper.NewConnection(resp) -} - // request connects to the server and invokes the provided function when a server response is // received. It handles retry behavior and up front validation of requests. It wil invoke // fn at most once. It will return an error if a problem occurred prior to connecting to the diff --git a/pkg/client/unversioned/request_test.go b/pkg/client/unversioned/request_test.go index 56998dfcf5a..c73c5123307 100644 --- a/pkg/client/unversioned/request_test.go +++ b/pkg/client/unversioned/request_test.go @@ -18,7 +18,6 @@ package unversioned import ( "bytes" - "crypto/tls" "encoding/base64" "errors" "io" @@ -158,6 +157,22 @@ func TestRequestParam(t *testing.T) { } } +func TestRequestVersionedParams(t *testing.T) { + r := (&Request{}).Param("foo", "a") + if !api.Semantic.DeepDerivative(r.params, url.Values{"foo": []string{"a"}}) { + t.Errorf("should have set a param: %#v", r) + } + r.VersionedParams(&api.PodLogOptions{Follow: true, Container: "bar"}, api.Scheme) + + if !api.Semantic.DeepDerivative(r.params, url.Values{ + "foo": []string{"a"}, + "container": []string{"bar"}, + "follow": []string{"1"}, + }) { + t.Errorf("should have set a param: %#v", r) + } +} + func TestRequestURI(t *testing.T) { r := (&Request{}).Param("foo", "a") r.Prefix("other") @@ -595,88 +610,6 @@ func (f *fakeUpgradeRoundTripper) NewConnection(resp *http.Response) (httpstream return f.conn, nil } -func TestRequestUpgrade(t *testing.T) { - uri, _ := url.Parse("http://localhost/") - testCases := []struct { - Request *Request - Config *Config - RoundTripper *fakeUpgradeRoundTripper - Err bool - AuthBasicHeader bool - AuthBearerHeader bool - }{ - { - Request: &Request{err: errors.New("bail")}, - Err: true, - }, - { - Request: &Request{}, - Config: &Config{ - TLSClientConfig: TLSClientConfig{ - CAFile: "foo", - }, - Insecure: true, - }, - Err: true, - }, - { - Request: &Request{}, - Config: &Config{ - Username: "u", - Password: "p", - BearerToken: "b", - }, - Err: true, - }, - { - Request: NewRequest(nil, "", uri, testapi.Default.Version(), testapi.Default.Codec()), - Config: &Config{ - Username: "u", - Password: "p", - }, - AuthBasicHeader: true, - Err: false, - }, - { - Request: NewRequest(nil, "", uri, testapi.Default.Version(), testapi.Default.Codec()), - Config: &Config{ - BearerToken: "b", - }, - AuthBearerHeader: true, - Err: false, - }, - } - for i, testCase := range testCases { - r := testCase.Request - rt := &fakeUpgradeRoundTripper{} - expectedConn := &fakeUpgradeConnection{} - conn, err := r.Upgrade(testCase.Config, func(config *tls.Config) httpstream.UpgradeRoundTripper { - rt.conn = expectedConn - return rt - }) - _ = conn - hasErr := err != nil - if hasErr != testCase.Err { - t.Errorf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, r.err) - } - if testCase.Err { - continue - } - - if testCase.AuthBasicHeader && !strings.Contains(rt.req.Header.Get("Authorization"), "Basic") { - t.Errorf("%d: expected basic auth header, got: %s", i, rt.req.Header.Get("Authorization")) - } - - if testCase.AuthBearerHeader && !strings.Contains(rt.req.Header.Get("Authorization"), "Bearer") { - t.Errorf("%d: expected bearer auth header, got: %s", i, rt.req.Header.Get("Authorization")) - } - - if e, a := expectedConn, conn; e != a { - t.Errorf("%d: conn: expected %#v, got %#v", i, e, a) - } - } -} - func TestRequestDo(t *testing.T) { testCases := []struct { Request *Request diff --git a/pkg/kubectl/cmd/attach.go b/pkg/kubectl/cmd/attach.go index b56fdb5a409..6dcd579ccc9 100644 --- a/pkg/kubectl/cmd/attach.go +++ b/pkg/kubectl/cmd/attach.go @@ -19,6 +19,7 @@ package cmd import ( "fmt" "io" + "net/url" "os" "os/signal" "syscall" @@ -72,15 +73,18 @@ func NewCmdAttach(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer) // RemoteAttach defines the interface accepted by the Attach command - provided for test stubbing type RemoteAttach interface { - Attach(req *client.Request, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error + Attach(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error } // DefaultRemoteAttach is the standard implementation of attaching type DefaultRemoteAttach struct{} -func (*DefaultRemoteAttach) Attach(req *client.Request, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { - attach := remotecommand.NewAttach(req, config, stdin, stdout, stderr, tty) - return attach.Execute() +func (*DefaultRemoteAttach) Attach(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { + exec, err := remotecommand.NewExecutor(config, method, url) + if err != nil { + return err + } + return exec.Stream(stdin, stdout, stderr, tty) } // AttachOptions declare the arguments accepted by the Exec command @@ -201,7 +205,7 @@ func (p *AttachOptions) Run() error { SubResource("attach"). Param("container", p.GetContainerName(pod)) - return p.Attach.Attach(req, p.Config, stdin, p.Out, p.Err, tty) + return p.Attach.Attach("POST", req.URL(), p.Config, stdin, p.Out, p.Err, tty) } // GetContainerName returns the name of the container to attach to, with a fallback. diff --git a/pkg/kubectl/cmd/attach_test.go b/pkg/kubectl/cmd/attach_test.go index 6c66d1946d5..acf44470fd9 100644 --- a/pkg/kubectl/cmd/attach_test.go +++ b/pkg/kubectl/cmd/attach_test.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "net/http" + "net/url" "testing" "github.com/spf13/cobra" @@ -32,12 +33,14 @@ import ( ) type fakeRemoteAttach struct { - req *client.Request + method string + url *url.URL attachErr error } -func (f *fakeRemoteAttach) Attach(req *client.Request, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { - f.req = req +func (f *fakeRemoteAttach) Attach(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { + f.method = method + f.url = url return f.attachErr } @@ -173,10 +176,16 @@ func TestAttach(t *testing.T) { t.Errorf("%s: Unexpected error: %v", test.name, err) continue } - if !test.attachErr && ex.req.URL().Path != test.attachPath { + if test.attachErr { + continue + } + if ex.url.Path != test.attachPath { t.Errorf("%s: Did not get expected path for exec request", test.name) continue } + if ex.method != "POST" { + t.Errorf("%s: Did not get method for attach request: %s", test.name, ex.method) + } } } diff --git a/pkg/kubectl/cmd/exec.go b/pkg/kubectl/cmd/exec.go index 9d921ed41cf..a0c0c88924b 100644 --- a/pkg/kubectl/cmd/exec.go +++ b/pkg/kubectl/cmd/exec.go @@ -19,6 +19,7 @@ package cmd import ( "fmt" "io" + "net/url" "os" "os/signal" "syscall" @@ -73,15 +74,18 @@ func NewCmdExec(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer) * // RemoteExecutor defines the interface accepted by the Exec command - provided for test stubbing type RemoteExecutor interface { - Execute(req *client.Request, config *client.Config, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error + Execute(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error } // DefaultRemoteExecutor is the standard implementation of remote command execution type DefaultRemoteExecutor struct{} -func (*DefaultRemoteExecutor) Execute(req *client.Request, config *client.Config, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { - executor := remotecommand.New(req, config, command, stdin, stdout, stderr, tty) - return executor.Execute() +func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { + exec, err := remotecommand.NewExecutor(config, method, url) + if err != nil { + return err + } + return exec.Stream(stdin, stdout, stderr, tty) } // ExecOptions declare the arguments accepted by the Exec command @@ -220,6 +224,14 @@ func (p *ExecOptions) Run() error { Namespace(pod.Namespace). SubResource("exec"). Param("container", containerName) + req.VersionedParams(&api.PodExecOptions{ + Container: containerName, + Command: p.Command, + Stdin: stdin != nil, + Stdout: p.Out != nil, + Stderr: p.Err != nil, + TTY: tty, + }, api.Scheme) - return p.Executor.Execute(req, p.Config, p.Command, stdin, p.Out, p.Err, tty) + return p.Executor.Execute("POST", req.URL(), p.Config, stdin, p.Out, p.Err, tty) } diff --git a/pkg/kubectl/cmd/exec_test.go b/pkg/kubectl/cmd/exec_test.go index b9a5ace0035..dd7b85d2ffe 100644 --- a/pkg/kubectl/cmd/exec_test.go +++ b/pkg/kubectl/cmd/exec_test.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "net/http" + "net/url" "reflect" "testing" @@ -33,12 +34,14 @@ import ( ) type fakeRemoteExecutor struct { - req *client.Request + method string + url *url.URL execErr error } -func (f *fakeRemoteExecutor) Execute(req *client.Request, config *client.Config, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { - f.req = req +func (f *fakeRemoteExecutor) Execute(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { + f.method = method + f.url = url return f.execErr } @@ -198,10 +201,16 @@ func TestExec(t *testing.T) { t.Errorf("%s: Unexpected error: %v", test.name, err) continue } - if !test.execErr && ex.req.URL().Path != test.execPath { + if test.execErr { + continue + } + if ex.url.Path != test.execPath { t.Errorf("%s: Did not get expected path for exec request", test.name) continue } + if ex.method != "POST" { + t.Errorf("%s: Did not get method for exec request: %s", test.name, ex.method) + } } } diff --git a/pkg/kubectl/cmd/portforward.go b/pkg/kubectl/cmd/portforward.go index 75b11603a8f..942a1c5009e 100644 --- a/pkg/kubectl/cmd/portforward.go +++ b/pkg/kubectl/cmd/portforward.go @@ -17,6 +17,7 @@ limitations under the License. package cmd import ( + "net/url" "os" "os/signal" @@ -25,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/api" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/portforward" + "k8s.io/kubernetes/pkg/client/unversioned/remotecommand" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" ) @@ -60,13 +62,17 @@ func NewCmdPortForward(f *cmdutil.Factory) *cobra.Command { } type portForwarder interface { - ForwardPorts(req *client.Request, config *client.Config, ports []string, stopChan <-chan struct{}) error + ForwardPorts(method string, url *url.URL, config *client.Config, ports []string, stopChan <-chan struct{}) error } type defaultPortForwarder struct{} -func (*defaultPortForwarder) ForwardPorts(req *client.Request, config *client.Config, ports []string, stopChan <-chan struct{}) error { - fw, err := portforward.New(req, config, ports, stopChan) +func (*defaultPortForwarder) ForwardPorts(method string, url *url.URL, config *client.Config, ports []string, stopChan <-chan struct{}) error { + dialer, err := remotecommand.NewExecutor(config, method, url) + if err != nil { + return err + } + fw, err := portforward.New(dialer, ports, stopChan) if err != nil { return err } @@ -130,5 +136,5 @@ func RunPortForward(f *cmdutil.Factory, cmd *cobra.Command, args []string, fw po Name(pod.Name). SubResource("portforward") - return fw.ForwardPorts(req, config, args, stopCh) + return fw.ForwardPorts("POST", req.URL(), config, args, stopCh) } diff --git a/pkg/kubectl/cmd/portforward_test.go b/pkg/kubectl/cmd/portforward_test.go index 231f123f37e..7092eb7aac8 100644 --- a/pkg/kubectl/cmd/portforward_test.go +++ b/pkg/kubectl/cmd/portforward_test.go @@ -19,6 +19,7 @@ package cmd import ( "fmt" "net/http" + "net/url" "testing" "github.com/spf13/cobra" @@ -30,12 +31,14 @@ import ( ) type fakePortForwarder struct { - req *client.Request - pfErr error + method string + url *url.URL + pfErr error } -func (f *fakePortForwarder) ForwardPorts(req *client.Request, config *client.Config, ports []string, stopChan <-chan struct{}) error { - f.req = req +func (f *fakePortForwarder) ForwardPorts(method string, url *url.URL, config *client.Config, ports []string, stopChan <-chan struct{}) error { + f.method = method + f.url = url return f.pfErr } @@ -92,12 +95,20 @@ func TestPortForward(t *testing.T) { if test.pfErr && err != ff.pfErr { t.Errorf("%s: Unexpected exec error: %v", test.name, err) } - if !test.pfErr && ff.req.URL().Path != test.pfPath { - t.Errorf("%s: Did not get expected path for portforward request", test.name) - } if !test.pfErr && err != nil { t.Errorf("%s: Unexpected error: %v", test.name, err) } + if test.pfErr { + continue + } + + if ff.url.Path != test.pfPath { + t.Errorf("%s: Did not get expected path for portforward request", test.name) + } + if ff.method != "POST" { + t.Errorf("%s: Did not get method for attach request: %s", test.name, ff.method) + } + } } @@ -154,7 +165,7 @@ func TestPortForwardWithPFlag(t *testing.T) { if test.pfErr && err != ff.pfErr { t.Errorf("%s: Unexpected exec error: %v", test.name, err) } - if !test.pfErr && ff.req.URL().Path != test.pfPath { + if !test.pfErr && ff.url.Path != test.pfPath { t.Errorf("%s: Did not get expected path for portforward request", test.name) } if !test.pfErr && err != nil { diff --git a/pkg/util/httpstream/httpstream.go b/pkg/util/httpstream/httpstream.go index b61af6224d5..fff6840f279 100644 --- a/pkg/util/httpstream/httpstream.go +++ b/pkg/util/httpstream/httpstream.go @@ -37,6 +37,11 @@ type NewStreamHandler func(Stream) error // performs no other logic. func NoOpNewStreamHandler(stream Stream) error { return nil } +// Dialer knows how to open a streaming connection to a server. +type Dialer interface { + Dial() (Connection, error) +} + // UpgradeRoundTripper is a type of http.RoundTripper that is able to upgrade // HTTP requests to support multiplexed bidirectional streams. After RoundTrip() // is invoked, if the upgrade is successful, clients may retrieve the upgraded