mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-28 07:57:20 +00:00
Prepare to introduce websockets for exec and portforward
Refactor the code in remotecommand to better represent the structure of what is common between portforward and exec. Kubernetes-commit: 12c7874c0d88e9099ab2a29915d26751f0d23c2a
This commit is contained in:
parent
4d6d8e1ac1
commit
4e62d7d64d
@ -35,7 +35,6 @@ import (
|
|||||||
// protocols, input/output streams, if the client is requesting a TTY, and a terminal size queue to
|
// protocols, input/output streams, if the client is requesting a TTY, and a terminal size queue to
|
||||||
// support terminal resizing.
|
// support terminal resizing.
|
||||||
type StreamOptions struct {
|
type StreamOptions struct {
|
||||||
SupportedProtocols []string
|
|
||||||
Stdin io.Reader
|
Stdin io.Reader
|
||||||
Stdout io.Writer
|
Stdout io.Writer
|
||||||
Stderr io.Writer
|
Stderr io.Writer
|
||||||
@ -52,93 +51,10 @@ type Executor interface {
|
|||||||
Stream(options StreamOptions) error
|
Stream(options StreamOptions) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// StreamExecutor supports the ability to dial an httpstream connection and the ability to
|
// SPDYUpgrader validates a response from the server after a SPDY upgrade.
|
||||||
// run a command line stream protocol over that dialer.
|
type SPDYUpgrader interface {
|
||||||
type StreamExecutor interface {
|
// NewConnection validates the response and creates a new Connection.
|
||||||
Executor
|
NewConnection(resp *http.Response) (httpstream.Connection, error)
|
||||||
httpstream.Dialer
|
|
||||||
}
|
|
||||||
|
|
||||||
// streamExecutor handles transporting standard shell streams over an httpstream connection.
|
|
||||||
type streamExecutor struct {
|
|
||||||
upgrader httpstream.UpgradeRoundTripper
|
|
||||||
transport http.RoundTripper
|
|
||||||
|
|
||||||
method string
|
|
||||||
url *url.URL
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewExecutor connects to the provided server and upgrades the connection to
|
|
||||||
// multiplexed bidirectional streams. The current implementation uses SPDY,
|
|
||||||
// but this could be replaced with HTTP/2 once it's available, or something else.
|
|
||||||
// TODO: the common code between this and portforward could be abstracted.
|
|
||||||
func NewExecutor(config *restclient.Config, method string, url *url.URL) (StreamExecutor, error) {
|
|
||||||
tlsConfig, err := restclient.TLSConfigFor(config)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig, true)
|
|
||||||
wrapper, err := restclient.HTTPWrappersForConfig(config, upgradeRoundTripper)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &streamExecutor{
|
|
||||||
upgrader: upgradeRoundTripper,
|
|
||||||
transport: wrapper,
|
|
||||||
method: method,
|
|
||||||
url: url,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewStreamExecutor upgrades the request so that it supports multiplexed bidirectional
|
|
||||||
// streams. This method takes a stream upgrader and an optional function that is invoked
|
|
||||||
// to wrap the round tripper. This method may be used by clients that are lower level than
|
|
||||||
// Kubernetes clients or need to provide their own upgrade round tripper.
|
|
||||||
func NewStreamExecutor(upgrader httpstream.UpgradeRoundTripper, fn func(http.RoundTripper) http.RoundTripper, method string, url *url.URL) (StreamExecutor, error) {
|
|
||||||
rt := http.RoundTripper(upgrader)
|
|
||||||
if fn != nil {
|
|
||||||
rt = fn(rt)
|
|
||||||
}
|
|
||||||
return &streamExecutor{
|
|
||||||
upgrader: upgrader,
|
|
||||||
transport: rt,
|
|
||||||
method: method,
|
|
||||||
url: url,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Dial opens a connection to a remote server and attempts to negotiate a SPDY
|
|
||||||
// connection. Upon success, it returns the connection and the protocol
|
|
||||||
// selected by the server.
|
|
||||||
func (e *streamExecutor) Dial(protocols ...string) (httpstream.Connection, string, error) {
|
|
||||||
rt := transport.DebugWrappers(e.transport)
|
|
||||||
|
|
||||||
// TODO the client probably shouldn't be created here, as it doesn't allow
|
|
||||||
// flexibility to allow callers to configure it.
|
|
||||||
client := &http.Client{Transport: rt}
|
|
||||||
|
|
||||||
req, err := http.NewRequest(e.method, e.url.String(), nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, "", fmt.Errorf("error creating request: %v", err)
|
|
||||||
}
|
|
||||||
for i := range protocols {
|
|
||||||
req.Header.Add(httpstream.HeaderProtocolVersion, protocols[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := client.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, "", fmt.Errorf("error sending request: %v", err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
conn, err := e.upgrader.NewConnection(resp)
|
|
||||||
if err != nil {
|
|
||||||
return nil, "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type streamCreator interface {
|
type streamCreator interface {
|
||||||
@ -149,10 +65,105 @@ type streamProtocolHandler interface {
|
|||||||
stream(conn streamCreator) error
|
stream(conn streamCreator) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// streamExecutor handles transporting standard shell streams over an httpstream connection.
|
||||||
|
type streamExecutor struct {
|
||||||
|
upgrader SPDYUpgrader
|
||||||
|
transport http.RoundTripper
|
||||||
|
|
||||||
|
method string
|
||||||
|
url *url.URL
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewSPDYExecutor connects to the provided server and upgrades the connection to
|
||||||
|
// multiplexed bidirectional streams.
|
||||||
|
func NewSPDYExecutor(config *restclient.Config, method string, url *url.URL) (Executor, error) {
|
||||||
|
wrapper, upgradeRoundTripper, err := SPDYRoundTripperFor(config)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
wrapper = transport.DebugWrappers(wrapper)
|
||||||
|
return &streamExecutor{
|
||||||
|
upgrader: upgradeRoundTripper,
|
||||||
|
transport: wrapper,
|
||||||
|
method: method,
|
||||||
|
url: url,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type spdyDialer struct {
|
||||||
|
client *http.Client
|
||||||
|
upgrader SPDYUpgrader
|
||||||
|
method string
|
||||||
|
url *url.URL
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSPDYDialer(upgrader SPDYUpgrader, client *http.Client, method string, url *url.URL) (httpstream.Dialer, error) {
|
||||||
|
return &spdyDialer{
|
||||||
|
client: client,
|
||||||
|
upgrader: upgrader,
|
||||||
|
method: method,
|
||||||
|
url: url,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *spdyDialer) Dial(protocols ...string) (httpstream.Connection, string, error) {
|
||||||
|
req, err := http.NewRequest(d.method, d.url.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, "", fmt.Errorf("error creating request: %v", err)
|
||||||
|
}
|
||||||
|
return NegotiateSPDYConnection(d.upgrader, d.client, req, protocols...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SPDYRoundTripperFor returns a round tripper to use with SPDY.
|
||||||
|
func SPDYRoundTripperFor(config *restclient.Config) (http.RoundTripper, SPDYUpgrader, error) {
|
||||||
|
tlsConfig, err := restclient.TLSConfigFor(config)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig, true)
|
||||||
|
wrapper, err := restclient.HTTPWrappersForConfig(config, upgradeRoundTripper)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
return wrapper, upgradeRoundTripper, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NegotiateSPDYConnection opens a connection to a remote server and attempts to negotiate
|
||||||
|
// a SPDY connection. Upon success, it returns the connection and the protocol selected by
|
||||||
|
// the server. The client transport must use the upgradeRoundTripper - see SPDYRoundTripperFor.
|
||||||
|
func NegotiateSPDYConnection(upgrader SPDYUpgrader, client *http.Client, req *http.Request, protocols ...string) (httpstream.Connection, string, error) {
|
||||||
|
for i := range protocols {
|
||||||
|
req.Header.Add(httpstream.HeaderProtocolVersion, protocols[i])
|
||||||
|
}
|
||||||
|
resp, err := client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, "", fmt.Errorf("error sending request: %v", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
conn, err := upgrader.NewConnection(resp)
|
||||||
|
if err != nil {
|
||||||
|
return nil, "", err
|
||||||
|
}
|
||||||
|
return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil
|
||||||
|
}
|
||||||
|
|
||||||
// Stream opens a protocol streamer to the server and streams until a client closes
|
// Stream opens a protocol streamer to the server and streams until a client closes
|
||||||
// the connection or the server disconnects.
|
// the connection or the server disconnects.
|
||||||
func (e *streamExecutor) Stream(options StreamOptions) error {
|
func (e *streamExecutor) Stream(options StreamOptions) error {
|
||||||
conn, protocol, err := e.Dial(options.SupportedProtocols...)
|
req, err := http.NewRequest(e.method, e.url.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error creating request: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, protocol, err := NegotiateSPDYConnection(
|
||||||
|
e.upgrader,
|
||||||
|
&http.Client{Transport: e.transport},
|
||||||
|
req,
|
||||||
|
remotecommand.StreamProtocolV4Name,
|
||||||
|
remotecommand.StreamProtocolV3Name,
|
||||||
|
remotecommand.StreamProtocolV2Name,
|
||||||
|
remotecommand.StreamProtocolV1Name,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user