mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-01 07:47:56 +00:00
Merge pull request #14618 from smarterclayton/refactor_exec
Refactor exec to make attach useful without a client.Config
This commit is contained in:
commit
149ca1ec49
@ -29,33 +29,19 @@ import (
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/httpstream"
|
||||
"k8s.io/kubernetes/pkg/util/httpstream/spdy"
|
||||
)
|
||||
|
||||
type upgrader interface {
|
||||
upgrade(*client.Request, *client.Config) (httpstream.Connection, error)
|
||||
}
|
||||
|
||||
type defaultUpgrader struct{}
|
||||
|
||||
func (u *defaultUpgrader) upgrade(req *client.Request, config *client.Config) (httpstream.Connection, error) {
|
||||
return req.Upgrade(config, spdy.NewRoundTripper)
|
||||
}
|
||||
|
||||
// PortForwarder knows how to listen for local connections and forward them to
|
||||
// a remote pod via an upgraded HTTP request.
|
||||
type PortForwarder struct {
|
||||
req *client.Request
|
||||
config *client.Config
|
||||
ports []ForwardedPort
|
||||
stopChan <-chan struct{}
|
||||
|
||||
dialer httpstream.Dialer
|
||||
streamConn httpstream.Connection
|
||||
listeners []io.Closer
|
||||
upgrader upgrader
|
||||
Ready chan struct{}
|
||||
requestIDLock sync.Mutex
|
||||
requestID int
|
||||
@ -120,7 +106,7 @@ func parsePorts(ports []string) ([]ForwardedPort, error) {
|
||||
}
|
||||
|
||||
// New creates a new PortForwarder.
|
||||
func New(req *client.Request, config *client.Config, ports []string, stopChan <-chan struct{}) (*PortForwarder, error) {
|
||||
func New(dialer httpstream.Dialer, ports []string, stopChan <-chan struct{}) (*PortForwarder, error) {
|
||||
if len(ports) == 0 {
|
||||
return nil, errors.New("You must specify at least 1 port")
|
||||
}
|
||||
@ -128,10 +114,8 @@ func New(req *client.Request, config *client.Config, ports []string, stopChan <-
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &PortForwarder{
|
||||
req: req,
|
||||
config: config,
|
||||
dialer: dialer,
|
||||
ports: parsedPorts,
|
||||
stopChan: stopChan,
|
||||
Ready: make(chan struct{}),
|
||||
@ -143,11 +127,8 @@ func New(req *client.Request, config *client.Config, ports []string, stopChan <-
|
||||
func (pf *PortForwarder) ForwardPorts() error {
|
||||
defer pf.Close()
|
||||
|
||||
if pf.upgrader == nil {
|
||||
pf.upgrader = &defaultUpgrader{}
|
||||
}
|
||||
var err error
|
||||
pf.streamConn, err = pf.upgrader.upgrade(pf.req, pf.config)
|
||||
pf.streamConn, err = pf.dialer.Dial()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error upgrading connection: %s", err)
|
||||
}
|
||||
|
@ -31,10 +31,23 @@ import (
|
||||
"time"
|
||||
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/client/unversioned/remotecommand"
|
||||
"k8s.io/kubernetes/pkg/kubelet"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/httpstream"
|
||||
)
|
||||
|
||||
type fakeDialer struct {
|
||||
dialed bool
|
||||
conn httpstream.Connection
|
||||
err error
|
||||
}
|
||||
|
||||
func (d *fakeDialer) Dial() (httpstream.Connection, error) {
|
||||
d.dialed = true
|
||||
return d.conn, d.err
|
||||
}
|
||||
|
||||
func TestParsePortsAndNew(t *testing.T) {
|
||||
tests := []struct {
|
||||
input []string
|
||||
@ -71,10 +84,9 @@ func TestParsePortsAndNew(t *testing.T) {
|
||||
t.Fatalf("%d: parsePorts: error expected=%t, got %t: %s", i, e, a, err)
|
||||
}
|
||||
|
||||
expectedRequest := &client.Request{}
|
||||
expectedConfig := &client.Config{}
|
||||
dialer := &fakeDialer{}
|
||||
expectedStopChan := make(chan struct{})
|
||||
pf, err := New(expectedRequest, expectedConfig, test.input, expectedStopChan)
|
||||
pf, err := New(dialer, test.input, expectedStopChan)
|
||||
haveError = err != nil
|
||||
if e, a := test.expectNewError, haveError; e != a {
|
||||
t.Fatalf("%d: New: error expected=%t, got %t: %s", i, e, a, err)
|
||||
@ -93,11 +105,8 @@ func TestParsePortsAndNew(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
if e, a := expectedRequest, pf.req; e != a {
|
||||
t.Fatalf("%d: req: expected %#v, got %#v", i, e, a)
|
||||
}
|
||||
if e, a := expectedConfig, pf.config; e != a {
|
||||
t.Fatalf("%d: config: expected %#v, got %#v", i, e, a)
|
||||
if dialer.dialed {
|
||||
t.Fatalf("%d: expected not dialed", i)
|
||||
}
|
||||
if e, a := test.expected, pf.ports; !reflect.DeepEqual(e, a) {
|
||||
t.Fatalf("%d: ports: expected %#v, got %#v", i, e, a)
|
||||
@ -293,17 +302,16 @@ func TestForwardPorts(t *testing.T) {
|
||||
|
||||
for testName, test := range tests {
|
||||
server := httptest.NewServer(fakePortForwardServer(t, testName, test.serverSends, test.clientSends))
|
||||
url, _ := url.ParseRequestURI(server.URL)
|
||||
c := client.NewRESTClient(url, "x", nil, -1, -1)
|
||||
req := c.Post().Resource("testing")
|
||||
|
||||
conf := &client.Config{
|
||||
Host: server.URL,
|
||||
url, _ := url.Parse(server.URL)
|
||||
exec, err := remotecommand.NewExecutor(&client.Config{}, "POST", url)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
stopChan := make(chan struct{}, 1)
|
||||
|
||||
pf, err := New(req, conf, test.ports, stopChan)
|
||||
pf, err := New(exec, test.ports, stopChan)
|
||||
if err != nil {
|
||||
t.Fatalf("%s: unexpected error calling New: %v", testName, err)
|
||||
}
|
||||
@ -363,18 +371,17 @@ func TestForwardPorts(t *testing.T) {
|
||||
func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) {
|
||||
server := httptest.NewServer(fakePortForwardServer(t, "allBindsFailed", nil, nil))
|
||||
defer server.Close()
|
||||
url, _ := url.ParseRequestURI(server.URL)
|
||||
c := client.NewRESTClient(url, "x", nil, -1, -1)
|
||||
req := c.Post().Resource("testing")
|
||||
|
||||
conf := &client.Config{
|
||||
Host: server.URL,
|
||||
url, _ := url.Parse(server.URL)
|
||||
exec, err := remotecommand.NewExecutor(&client.Config{}, "POST", url)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
stopChan1 := make(chan struct{}, 1)
|
||||
defer close(stopChan1)
|
||||
|
||||
pf1, err := New(req, conf, []string{"5555"}, stopChan1)
|
||||
pf1, err := New(exec, []string{"5555"}, stopChan1)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating pf1: %v", err)
|
||||
}
|
||||
@ -382,7 +389,7 @@ func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) {
|
||||
<-pf1.Ready
|
||||
|
||||
stopChan2 := make(chan struct{}, 1)
|
||||
pf2, err := New(&client.Request{}, &client.Config{}, []string{"5555"}, stopChan2)
|
||||
pf2, err := New(exec, []string{"5555"}, stopChan2)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating pf2: %v", err)
|
||||
}
|
||||
|
@ -21,141 +21,127 @@ import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sync"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/conversion/queryparams"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/httpstream"
|
||||
"k8s.io/kubernetes/pkg/util/httpstream/spdy"
|
||||
)
|
||||
|
||||
type upgrader interface {
|
||||
upgrade(*client.Request, *client.Config) (httpstream.Connection, error)
|
||||
// Executor is an interface for transporting shell-style streams.
|
||||
type Executor interface {
|
||||
// Stream initiates the transport of the standard shell streams. It will transport any
|
||||
// non-nil stream to a remote system, and return an error if a problem occurs. If tty
|
||||
// is set, the stderr stream is not used (raw TTY manages stdout and stderr over the
|
||||
// stdout stream).
|
||||
Stream(stdin io.Reader, stdout, stderr io.Writer, tty bool) error
|
||||
}
|
||||
|
||||
type defaultUpgrader struct{}
|
||||
|
||||
func (u *defaultUpgrader) upgrade(req *client.Request, config *client.Config) (httpstream.Connection, error) {
|
||||
return req.Upgrade(config, spdy.NewRoundTripper)
|
||||
// StreamExecutor supports the ability to dial an httpstream connection and the ability to
|
||||
// run a command line stream protocol over that dialer.
|
||||
type StreamExecutor interface {
|
||||
Executor
|
||||
httpstream.Dialer
|
||||
}
|
||||
|
||||
type Streamer struct {
|
||||
req *client.Request
|
||||
config *client.Config
|
||||
stdin io.Reader
|
||||
stdout io.Writer
|
||||
stderr io.Writer
|
||||
tty bool
|
||||
// streamExecutor handles transporting standard shell streams over an httpstream connection.
|
||||
type streamExecutor struct {
|
||||
upgrader httpstream.UpgradeRoundTripper
|
||||
transport http.RoundTripper
|
||||
|
||||
upgrader upgrader
|
||||
method string
|
||||
url *url.URL
|
||||
}
|
||||
|
||||
// Executor executes a command on a pod container
|
||||
type Executor struct {
|
||||
Streamer
|
||||
command []string
|
||||
}
|
||||
|
||||
// New creates a new RemoteCommandExecutor
|
||||
func New(req *client.Request, config *client.Config, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) *Executor {
|
||||
return &Executor{
|
||||
command: command,
|
||||
Streamer: Streamer{
|
||||
req: req,
|
||||
config: config,
|
||||
stdin: stdin,
|
||||
stdout: stdout,
|
||||
stderr: stderr,
|
||||
tty: tty,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
type Attach struct {
|
||||
Streamer
|
||||
}
|
||||
|
||||
// NewAttach creates a new RemoteAttach
|
||||
func NewAttach(req *client.Request, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) *Attach {
|
||||
return &Attach{
|
||||
Streamer: Streamer{
|
||||
req: req,
|
||||
config: config,
|
||||
stdin: stdin,
|
||||
stdout: stdout,
|
||||
stderr: stderr,
|
||||
tty: tty,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Execute sends a remote command execution request, upgrading the
|
||||
// connection and creating streams to represent stdin/stdout/stderr. Data is
|
||||
// copied between these streams and the supplied stdin/stdout/stderr parameters.
|
||||
func (e *Attach) Execute() error {
|
||||
opts := api.PodAttachOptions{
|
||||
Stdin: (e.stdin != nil),
|
||||
Stdout: (e.stdout != nil),
|
||||
Stderr: (!e.tty && e.stderr != nil),
|
||||
TTY: e.tty,
|
||||
}
|
||||
|
||||
if err := e.setupRequestParameters(&opts); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return e.doStream()
|
||||
}
|
||||
|
||||
// Execute sends a remote command execution request, upgrading the
|
||||
// connection and creating streams to represent stdin/stdout/stderr. Data is
|
||||
// copied between these streams and the supplied stdin/stdout/stderr parameters.
|
||||
func (e *Executor) Execute() error {
|
||||
opts := api.PodExecOptions{
|
||||
Stdin: (e.stdin != nil),
|
||||
Stdout: (e.stdout != nil),
|
||||
Stderr: (!e.tty && e.stderr != nil),
|
||||
TTY: e.tty,
|
||||
Command: e.command,
|
||||
}
|
||||
|
||||
if err := e.setupRequestParameters(&opts); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return e.doStream()
|
||||
}
|
||||
|
||||
func (e *Streamer) setupRequestParameters(obj runtime.Object) error {
|
||||
versioned, err := api.Scheme.ConvertToVersion(obj, e.config.Version)
|
||||
// NewExecutor connects to the provided server and upgrades the connection to
|
||||
// multiplexed bidirectional streams. The current implementation uses SPDY,
|
||||
// but this could be replaced with HTTP/2 once it's available, or something else.
|
||||
// TODO: the common code between this and portforward could be abstracted.
|
||||
func NewExecutor(config *client.Config, method string, url *url.URL) (StreamExecutor, error) {
|
||||
tlsConfig, err := client.TLSConfigFor(config)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
params, err := queryparams.Convert(versioned)
|
||||
|
||||
upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig)
|
||||
wrapper, err := client.HTTPWrappersForConfig(config, upgradeRoundTripper)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
for k, v := range params {
|
||||
for _, vv := range v {
|
||||
e.req.Param(k, vv)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
return &streamExecutor{
|
||||
upgrader: upgradeRoundTripper,
|
||||
transport: wrapper,
|
||||
method: method,
|
||||
url: url,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (e *Streamer) doStream() error {
|
||||
if e.upgrader == nil {
|
||||
e.upgrader = &defaultUpgrader{}
|
||||
// NewStreamExecutor upgrades the request so that it supports multiplexed bidirectional
|
||||
// streams. This method takes a stream upgrader and an optional function that is invoked
|
||||
// to wrap the round tripper. This method may be used by clients that are lower level than
|
||||
// Kubernetes clients or need to provide their own upgrade round tripper.
|
||||
func NewStreamExecutor(upgrader httpstream.UpgradeRoundTripper, fn func(http.RoundTripper) http.RoundTripper, method string, url *url.URL) (StreamExecutor, error) {
|
||||
var rt http.RoundTripper = upgrader
|
||||
if fn != nil {
|
||||
rt = fn(rt)
|
||||
}
|
||||
conn, err := e.upgrader.upgrade(e.req, e.config)
|
||||
return &streamExecutor{
|
||||
upgrader: upgrader,
|
||||
transport: rt,
|
||||
method: method,
|
||||
url: url,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Dial opens a connection to a remote server and attempts to negotiate a SPDY connection.
|
||||
func (e *streamExecutor) Dial() (httpstream.Connection, error) {
|
||||
client := &http.Client{Transport: e.transport}
|
||||
|
||||
req, err := http.NewRequest(e.method, e.url.String(), nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating request: %s", err)
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error sending request: %s", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// TODO: handle protocol selection in the future
|
||||
return e.upgrader.NewConnection(resp)
|
||||
}
|
||||
|
||||
// Stream opens a protocol streamer to the server and streams until a client closes
|
||||
// the connection or the server disconnects.
|
||||
func (e *streamExecutor) Stream(stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
|
||||
conn, err := e.Dial()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Close()
|
||||
// TODO: negotiate protocols
|
||||
streamer := &streamProtocol{
|
||||
stdin: stdin,
|
||||
stdout: stdout,
|
||||
stderr: stderr,
|
||||
tty: tty,
|
||||
}
|
||||
return streamer.stream(conn)
|
||||
}
|
||||
|
||||
type streamProtocol struct {
|
||||
stdin io.Reader
|
||||
stdout io.Writer
|
||||
stderr io.Writer
|
||||
tty bool
|
||||
}
|
||||
|
||||
func (e *streamProtocol) stream(conn httpstream.Connection) error {
|
||||
headers := http.Header{}
|
||||
|
||||
// set up error stream
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
@ -183,12 +184,17 @@ func TestRequestExecuteRemoteCommand(t *testing.T) {
|
||||
url, _ := url.ParseRequestURI(server.URL)
|
||||
c := client.NewRESTClient(url, "x", nil, -1, -1)
|
||||
req := c.Post().Resource("testing")
|
||||
|
||||
req.Param("command", "ls")
|
||||
req.Param("command", "/")
|
||||
conf := &client.Config{
|
||||
Host: server.URL,
|
||||
}
|
||||
e := New(req, conf, []string{"ls", "/"}, strings.NewReader(strings.Repeat(testCase.Stdin, testCase.MessageCount)), localOut, localErr, testCase.Tty)
|
||||
err := e.Execute()
|
||||
e, err := NewExecutor(conf, "POST", req.URL())
|
||||
if err != nil {
|
||||
t.Errorf("%d: unexpected error: %v", i, err)
|
||||
continue
|
||||
}
|
||||
err = e.Stream(strings.NewReader(strings.Repeat(testCase.Stdin, testCase.MessageCount)), localOut, localErr, testCase.Tty)
|
||||
hasErr := err != nil
|
||||
|
||||
if len(testCase.Error) > 0 {
|
||||
@ -263,8 +269,12 @@ func TestRequestAttachRemoteCommand(t *testing.T) {
|
||||
conf := &client.Config{
|
||||
Host: server.URL,
|
||||
}
|
||||
e := NewAttach(req, conf, strings.NewReader(testCase.Stdin), localOut, localErr, testCase.Tty)
|
||||
err := e.Execute()
|
||||
e, err := NewExecutor(conf, "POST", req.URL())
|
||||
if err != nil {
|
||||
t.Errorf("%d: unexpected error: %v", i, err)
|
||||
continue
|
||||
}
|
||||
err = e.Stream(strings.NewReader(testCase.Stdin), localOut, localErr, testCase.Tty)
|
||||
hasErr := err != nil
|
||||
|
||||
if len(testCase.Error) > 0 {
|
||||
@ -301,3 +311,66 @@ func TestRequestAttachRemoteCommand(t *testing.T) {
|
||||
server.Close()
|
||||
}
|
||||
}
|
||||
|
||||
type fakeUpgrader struct {
|
||||
req *http.Request
|
||||
resp *http.Response
|
||||
conn httpstream.Connection
|
||||
err, connErr error
|
||||
checkResponse bool
|
||||
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
func (u *fakeUpgrader) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
u.req = req
|
||||
return u.resp, u.err
|
||||
}
|
||||
|
||||
func (u *fakeUpgrader) NewConnection(resp *http.Response) (httpstream.Connection, error) {
|
||||
if u.checkResponse && u.resp != resp {
|
||||
u.t.Errorf("response objects passed did not match: %#v", resp)
|
||||
}
|
||||
return u.conn, u.connErr
|
||||
}
|
||||
|
||||
type fakeConnection struct {
|
||||
httpstream.Connection
|
||||
}
|
||||
|
||||
// Dial is the common functionality between any stream based upgrader, regardless of protocol.
|
||||
// This method ensures that someone can use a generic stream executor without being dependent
|
||||
// on the core Kube client config behavior.
|
||||
func TestDial(t *testing.T) {
|
||||
upgrader := &fakeUpgrader{
|
||||
t: t,
|
||||
checkResponse: true,
|
||||
conn: &fakeConnection{},
|
||||
resp: &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Body: ioutil.NopCloser(&bytes.Buffer{}),
|
||||
},
|
||||
}
|
||||
var called bool
|
||||
testFn := func(rt http.RoundTripper) http.RoundTripper {
|
||||
if rt != upgrader {
|
||||
t.Fatalf("unexpected round tripper: %#v", rt)
|
||||
}
|
||||
called = true
|
||||
return rt
|
||||
}
|
||||
exec, err := NewStreamExecutor(upgrader, testFn, "POST", &url.URL{Host: "something.com", Scheme: "https"})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
conn, err := exec.Dial()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if conn != upgrader.conn {
|
||||
t.Errorf("unexpected connection: %#v", conn)
|
||||
}
|
||||
if !called {
|
||||
t.Errorf("wrapper not called")
|
||||
}
|
||||
}
|
||||
|
@ -18,7 +18,6 @@ package unversioned
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
@ -35,11 +34,11 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/client/metrics"
|
||||
"k8s.io/kubernetes/pkg/conversion/queryparams"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/httpstream"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
watchjson "k8s.io/kubernetes/pkg/watch/json"
|
||||
@ -406,6 +405,31 @@ func (r *Request) Param(paramName, s string) *Request {
|
||||
return r.setParam(paramName, s)
|
||||
}
|
||||
|
||||
// VersionedParams will take the provided object, serialize it to a map[string][]string using the
|
||||
// implicit RESTClient API version and the provided object convertor, and then add those as parameters
|
||||
// to the request. Use this to provide versioned query parameters from client libraries.
|
||||
func (r *Request) VersionedParams(obj runtime.Object, convertor runtime.ObjectConvertor) *Request {
|
||||
if r.err != nil {
|
||||
return r
|
||||
}
|
||||
versioned, err := convertor.ConvertToVersion(obj, r.apiVersion)
|
||||
if err != nil {
|
||||
r.err = err
|
||||
return r
|
||||
}
|
||||
params, err := queryparams.Convert(versioned)
|
||||
if err != nil {
|
||||
r.err = err
|
||||
return r
|
||||
}
|
||||
for k, v := range params {
|
||||
for _, vv := range v {
|
||||
r.setParam(k, vv)
|
||||
}
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Request) setParam(paramName, value string) *Request {
|
||||
if specialParams.Has(paramName) {
|
||||
r.err = fmt.Errorf("must set %v through the corresponding function, not directly.", paramName)
|
||||
@ -613,41 +637,6 @@ func (r *Request) Stream() (io.ReadCloser, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// Upgrade upgrades the request so that it supports multiplexed bidirectional
|
||||
// streams. The current implementation uses SPDY, but this could be replaced
|
||||
// with HTTP/2 once it's available, or something else.
|
||||
func (r *Request) Upgrade(config *Config, newRoundTripperFunc func(*tls.Config) httpstream.UpgradeRoundTripper) (httpstream.Connection, error) {
|
||||
if r.err != nil {
|
||||
return nil, r.err
|
||||
}
|
||||
|
||||
tlsConfig, err := TLSConfigFor(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
upgradeRoundTripper := newRoundTripperFunc(tlsConfig)
|
||||
wrapper, err := HTTPWrappersForConfig(config, upgradeRoundTripper)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r.client = &http.Client{Transport: wrapper}
|
||||
|
||||
req, err := http.NewRequest(r.verb, r.URL().String(), nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Error creating request: %s", err)
|
||||
}
|
||||
|
||||
resp, err := r.client.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Error sending request: %s", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
return upgradeRoundTripper.NewConnection(resp)
|
||||
}
|
||||
|
||||
// request connects to the server and invokes the provided function when a server response is
|
||||
// received. It handles retry behavior and up front validation of requests. It wil invoke
|
||||
// fn at most once. It will return an error if a problem occurred prior to connecting to the
|
||||
|
@ -18,7 +18,6 @@ package unversioned
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"io"
|
||||
@ -158,6 +157,22 @@ func TestRequestParam(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequestVersionedParams(t *testing.T) {
|
||||
r := (&Request{}).Param("foo", "a")
|
||||
if !api.Semantic.DeepDerivative(r.params, url.Values{"foo": []string{"a"}}) {
|
||||
t.Errorf("should have set a param: %#v", r)
|
||||
}
|
||||
r.VersionedParams(&api.PodLogOptions{Follow: true, Container: "bar"}, api.Scheme)
|
||||
|
||||
if !api.Semantic.DeepDerivative(r.params, url.Values{
|
||||
"foo": []string{"a"},
|
||||
"container": []string{"bar"},
|
||||
"follow": []string{"1"},
|
||||
}) {
|
||||
t.Errorf("should have set a param: %#v", r)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequestURI(t *testing.T) {
|
||||
r := (&Request{}).Param("foo", "a")
|
||||
r.Prefix("other")
|
||||
@ -595,88 +610,6 @@ func (f *fakeUpgradeRoundTripper) NewConnection(resp *http.Response) (httpstream
|
||||
return f.conn, nil
|
||||
}
|
||||
|
||||
func TestRequestUpgrade(t *testing.T) {
|
||||
uri, _ := url.Parse("http://localhost/")
|
||||
testCases := []struct {
|
||||
Request *Request
|
||||
Config *Config
|
||||
RoundTripper *fakeUpgradeRoundTripper
|
||||
Err bool
|
||||
AuthBasicHeader bool
|
||||
AuthBearerHeader bool
|
||||
}{
|
||||
{
|
||||
Request: &Request{err: errors.New("bail")},
|
||||
Err: true,
|
||||
},
|
||||
{
|
||||
Request: &Request{},
|
||||
Config: &Config{
|
||||
TLSClientConfig: TLSClientConfig{
|
||||
CAFile: "foo",
|
||||
},
|
||||
Insecure: true,
|
||||
},
|
||||
Err: true,
|
||||
},
|
||||
{
|
||||
Request: &Request{},
|
||||
Config: &Config{
|
||||
Username: "u",
|
||||
Password: "p",
|
||||
BearerToken: "b",
|
||||
},
|
||||
Err: true,
|
||||
},
|
||||
{
|
||||
Request: NewRequest(nil, "", uri, testapi.Default.Version(), testapi.Default.Codec()),
|
||||
Config: &Config{
|
||||
Username: "u",
|
||||
Password: "p",
|
||||
},
|
||||
AuthBasicHeader: true,
|
||||
Err: false,
|
||||
},
|
||||
{
|
||||
Request: NewRequest(nil, "", uri, testapi.Default.Version(), testapi.Default.Codec()),
|
||||
Config: &Config{
|
||||
BearerToken: "b",
|
||||
},
|
||||
AuthBearerHeader: true,
|
||||
Err: false,
|
||||
},
|
||||
}
|
||||
for i, testCase := range testCases {
|
||||
r := testCase.Request
|
||||
rt := &fakeUpgradeRoundTripper{}
|
||||
expectedConn := &fakeUpgradeConnection{}
|
||||
conn, err := r.Upgrade(testCase.Config, func(config *tls.Config) httpstream.UpgradeRoundTripper {
|
||||
rt.conn = expectedConn
|
||||
return rt
|
||||
})
|
||||
_ = conn
|
||||
hasErr := err != nil
|
||||
if hasErr != testCase.Err {
|
||||
t.Errorf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, r.err)
|
||||
}
|
||||
if testCase.Err {
|
||||
continue
|
||||
}
|
||||
|
||||
if testCase.AuthBasicHeader && !strings.Contains(rt.req.Header.Get("Authorization"), "Basic") {
|
||||
t.Errorf("%d: expected basic auth header, got: %s", i, rt.req.Header.Get("Authorization"))
|
||||
}
|
||||
|
||||
if testCase.AuthBearerHeader && !strings.Contains(rt.req.Header.Get("Authorization"), "Bearer") {
|
||||
t.Errorf("%d: expected bearer auth header, got: %s", i, rt.req.Header.Get("Authorization"))
|
||||
}
|
||||
|
||||
if e, a := expectedConn, conn; e != a {
|
||||
t.Errorf("%d: conn: expected %#v, got %#v", i, e, a)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequestDo(t *testing.T) {
|
||||
testCases := []struct {
|
||||
Request *Request
|
||||
|
@ -19,6 +19,7 @@ package cmd
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
@ -72,15 +73,18 @@ func NewCmdAttach(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer)
|
||||
|
||||
// RemoteAttach defines the interface accepted by the Attach command - provided for test stubbing
|
||||
type RemoteAttach interface {
|
||||
Attach(req *client.Request, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error
|
||||
Attach(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error
|
||||
}
|
||||
|
||||
// DefaultRemoteAttach is the standard implementation of attaching
|
||||
type DefaultRemoteAttach struct{}
|
||||
|
||||
func (*DefaultRemoteAttach) Attach(req *client.Request, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
|
||||
attach := remotecommand.NewAttach(req, config, stdin, stdout, stderr, tty)
|
||||
return attach.Execute()
|
||||
func (*DefaultRemoteAttach) Attach(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
|
||||
exec, err := remotecommand.NewExecutor(config, method, url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return exec.Stream(stdin, stdout, stderr, tty)
|
||||
}
|
||||
|
||||
// AttachOptions declare the arguments accepted by the Exec command
|
||||
@ -201,7 +205,7 @@ func (p *AttachOptions) Run() error {
|
||||
SubResource("attach").
|
||||
Param("container", p.GetContainerName(pod))
|
||||
|
||||
return p.Attach.Attach(req, p.Config, stdin, p.Out, p.Err, tty)
|
||||
return p.Attach.Attach("POST", req.URL(), p.Config, stdin, p.Out, p.Err, tty)
|
||||
}
|
||||
|
||||
// GetContainerName returns the name of the container to attach to, with a fallback.
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"testing"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
@ -32,12 +33,14 @@ import (
|
||||
)
|
||||
|
||||
type fakeRemoteAttach struct {
|
||||
req *client.Request
|
||||
method string
|
||||
url *url.URL
|
||||
attachErr error
|
||||
}
|
||||
|
||||
func (f *fakeRemoteAttach) Attach(req *client.Request, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
|
||||
f.req = req
|
||||
func (f *fakeRemoteAttach) Attach(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
|
||||
f.method = method
|
||||
f.url = url
|
||||
return f.attachErr
|
||||
}
|
||||
|
||||
@ -173,10 +176,16 @@ func TestAttach(t *testing.T) {
|
||||
t.Errorf("%s: Unexpected error: %v", test.name, err)
|
||||
continue
|
||||
}
|
||||
if !test.attachErr && ex.req.URL().Path != test.attachPath {
|
||||
if test.attachErr {
|
||||
continue
|
||||
}
|
||||
if ex.url.Path != test.attachPath {
|
||||
t.Errorf("%s: Did not get expected path for exec request", test.name)
|
||||
continue
|
||||
}
|
||||
if ex.method != "POST" {
|
||||
t.Errorf("%s: Did not get method for attach request: %s", test.name, ex.method)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -19,6 +19,7 @@ package cmd
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
@ -73,15 +74,18 @@ func NewCmdExec(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer) *
|
||||
|
||||
// RemoteExecutor defines the interface accepted by the Exec command - provided for test stubbing
|
||||
type RemoteExecutor interface {
|
||||
Execute(req *client.Request, config *client.Config, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error
|
||||
Execute(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error
|
||||
}
|
||||
|
||||
// DefaultRemoteExecutor is the standard implementation of remote command execution
|
||||
type DefaultRemoteExecutor struct{}
|
||||
|
||||
func (*DefaultRemoteExecutor) Execute(req *client.Request, config *client.Config, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
|
||||
executor := remotecommand.New(req, config, command, stdin, stdout, stderr, tty)
|
||||
return executor.Execute()
|
||||
func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
|
||||
exec, err := remotecommand.NewExecutor(config, method, url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return exec.Stream(stdin, stdout, stderr, tty)
|
||||
}
|
||||
|
||||
// ExecOptions declare the arguments accepted by the Exec command
|
||||
@ -220,6 +224,14 @@ func (p *ExecOptions) Run() error {
|
||||
Namespace(pod.Namespace).
|
||||
SubResource("exec").
|
||||
Param("container", containerName)
|
||||
req.VersionedParams(&api.PodExecOptions{
|
||||
Container: containerName,
|
||||
Command: p.Command,
|
||||
Stdin: stdin != nil,
|
||||
Stdout: p.Out != nil,
|
||||
Stderr: p.Err != nil,
|
||||
TTY: tty,
|
||||
}, api.Scheme)
|
||||
|
||||
return p.Executor.Execute(req, p.Config, p.Command, stdin, p.Out, p.Err, tty)
|
||||
return p.Executor.Execute("POST", req.URL(), p.Config, stdin, p.Out, p.Err, tty)
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
@ -33,12 +34,14 @@ import (
|
||||
)
|
||||
|
||||
type fakeRemoteExecutor struct {
|
||||
req *client.Request
|
||||
method string
|
||||
url *url.URL
|
||||
execErr error
|
||||
}
|
||||
|
||||
func (f *fakeRemoteExecutor) Execute(req *client.Request, config *client.Config, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
|
||||
f.req = req
|
||||
func (f *fakeRemoteExecutor) Execute(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
|
||||
f.method = method
|
||||
f.url = url
|
||||
return f.execErr
|
||||
}
|
||||
|
||||
@ -198,10 +201,16 @@ func TestExec(t *testing.T) {
|
||||
t.Errorf("%s: Unexpected error: %v", test.name, err)
|
||||
continue
|
||||
}
|
||||
if !test.execErr && ex.req.URL().Path != test.execPath {
|
||||
if test.execErr {
|
||||
continue
|
||||
}
|
||||
if ex.url.Path != test.execPath {
|
||||
t.Errorf("%s: Did not get expected path for exec request", test.name)
|
||||
continue
|
||||
}
|
||||
if ex.method != "POST" {
|
||||
t.Errorf("%s: Did not get method for exec request: %s", test.name, ex.method)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"os"
|
||||
"os/signal"
|
||||
|
||||
@ -25,6 +26,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/client/unversioned/portforward"
|
||||
"k8s.io/kubernetes/pkg/client/unversioned/remotecommand"
|
||||
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
||||
)
|
||||
|
||||
@ -60,13 +62,17 @@ func NewCmdPortForward(f *cmdutil.Factory) *cobra.Command {
|
||||
}
|
||||
|
||||
type portForwarder interface {
|
||||
ForwardPorts(req *client.Request, config *client.Config, ports []string, stopChan <-chan struct{}) error
|
||||
ForwardPorts(method string, url *url.URL, config *client.Config, ports []string, stopChan <-chan struct{}) error
|
||||
}
|
||||
|
||||
type defaultPortForwarder struct{}
|
||||
|
||||
func (*defaultPortForwarder) ForwardPorts(req *client.Request, config *client.Config, ports []string, stopChan <-chan struct{}) error {
|
||||
fw, err := portforward.New(req, config, ports, stopChan)
|
||||
func (*defaultPortForwarder) ForwardPorts(method string, url *url.URL, config *client.Config, ports []string, stopChan <-chan struct{}) error {
|
||||
dialer, err := remotecommand.NewExecutor(config, method, url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fw, err := portforward.New(dialer, ports, stopChan)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -130,5 +136,5 @@ func RunPortForward(f *cmdutil.Factory, cmd *cobra.Command, args []string, fw po
|
||||
Name(pod.Name).
|
||||
SubResource("portforward")
|
||||
|
||||
return fw.ForwardPorts(req, config, args, stopCh)
|
||||
return fw.ForwardPorts("POST", req.URL(), config, args, stopCh)
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ package cmd
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"testing"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
@ -30,12 +31,14 @@ import (
|
||||
)
|
||||
|
||||
type fakePortForwarder struct {
|
||||
req *client.Request
|
||||
pfErr error
|
||||
method string
|
||||
url *url.URL
|
||||
pfErr error
|
||||
}
|
||||
|
||||
func (f *fakePortForwarder) ForwardPorts(req *client.Request, config *client.Config, ports []string, stopChan <-chan struct{}) error {
|
||||
f.req = req
|
||||
func (f *fakePortForwarder) ForwardPorts(method string, url *url.URL, config *client.Config, ports []string, stopChan <-chan struct{}) error {
|
||||
f.method = method
|
||||
f.url = url
|
||||
return f.pfErr
|
||||
}
|
||||
|
||||
@ -92,12 +95,20 @@ func TestPortForward(t *testing.T) {
|
||||
if test.pfErr && err != ff.pfErr {
|
||||
t.Errorf("%s: Unexpected exec error: %v", test.name, err)
|
||||
}
|
||||
if !test.pfErr && ff.req.URL().Path != test.pfPath {
|
||||
t.Errorf("%s: Did not get expected path for portforward request", test.name)
|
||||
}
|
||||
if !test.pfErr && err != nil {
|
||||
t.Errorf("%s: Unexpected error: %v", test.name, err)
|
||||
}
|
||||
if test.pfErr {
|
||||
continue
|
||||
}
|
||||
|
||||
if ff.url.Path != test.pfPath {
|
||||
t.Errorf("%s: Did not get expected path for portforward request", test.name)
|
||||
}
|
||||
if ff.method != "POST" {
|
||||
t.Errorf("%s: Did not get method for attach request: %s", test.name, ff.method)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -154,7 +165,7 @@ func TestPortForwardWithPFlag(t *testing.T) {
|
||||
if test.pfErr && err != ff.pfErr {
|
||||
t.Errorf("%s: Unexpected exec error: %v", test.name, err)
|
||||
}
|
||||
if !test.pfErr && ff.req.URL().Path != test.pfPath {
|
||||
if !test.pfErr && ff.url.Path != test.pfPath {
|
||||
t.Errorf("%s: Did not get expected path for portforward request", test.name)
|
||||
}
|
||||
if !test.pfErr && err != nil {
|
||||
|
@ -37,6 +37,11 @@ type NewStreamHandler func(Stream) error
|
||||
// performs no other logic.
|
||||
func NoOpNewStreamHandler(stream Stream) error { return nil }
|
||||
|
||||
// Dialer knows how to open a streaming connection to a server.
|
||||
type Dialer interface {
|
||||
Dial() (Connection, error)
|
||||
}
|
||||
|
||||
// UpgradeRoundTripper is a type of http.RoundTripper that is able to upgrade
|
||||
// HTTP requests to support multiplexed bidirectional streams. After RoundTrip()
|
||||
// is invoked, if the upgrade is successful, clients may retrieve the upgraded
|
||||
|
Loading…
Reference in New Issue
Block a user