diff --git a/pkg/kubectl/proxy/proxy_server.go b/pkg/kubectl/proxy/proxy_server.go index 3e323c8dc41..f127c697896 100644 --- a/pkg/kubectl/proxy/proxy_server.go +++ b/pkg/kubectl/proxy/proxy_server.go @@ -158,7 +158,7 @@ func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) { // makeUpgradeTransport creates a transport that explicitly bypasses HTTP2 support // for proxy connections that must upgrade. -func makeUpgradeTransport(config *rest.Config) (http.RoundTripper, error) { +func makeUpgradeTransport(config *rest.Config) (proxy.UpgradeRequestRoundTripper, error) { transportConfig, err := config.TransportConfig() if err != nil { return nil, err @@ -170,7 +170,11 @@ func makeUpgradeTransport(config *rest.Config) (http.RoundTripper, error) { rt := utilnet.SetOldTransportDefaults(&http.Transport{ TLSClientConfig: tlsConfig, }) - return transport.HTTPWrappersForConfig(transportConfig, rt) + upgrader, err := transport.HTTPWrappersForConfig(transportConfig, proxy.MirrorRequest) + if err != nil { + return nil, err + } + return proxy.NewUpgradeRequestRoundTripper(rt, upgrader), nil } // NewServer creates and installs a new Server. diff --git a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go index d443915d889..70896907e66 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go @@ -17,9 +17,11 @@ limitations under the License. package proxy import ( + "bytes" "context" "fmt" "io" + "io/ioutil" "net" "net/http" "net/http/httputil" @@ -37,6 +39,19 @@ import ( "github.com/mxk/go-flowrate/flowrate" ) +// UpgradeRequestRoundTripper provides an additional method to decorate a request +// with any authentication or other protocol level information prior to performing +// an upgrade on the server. Any response will be handled by the intercepting +// proxy. +type UpgradeRequestRoundTripper interface { + http.RoundTripper + // WrapRequest takes a valid HTTP request and returns a suitably altered version + // of request with any HTTP level values required to complete the request half of + // an upgrade on the server. It does not get a chance to see the response and + // should bypass any request side logic that expects to see the response. + WrapRequest(*http.Request) (*http.Request, error) +} + // UpgradeAwareHandler is a handler for proxy requests that may require an upgrade type UpgradeAwareHandler struct { // UpgradeRequired will reject non-upgrade connections if true. @@ -48,7 +63,7 @@ type UpgradeAwareHandler struct { Transport http.RoundTripper // UpgradeTransport, if specified, will be used as the backend transport when upgrade requests are provided. // This allows clients to disable HTTP/2. - UpgradeTransport http.RoundTripper + UpgradeTransport UpgradeRequestRoundTripper // WrapTransport indicates whether the provided Transport should be wrapped with default proxy transport behavior (URL rewriting, X-Forwarded-* header setting) WrapTransport bool // InterceptRedirects determines whether the proxy should sniff backend responses for redirects, @@ -90,6 +105,60 @@ func (r simpleResponder) Error(w http.ResponseWriter, req *http.Request, err err r.responder.Error(err) } +// upgradeRequestRoundTripper implements proxy.UpgradeRequestRoundTripper. +type upgradeRequestRoundTripper struct { + http.RoundTripper + upgrader http.RoundTripper +} + +var ( + _ UpgradeRequestRoundTripper = &upgradeRequestRoundTripper{} + _ utilnet.RoundTripperWrapper = &upgradeRequestRoundTripper{} +) + +// WrappedRoundTripper returns the round tripper that a caller would use. +func (rt *upgradeRequestRoundTripper) WrappedRoundTripper() http.RoundTripper { + return rt.RoundTripper +} + +// WriteToRequest calls the nested upgrader and then copies the returned request +// fields onto the passed request. +func (rt *upgradeRequestRoundTripper) WrapRequest(req *http.Request) (*http.Request, error) { + resp, err := rt.upgrader.RoundTrip(req) + if err != nil { + return nil, err + } + return resp.Request, nil +} + +// onewayRoundTripper captures the provided request - which is assumed to have +// been modified by other round trippers - and then returns a fake response. +type onewayRoundTripper struct{} + +// RoundTrip returns a simple 200 OK response that captures the provided request. +func (onewayRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + return &http.Response{ + Status: "200 OK", + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(&bytes.Buffer{}), + Request: req, + }, nil +} + +// MirrorRequest is a round tripper that can be called to get back the calling request as +// the core round tripper in a chain. +var MirrorRequest http.RoundTripper = onewayRoundTripper{} + +// NewUpgradeRequestRoundTripper takes two round trippers - one for the underlying TCP connection, and +// one that is able to write headers to an HTTP request. The request rt is used to set the request headers +// and that is written to the underlying connection rt. +func NewUpgradeRequestRoundTripper(connection, request http.RoundTripper) UpgradeRequestRoundTripper { + return &upgradeRequestRoundTripper{ + RoundTripper: connection, + upgrader: request, + } +} + // NewUpgradeAwareHandler creates a new proxy handler with a default flush interval. Responder is required for returning // errors to the caller. func NewUpgradeAwareHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder ErrorResponder) *UpgradeAwareHandler { @@ -260,10 +329,14 @@ func (h *UpgradeAwareHandler) Dial(req *http.Request) (net.Conn, error) { } func (h *UpgradeAwareHandler) DialForUpgrade(req *http.Request) (net.Conn, error) { - if h.UpgradeTransport != nil { - return dial(req, h.UpgradeTransport) + if h.UpgradeTransport == nil { + return dial(req, h.Transport) } - return dial(req, h.Transport) + updatedReq, err := h.UpgradeTransport.WrapRequest(req) + if err != nil { + return nil, err + } + return dial(updatedReq, h.UpgradeTransport) } // dial dials the backend at req.URL and writes req to it. diff --git a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware_test.go b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware_test.go index 985d2208548..78ce9870c77 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware_test.go @@ -333,6 +333,12 @@ func TestServeHTTP(t *testing.T) { } } +type RoundTripperFunc func(req *http.Request) (*http.Response, error) + +func (fn RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return fn(req) +} + func TestProxyUpgrade(t *testing.T) { localhostPool := x509.NewCertPool() @@ -341,8 +347,10 @@ func TestProxyUpgrade(t *testing.T) { } testcases := map[string]struct { - ServerFunc func(http.Handler) *httptest.Server - ProxyTransport http.RoundTripper + ServerFunc func(http.Handler) *httptest.Server + ProxyTransport http.RoundTripper + UpgradeTransport UpgradeRequestRoundTripper + ExpectedAuth string }{ "http": { ServerFunc: httptest.NewServer, @@ -393,6 +401,30 @@ func TestProxyUpgrade(t *testing.T) { }, ProxyTransport: utilnet.SetTransportDefaults(&http.Transport{Dial: net.Dial, TLSClientConfig: &tls.Config{RootCAs: localhostPool}}), }, + "https (valid hostname + RootCAs + custom dialer + bearer token)": { + ServerFunc: func(h http.Handler) *httptest.Server { + cert, err := tls.X509KeyPair(localhostCert, localhostKey) + if err != nil { + t.Errorf("https (valid hostname): proxy_test: %v", err) + } + ts := httptest.NewUnstartedServer(h) + ts.TLS = &tls.Config{ + Certificates: []tls.Certificate{cert}, + } + ts.StartTLS() + return ts + }, + ProxyTransport: utilnet.SetTransportDefaults(&http.Transport{Dial: net.Dial, TLSClientConfig: &tls.Config{RootCAs: localhostPool}}), + UpgradeTransport: NewUpgradeRequestRoundTripper( + utilnet.SetOldTransportDefaults(&http.Transport{Dial: net.Dial, TLSClientConfig: &tls.Config{RootCAs: localhostPool}}), + RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + req = utilnet.CloneRequest(req) + req.Header.Set("Authorization", "Bearer 1234") + return MirrorRequest.RoundTrip(req) + }), + ), + ExpectedAuth: "Bearer 1234", + }, } for k, tc := range testcases { @@ -406,6 +438,12 @@ func TestProxyUpgrade(t *testing.T) { func() { // Cleanup after each test case. backend := http.NewServeMux() backend.Handle("/hello", websocket.Handler(func(ws *websocket.Conn) { + if ws.Request().Header.Get("Authorization") != tc.ExpectedAuth { + t.Errorf("%s: unexpected headers on request: %v", k, ws.Request().Header) + defer ws.Close() + ws.Write([]byte("you failed")) + return + } defer ws.Close() body := make([]byte, 5) ws.Read(body) @@ -422,6 +460,7 @@ func TestProxyUpgrade(t *testing.T) { proxyHandler := &UpgradeAwareHandler{ Location: serverURL, Transport: tc.ProxyTransport, + UpgradeTransport: tc.UpgradeTransport, InterceptRedirects: redirect, Responder: &noErrorsAllowed{t: t}, }