Move SPDY specific code into its own package

This commit is contained in:
Clayton Coleman 2017-07-07 18:22:39 -04:00
parent 12c7874c0d
commit cf026a3314
No known key found for this signature in database
GPG Key ID: 3D16906B4F1C5CB3
12 changed files with 173 additions and 135 deletions

View File

@ -446,6 +446,7 @@ staging/src/k8s.io/client-go/rest/watch
staging/src/k8s.io/client-go/tools/auth
staging/src/k8s.io/client-go/tools/metrics
staging/src/k8s.io/client-go/tools/remotecommand
staging/src/k8s.io/client-go/transport/spdy
staging/src/k8s.io/client-go/util/cert
staging/src/k8s.io/client-go/util/homedir
staging/src/k8s.io/client-go/util/workqueue

View File

@ -40,6 +40,7 @@ go_test(
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/portforward:go_default_library",
"//vendor/k8s.io/client-go/tools/remotecommand:go_default_library",
"//vendor/k8s.io/client-go/transport/spdy:go_default_library",
"//vendor/k8s.io/client-go/util/testing:go_default_library",
],
)

View File

@ -33,7 +33,7 @@ import (
"k8s.io/apimachinery/pkg/types"
restclient "k8s.io/client-go/rest"
. "k8s.io/client-go/tools/portforward"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/transport/spdy"
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
)
@ -131,16 +131,17 @@ func TestForwardPorts(t *testing.T) {
for testName, test := range tests {
server := httptest.NewServer(fakePortForwardServer(t, testName, test.serverSends, test.clientSends))
url, _ := url.Parse(server.URL)
exec, err := remotecommand.NewSPDYExecutor(&restclient.Config{}, "POST", url)
transport, upgrader, err := spdy.RoundTripperFor(&restclient.Config{})
if err != nil {
t.Fatal(err)
}
url, _ := url.Parse(server.URL)
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url)
stopChan := make(chan struct{}, 1)
readyChan := make(chan struct{})
pf, err := New(exec, test.ports, stopChan, readyChan, os.Stdout, os.Stderr)
pf, err := New(dialer, test.ports, stopChan, readyChan, os.Stdout, os.Stderr)
if err != nil {
t.Fatalf("%s: unexpected error calling New: %v", testName, err)
}
@ -201,17 +202,18 @@ func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) {
server := httptest.NewServer(fakePortForwardServer(t, "allBindsFailed", nil, nil))
defer server.Close()
url, _ := url.Parse(server.URL)
exec, err := remotecommand.NewSPDYExecutor(&restclient.Config{}, "POST", url)
transport, upgrader, err := spdy.RoundTripperFor(&restclient.Config{})
if err != nil {
t.Fatal(err)
}
url, _ := url.Parse(server.URL)
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url)
stopChan1 := make(chan struct{}, 1)
defer close(stopChan1)
readyChan1 := make(chan struct{})
pf1, err := New(exec, []string{"5555"}, stopChan1, readyChan1, os.Stdout, os.Stderr)
pf1, err := New(dialer, []string{"5555"}, stopChan1, readyChan1, os.Stdout, os.Stderr)
if err != nil {
t.Fatalf("error creating pf1: %v", err)
}
@ -220,7 +222,7 @@ func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) {
stopChan2 := make(chan struct{}, 1)
readyChan2 := make(chan struct{})
pf2, err := New(exec, []string{"5555"}, stopChan2, readyChan2, os.Stdout, os.Stderr)
pf2, err := New(dialer, []string{"5555"}, stopChan2, readyChan2, os.Stdout, os.Stderr)
if err != nil {
t.Fatalf("error creating pf2: %v", err)
}

View File

@ -37,6 +37,7 @@ import (
remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
restclient "k8s.io/client-go/rest"
remoteclient "k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/transport/spdy"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
@ -255,17 +256,16 @@ func TestStream(t *testing.T) {
conf := &restclient.Config{
Host: server.URL,
}
e, err := remoteclient.NewSPDYExecutor(conf, "POST", req.URL())
e, err := remoteclient.NewSPDYExecutorForProtocols(conf, "POST", req.URL(), testCase.ClientProtocols...)
if err != nil {
t.Errorf("%s: unexpected error: %v", name, err)
continue
}
err = e.Stream(remoteclient.StreamOptions{
SupportedProtocols: testCase.ClientProtocols,
Stdin: streamIn,
Stdout: streamOut,
Stderr: streamErr,
Tty: testCase.Tty,
Stdin: streamIn,
Stdout: streamOut,
Stderr: streamErr,
Tty: testCase.Tty,
})
hasErr := err != nil
@ -311,11 +311,13 @@ type fakeUpgrader struct {
conn httpstream.Connection
err, connErr error
checkResponse bool
called bool
t *testing.T
}
func (u *fakeUpgrader) RoundTrip(req *http.Request) (*http.Response, error) {
u.called = true
u.req = req
return u.resp, u.err
}
@ -344,44 +346,16 @@ func TestDial(t *testing.T) {
Body: ioutil.NopCloser(&bytes.Buffer{}),
},
}
var called bool
testFn := func(rt http.RoundTripper) http.RoundTripper {
if rt != upgrader {
t.Fatalf("unexpected round tripper: %#v", rt)
}
called = true
return rt
}
exec, err := newStreamExecutor(upgrader, testFn, "POST", &url.URL{Host: "something.com", Scheme: "https"})
if err != nil {
t.Fatal(err)
}
conn, protocol, err := exec.Dial("protocol1")
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: upgrader}, "POST", &url.URL{Host: "something.com", Scheme: "https"})
conn, protocol, err := dialer.Dial("protocol1")
if err != nil {
t.Fatal(err)
}
if conn != upgrader.conn {
t.Errorf("unexpected connection: %#v", conn)
}
if !called {
t.Errorf("wrapper not called")
if !upgrader.called {
t.Errorf("request not called")
}
_ = protocol
}
// newStreamExecutor upgrades the request so that it supports multiplexed bidirectional
// streams. This method takes a stream upgrader and an optional function that is invoked
// to wrap the round tripper. This method may be used by clients that are lower level than
// Kubernetes clients or need to provide their own upgrade round tripper.
func newStreamExecutor(upgrader httpstream.UpgradeRoundTripper, fn func(http.RoundTripper) http.RoundTripper, method string, url *url.URL) (StreamExecutor, error) {
rt := http.RoundTripper(upgrader)
if fn != nil {
rt = fn(rt)
}
return &streamExecutor{
upgrader: upgrader,
transport: rt,
method: method,
url: url,
}, nil
}

View File

@ -145,6 +145,7 @@ go_library(
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
"//vendor/k8s.io/client-go/tools/portforward:go_default_library",
"//vendor/k8s.io/client-go/tools/remotecommand:go_default_library",
"//vendor/k8s.io/client-go/transport/spdy:go_default_library",
],
)

View File

@ -29,7 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/transport/spdy"
"k8s.io/kubernetes/pkg/api"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
@ -103,14 +103,11 @@ type defaultPortForwarder struct {
}
func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error {
transport, upgrader, err := remotecommand.SPDYRoundTripperFor(opts.Config)
if err != nil {
return err
}
dialer, err := remotecommand.NewSPDYDialer(upgrader, &http.Client{Transport: transport}, method, url)
transport, upgrader, err := spdy.RoundTripperFor(opts.Config)
if err != nil {
return err
}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
fw, err := portforward.New(dialer, opts.Ports, opts.StopChannel, opts.ReadyChannel, f.cmdOut, f.cmdErr)
if err != nil {
return err

View File

@ -45,9 +45,9 @@ go_test(
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/remotecommand:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/remotecommand:go_default_library",
"//vendor/k8s.io/client-go/transport/spdy:go_default_library",
],
)

View File

@ -30,9 +30,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/transport/spdy"
"k8s.io/kubernetes/pkg/api"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
kubeletportforward "k8s.io/kubernetes/pkg/kubelet/server/portforward"
@ -237,9 +237,10 @@ func TestServePortForward(t *testing.T) {
reqURL, err := url.Parse(resp.Url)
require.NoError(t, err)
exec, err := remotecommand.NewSPDYExecutor(&restclient.Config{}, "POST", reqURL)
transport, upgrader, err := spdy.RoundTripperFor(&restclient.Config{})
require.NoError(t, err)
streamConn, _, err := exec.Dial(kubeletportforward.ProtocolV1Name)
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", reqURL)
streamConn, _, err := dialer.Dial(kubeletportforward.ProtocolV1Name)
require.NoError(t, err)
defer streamConn.Close()
@ -301,11 +302,10 @@ func runRemoteCommandTest(t *testing.T, commandType string) {
require.NoError(t, err)
opts := remotecommand.StreamOptions{
SupportedProtocols: remotecommandconsts.SupportedStreamingProtocols,
Stdin: stdinR,
Stdout: stdoutW,
Stderr: stderrW,
Tty: false,
Stdin: stdinR,
Stdout: stdoutW,
Stderr: stderrW,
Tty: false,
}
require.NoError(t, exec.Stream(opts))
}()

View File

@ -41,11 +41,11 @@ go_library(
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/httpstream:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/remotecommand:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/transport:go_default_library",
"//vendor/k8s.io/client-go/transport/spdy:go_default_library",
"//vendor/k8s.io/client-go/util/exec:go_default_library",
],
)

View File

@ -25,10 +25,10 @@ import (
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
"k8s.io/apimachinery/pkg/util/remotecommand"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/transport"
spdy "k8s.io/client-go/transport/spdy"
)
// StreamOptions holds information pertaining to the current streaming session: supported stream
@ -51,12 +51,6 @@ type Executor interface {
Stream(options StreamOptions) error
}
// SPDYUpgrader validates a response from the server after a SPDY upgrade.
type SPDYUpgrader interface {
// NewConnection validates the response and creates a new Connection.
NewConnection(resp *http.Response) (httpstream.Connection, error)
}
type streamCreator interface {
CreateStream(headers http.Header) (httpstream.Stream, error)
}
@ -67,17 +61,31 @@ type streamProtocolHandler interface {
// streamExecutor handles transporting standard shell streams over an httpstream connection.
type streamExecutor struct {
upgrader SPDYUpgrader
upgrader spdy.Upgrader
transport http.RoundTripper
method string
url *url.URL
method string
url *url.URL
protocols []string
}
// NewSPDYExecutor connects to the provided server and upgrades the connection to
// multiplexed bidirectional streams.
func NewSPDYExecutor(config *restclient.Config, method string, url *url.URL) (Executor, error) {
wrapper, upgradeRoundTripper, err := SPDYRoundTripperFor(config)
return NewSPDYExecutorForProtocols(
config, method, url,
remotecommand.StreamProtocolV4Name,
remotecommand.StreamProtocolV3Name,
remotecommand.StreamProtocolV2Name,
remotecommand.StreamProtocolV1Name,
)
}
// NewSPDYExecutorForProtocols connects to the provided server and upgrades the connection to
// multiplexed bidirectional streams using only the provided protocols. Exposed for testing, most
// callers should use NewSPDYExecutor.
func NewSPDYExecutorForProtocols(config *restclient.Config, method string, url *url.URL, protocols ...string) (Executor, error) {
wrapper, upgradeRoundTripper, err := spdy.RoundTripperFor(config)
if err != nil {
return nil, err
}
@ -87,66 +95,10 @@ func NewSPDYExecutor(config *restclient.Config, method string, url *url.URL) (Ex
transport: wrapper,
method: method,
url: url,
protocols: protocols,
}, nil
}
type spdyDialer struct {
client *http.Client
upgrader SPDYUpgrader
method string
url *url.URL
}
func NewSPDYDialer(upgrader SPDYUpgrader, client *http.Client, method string, url *url.URL) (httpstream.Dialer, error) {
return &spdyDialer{
client: client,
upgrader: upgrader,
method: method,
url: url,
}, nil
}
func (d *spdyDialer) Dial(protocols ...string) (httpstream.Connection, string, error) {
req, err := http.NewRequest(d.method, d.url.String(), nil)
if err != nil {
return nil, "", fmt.Errorf("error creating request: %v", err)
}
return NegotiateSPDYConnection(d.upgrader, d.client, req, protocols...)
}
// SPDYRoundTripperFor returns a round tripper to use with SPDY.
func SPDYRoundTripperFor(config *restclient.Config) (http.RoundTripper, SPDYUpgrader, error) {
tlsConfig, err := restclient.TLSConfigFor(config)
if err != nil {
return nil, nil, err
}
upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig, true)
wrapper, err := restclient.HTTPWrappersForConfig(config, upgradeRoundTripper)
if err != nil {
return nil, nil, err
}
return wrapper, upgradeRoundTripper, nil
}
// NegotiateSPDYConnection opens a connection to a remote server and attempts to negotiate
// a SPDY connection. Upon success, it returns the connection and the protocol selected by
// the server. The client transport must use the upgradeRoundTripper - see SPDYRoundTripperFor.
func NegotiateSPDYConnection(upgrader SPDYUpgrader, client *http.Client, req *http.Request, protocols ...string) (httpstream.Connection, string, error) {
for i := range protocols {
req.Header.Add(httpstream.HeaderProtocolVersion, protocols[i])
}
resp, err := client.Do(req)
if err != nil {
return nil, "", fmt.Errorf("error sending request: %v", err)
}
defer resp.Body.Close()
conn, err := upgrader.NewConnection(resp)
if err != nil {
return nil, "", err
}
return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil
}
// Stream opens a protocol streamer to the server and streams until a client closes
// the connection or the server disconnects.
func (e *streamExecutor) Stream(options StreamOptions) error {
@ -155,14 +107,11 @@ func (e *streamExecutor) Stream(options StreamOptions) error {
return fmt.Errorf("error creating request: %v", err)
}
conn, protocol, err := NegotiateSPDYConnection(
conn, protocol, err := spdy.Negotiate(
e.upgrader,
&http.Client{Transport: e.transport},
req,
remotecommand.StreamProtocolV4Name,
remotecommand.StreamProtocolV3Name,
remotecommand.StreamProtocolV2Name,
remotecommand.StreamProtocolV1Name,
e.protocols...,
)
if err != nil {
return err

View File

@ -0,0 +1,19 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["spdy.go"],
tags = ["automanaged"],
deps = [
"//vendor/k8s.io/apimachinery/pkg/util/httpstream:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
],
)

View File

@ -0,0 +1,94 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package spdy
import (
"fmt"
"net/http"
"net/url"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
restclient "k8s.io/client-go/rest"
)
// Upgrader validates a response from the server after a SPDY upgrade.
type Upgrader interface {
// NewConnection validates the response and creates a new Connection.
NewConnection(resp *http.Response) (httpstream.Connection, error)
}
// RoundTripperFor returns a round tripper and upgrader to use with SPDY.
func RoundTripperFor(config *restclient.Config) (http.RoundTripper, Upgrader, error) {
tlsConfig, err := restclient.TLSConfigFor(config)
if err != nil {
return nil, nil, err
}
upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig, true)
wrapper, err := restclient.HTTPWrappersForConfig(config, upgradeRoundTripper)
if err != nil {
return nil, nil, err
}
return wrapper, upgradeRoundTripper, nil
}
// dialer implements the httpstream.Dialer interface.
type dialer struct {
client *http.Client
upgrader Upgrader
method string
url *url.URL
}
var _ httpstream.Dialer = &dialer{}
// NewDialer will create a dialer that connects to the provided URL and upgrades the connection to SPDY.
func NewDialer(upgrader Upgrader, client *http.Client, method string, url *url.URL) httpstream.Dialer {
return &dialer{
client: client,
upgrader: upgrader,
method: method,
url: url,
}
}
func (d *dialer) Dial(protocols ...string) (httpstream.Connection, string, error) {
req, err := http.NewRequest(d.method, d.url.String(), nil)
if err != nil {
return nil, "", fmt.Errorf("error creating request: %v", err)
}
return Negotiate(d.upgrader, d.client, req, protocols...)
}
// Negotiate opens a connection to a remote server and attempts to negotiate
// a SPDY connection. Upon success, it returns the connection and the protocol selected by
// the server. The client transport must use the upgradeRoundTripper - see RoundTripperFor.
func Negotiate(upgrader Upgrader, client *http.Client, req *http.Request, protocols ...string) (httpstream.Connection, string, error) {
for i := range protocols {
req.Header.Add(httpstream.HeaderProtocolVersion, protocols[i])
}
resp, err := client.Do(req)
if err != nil {
return nil, "", fmt.Errorf("error sending request: %v", err)
}
defer resp.Body.Close()
conn, err := upgrader.NewConnection(resp)
if err != nil {
return nil, "", err
}
return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil
}