diff --git a/staging/src/k8s.io/apimachinery/pkg/util/net/http.go b/staging/src/k8s.io/apimachinery/pkg/util/net/http.go index adb80813be2..a48f16543c9 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/net/http.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/net/http.go @@ -277,6 +277,13 @@ func NewProxierWithNoProxyCIDR(delegate func(req *http.Request) (*url.URL, error } } +// DialerFunc implements Dialer for the provided function. +type DialerFunc func(req *http.Request) (net.Conn, error) + +func (fn DialerFunc) Dial(req *http.Request) (net.Conn, error) { + return fn(req) +} + // Dialer dials a host and writes a request to it. type Dialer interface { // Dial connects to the host specified by req's URL, writes the request to the connection, and diff --git a/staging/src/k8s.io/apimachinery/pkg/util/proxy/dial.go b/staging/src/k8s.io/apimachinery/pkg/util/proxy/dial.go index e7373190940..3da7e965f53 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/proxy/dial.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/proxy/dial.go @@ -32,7 +32,10 @@ import ( func DialURL(url *url.URL, transport http.RoundTripper) (net.Conn, error) { dialAddr := netutil.CanonicalAddr(url) - dialer, _ := utilnet.DialerFor(transport) + dialer, err := utilnet.DialerFor(transport) + if err != nil { + glog.V(5).Infof("Unable to unwrap transport %T to get dialer: %v", transport, err) + } switch url.Scheme { case "http": @@ -45,7 +48,10 @@ func DialURL(url *url.URL, transport http.RoundTripper) (net.Conn, error) { var tlsConfig *tls.Config var tlsConn *tls.Conn var err error - tlsConfig, _ = utilnet.TLSClientConfig(transport) + tlsConfig, err = utilnet.TLSClientConfig(transport) + if err != nil { + glog.V(5).Infof("Unable to unwrap transport %T to get at TLS config: %v", transport, err) + } if dialer != nil { // We have a dialer; use it to open the connection, then diff --git a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go index ff04578e29a..d443915d889 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go @@ -39,18 +39,29 @@ import ( // UpgradeAwareHandler is a handler for proxy requests that may require an upgrade type UpgradeAwareHandler struct { + // UpgradeRequired will reject non-upgrade connections if true. UpgradeRequired bool - Location *url.URL + // Location is the location of the upstream proxy. It is used as the location to Dial on the upstream server + // for upgrade requests unless UseRequestLocationOnUpgrade is true. + Location *url.URL // Transport provides an optional round tripper to use to proxy. If nil, the default proxy transport is used Transport http.RoundTripper + // UpgradeTransport, if specified, will be used as the backend transport when upgrade requests are provided. + // This allows clients to disable HTTP/2. + UpgradeTransport http.RoundTripper // WrapTransport indicates whether the provided Transport should be wrapped with default proxy transport behavior (URL rewriting, X-Forwarded-* header setting) WrapTransport bool // InterceptRedirects determines whether the proxy should sniff backend responses for redirects, // following them as necessary. InterceptRedirects bool - FlushInterval time.Duration - MaxBytesPerSec int64 - Responder ErrorResponder + // UseRequestLocation will use the incoming request URL when talking to the backend server. + UseRequestLocation bool + // FlushInterval controls how often the standard HTTP proxy will flush content from the upstream. + FlushInterval time.Duration + // MaxBytesPerSec controls the maximum rate for an upstream connection. No rate is imposed if the value is zero. + MaxBytesPerSec int64 + // Responder is passed errors that occur while setting up proxying. + Responder ErrorResponder } const defaultFlushInterval = 200 * time.Millisecond @@ -58,9 +69,27 @@ const defaultFlushInterval = 200 * time.Millisecond // ErrorResponder abstracts error reporting to the proxy handler to remove the need to hardcode a particular // error format. type ErrorResponder interface { + Error(w http.ResponseWriter, req *http.Request, err error) +} + +// SimpleErrorResponder is the legacy implementation of ErrorResponder for callers that only +// service a single request/response per proxy. +type SimpleErrorResponder interface { Error(err error) } +func NewErrorResponder(r SimpleErrorResponder) ErrorResponder { + return simpleResponder{r} +} + +type simpleResponder struct { + responder SimpleErrorResponder +} + +func (r simpleResponder) Error(w http.ResponseWriter, req *http.Request, err error) { + r.responder.Error(err) +} + // NewUpgradeAwareHandler creates a new proxy handler with a default flush interval. Responder is required for returning // errors to the caller. func NewUpgradeAwareHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder ErrorResponder) *UpgradeAwareHandler { @@ -83,7 +112,7 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request return } if h.UpgradeRequired { - h.Responder.Error(errors.NewBadRequest("Upgrade request required")) + h.Responder.Error(w, req, errors.NewBadRequest("Upgrade request required")) return } @@ -117,7 +146,9 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request // WithContext creates a shallow clone of the request with the new context. newReq := req.WithContext(context.Background()) newReq.Header = utilnet.CloneHeader(req.Header) - newReq.URL = &loc + if !h.UseRequestLocation { + newReq.URL = &loc + } proxy := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: h.Location.Scheme, Host: h.Location.Host}) proxy.Transport = h.Transport @@ -128,6 +159,7 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request // tryUpgrade returns true if the request was handled. func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Request) bool { if !httpstream.IsUpgradeRequest(req) { + glog.V(6).Infof("Request was not an upgrade") return false } @@ -137,18 +169,28 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques err error ) + location := *h.Location + if h.UseRequestLocation { + location = *req.URL + location.Scheme = h.Location.Scheme + location.Host = h.Location.Host + } + clone := utilnet.CloneRequest(req) // Only append X-Forwarded-For in the upgrade path, since httputil.NewSingleHostReverseProxy // handles this in the non-upgrade path. utilnet.AppendForwardedForHeader(clone) if h.InterceptRedirects { - backendConn, rawResponse, err = utilnet.ConnectWithRedirects(req.Method, h.Location, clone.Header, req.Body, h) + glog.V(6).Infof("Connecting to backend proxy (intercepting redirects) %s\n Headers: %v", &location, clone.Header) + backendConn, rawResponse, err = utilnet.ConnectWithRedirects(req.Method, &location, clone.Header, req.Body, utilnet.DialerFunc(h.DialForUpgrade)) } else { - clone.URL = h.Location - backendConn, err = h.Dial(clone) + glog.V(6).Infof("Connecting to backend proxy (direct dial) %s\n Headers: %v", &location, clone.Header) + clone.URL = &location + backendConn, err = h.DialForUpgrade(clone) } if err != nil { - h.Responder.Error(err) + glog.V(6).Infof("Proxy connection error: %v", err) + h.Responder.Error(w, req, err) return true } defer backendConn.Close() @@ -157,18 +199,21 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques // hijacking should be the last step in the upgrade. requestHijacker, ok := w.(http.Hijacker) if !ok { - h.Responder.Error(fmt.Errorf("request connection cannot be hijacked: %T", w)) + glog.V(6).Infof("Unable to hijack response writer: %T", w) + h.Responder.Error(w, req, fmt.Errorf("request connection cannot be hijacked: %T", w)) return true } requestHijackedConn, _, err := requestHijacker.Hijack() if err != nil { - h.Responder.Error(fmt.Errorf("error hijacking request connection: %v", err)) + glog.V(6).Infof("Unable to hijack response: %v", err) + h.Responder.Error(w, req, fmt.Errorf("error hijacking connection: %v", err)) return true } defer requestHijackedConn.Close() // Forward raw response bytes back to client. if len(rawResponse) > 0 { + glog.V(6).Infof("Writing %d bytes to hijacked connection", len(rawResponse)) if _, err = requestHijackedConn.Write(rawResponse); err != nil { utilruntime.HandleError(fmt.Errorf("Error proxying response from backend to client: %v", err)) } @@ -210,9 +255,20 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques return true } -// Dial dials the backend at req.URL and writes req to it. func (h *UpgradeAwareHandler) Dial(req *http.Request) (net.Conn, error) { - conn, err := DialURL(req.URL, h.Transport) + return dial(req, h.Transport) +} + +func (h *UpgradeAwareHandler) DialForUpgrade(req *http.Request) (net.Conn, error) { + if h.UpgradeTransport != nil { + return dial(req, h.UpgradeTransport) + } + return dial(req, h.Transport) +} + +// dial dials the backend at req.URL and writes req to it. +func dial(req *http.Request, transport http.RoundTripper) (net.Conn, error) { + conn, err := DialURL(req.URL, transport) if err != nil { return nil, fmt.Errorf("error dialing backend: %v", err) } diff --git a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware_test.go b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware_test.go index 3e6f11237d4..985d2208548 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware_test.go @@ -55,18 +55,14 @@ type fakeResponder struct { w http.ResponseWriter } -func (r *fakeResponder) Error(err error) { +func (r *fakeResponder) Error(w http.ResponseWriter, req *http.Request, err error) { if r.called { r.t.Errorf("Error responder called again!\nprevious error: %v\nnew error: %v", r.err, err) } - if r.w != nil { - r.w.WriteHeader(fakeStatusCode) - _, writeErr := r.w.Write([]byte(err.Error())) - assert.NoError(r.t, writeErr) - } else { - r.t.Logf("No ResponseWriter set") - } + w.WriteHeader(fakeStatusCode) + _, writeErr := w.Write([]byte(err.Error())) + assert.NoError(r.t, writeErr) r.called = true r.err = err @@ -459,7 +455,7 @@ type noErrorsAllowed struct { t *testing.T } -func (r *noErrorsAllowed) Error(err error) { +func (r *noErrorsAllowed) Error(w http.ResponseWriter, req *http.Request, err error) { r.t.Error(err) }