Move SPDY specific code into its own package

Kubernetes-commit: cf026a3314fd3513a2c4239c6f4edbbbbcb96a77
This commit is contained in:
Clayton Coleman
2017-07-07 18:22:39 -04:00
committed by Kubernetes Publisher
parent ec4003cdb5
commit 3bb413eede
4 changed files with 136 additions and 74 deletions

View File

@@ -25,10 +25,10 @@ import (
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
"k8s.io/apimachinery/pkg/util/remotecommand"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/transport"
spdy "k8s.io/client-go/transport/spdy"
)
// StreamOptions holds information pertaining to the current streaming session: supported stream
@@ -51,12 +51,6 @@ type Executor interface {
Stream(options StreamOptions) error
}
// SPDYUpgrader validates a response from the server after a SPDY upgrade.
type SPDYUpgrader interface {
// NewConnection validates the response and creates a new Connection.
NewConnection(resp *http.Response) (httpstream.Connection, error)
}
type streamCreator interface {
CreateStream(headers http.Header) (httpstream.Stream, error)
}
@@ -67,17 +61,31 @@ type streamProtocolHandler interface {
// streamExecutor handles transporting standard shell streams over an httpstream connection.
type streamExecutor struct {
upgrader SPDYUpgrader
upgrader spdy.Upgrader
transport http.RoundTripper
method string
url *url.URL
method string
url *url.URL
protocols []string
}
// NewSPDYExecutor connects to the provided server and upgrades the connection to
// multiplexed bidirectional streams.
func NewSPDYExecutor(config *restclient.Config, method string, url *url.URL) (Executor, error) {
wrapper, upgradeRoundTripper, err := SPDYRoundTripperFor(config)
return NewSPDYExecutorForProtocols(
config, method, url,
remotecommand.StreamProtocolV4Name,
remotecommand.StreamProtocolV3Name,
remotecommand.StreamProtocolV2Name,
remotecommand.StreamProtocolV1Name,
)
}
// NewSPDYExecutorForProtocols connects to the provided server and upgrades the connection to
// multiplexed bidirectional streams using only the provided protocols. Exposed for testing, most
// callers should use NewSPDYExecutor.
func NewSPDYExecutorForProtocols(config *restclient.Config, method string, url *url.URL, protocols ...string) (Executor, error) {
wrapper, upgradeRoundTripper, err := spdy.RoundTripperFor(config)
if err != nil {
return nil, err
}
@@ -87,66 +95,10 @@ func NewSPDYExecutor(config *restclient.Config, method string, url *url.URL) (Ex
transport: wrapper,
method: method,
url: url,
protocols: protocols,
}, nil
}
type spdyDialer struct {
client *http.Client
upgrader SPDYUpgrader
method string
url *url.URL
}
func NewSPDYDialer(upgrader SPDYUpgrader, client *http.Client, method string, url *url.URL) (httpstream.Dialer, error) {
return &spdyDialer{
client: client,
upgrader: upgrader,
method: method,
url: url,
}, nil
}
func (d *spdyDialer) Dial(protocols ...string) (httpstream.Connection, string, error) {
req, err := http.NewRequest(d.method, d.url.String(), nil)
if err != nil {
return nil, "", fmt.Errorf("error creating request: %v", err)
}
return NegotiateSPDYConnection(d.upgrader, d.client, req, protocols...)
}
// SPDYRoundTripperFor returns a round tripper to use with SPDY.
func SPDYRoundTripperFor(config *restclient.Config) (http.RoundTripper, SPDYUpgrader, error) {
tlsConfig, err := restclient.TLSConfigFor(config)
if err != nil {
return nil, nil, err
}
upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig, true)
wrapper, err := restclient.HTTPWrappersForConfig(config, upgradeRoundTripper)
if err != nil {
return nil, nil, err
}
return wrapper, upgradeRoundTripper, nil
}
// NegotiateSPDYConnection opens a connection to a remote server and attempts to negotiate
// a SPDY connection. Upon success, it returns the connection and the protocol selected by
// the server. The client transport must use the upgradeRoundTripper - see SPDYRoundTripperFor.
func NegotiateSPDYConnection(upgrader SPDYUpgrader, client *http.Client, req *http.Request, protocols ...string) (httpstream.Connection, string, error) {
for i := range protocols {
req.Header.Add(httpstream.HeaderProtocolVersion, protocols[i])
}
resp, err := client.Do(req)
if err != nil {
return nil, "", fmt.Errorf("error sending request: %v", err)
}
defer resp.Body.Close()
conn, err := upgrader.NewConnection(resp)
if err != nil {
return nil, "", err
}
return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil
}
// Stream opens a protocol streamer to the server and streams until a client closes
// the connection or the server disconnects.
func (e *streamExecutor) Stream(options StreamOptions) error {
@@ -155,14 +107,11 @@ func (e *streamExecutor) Stream(options StreamOptions) error {
return fmt.Errorf("error creating request: %v", err)
}
conn, protocol, err := NegotiateSPDYConnection(
conn, protocol, err := spdy.Negotiate(
e.upgrader,
&http.Client{Transport: e.transport},
req,
remotecommand.StreamProtocolV4Name,
remotecommand.StreamProtocolV3Name,
remotecommand.StreamProtocolV2Name,
remotecommand.StreamProtocolV1Name,
e.protocols...,
)
if err != nil {
return err