mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
Allow the UpgradeAwareProxy to have an upgrade specific transport
We must disable HTTP/2 for proxying exec, attach, and portforward, but we may still want to use HTTP/2 for requests (from `kubectl proxy`)
This commit is contained in:
parent
fa009f3914
commit
1df5817df3
@ -277,6 +277,13 @@ func NewProxierWithNoProxyCIDR(delegate func(req *http.Request) (*url.URL, error
|
||||
}
|
||||
}
|
||||
|
||||
// DialerFunc implements Dialer for the provided function.
|
||||
type DialerFunc func(req *http.Request) (net.Conn, error)
|
||||
|
||||
func (fn DialerFunc) Dial(req *http.Request) (net.Conn, error) {
|
||||
return fn(req)
|
||||
}
|
||||
|
||||
// Dialer dials a host and writes a request to it.
|
||||
type Dialer interface {
|
||||
// Dial connects to the host specified by req's URL, writes the request to the connection, and
|
||||
|
@ -32,7 +32,10 @@ import (
|
||||
func DialURL(url *url.URL, transport http.RoundTripper) (net.Conn, error) {
|
||||
dialAddr := netutil.CanonicalAddr(url)
|
||||
|
||||
dialer, _ := utilnet.DialerFor(transport)
|
||||
dialer, err := utilnet.DialerFor(transport)
|
||||
if err != nil {
|
||||
glog.V(5).Infof("Unable to unwrap transport %T to get dialer: %v", transport, err)
|
||||
}
|
||||
|
||||
switch url.Scheme {
|
||||
case "http":
|
||||
@ -45,7 +48,10 @@ func DialURL(url *url.URL, transport http.RoundTripper) (net.Conn, error) {
|
||||
var tlsConfig *tls.Config
|
||||
var tlsConn *tls.Conn
|
||||
var err error
|
||||
tlsConfig, _ = utilnet.TLSClientConfig(transport)
|
||||
tlsConfig, err = utilnet.TLSClientConfig(transport)
|
||||
if err != nil {
|
||||
glog.V(5).Infof("Unable to unwrap transport %T to get at TLS config: %v", transport, err)
|
||||
}
|
||||
|
||||
if dialer != nil {
|
||||
// We have a dialer; use it to open the connection, then
|
||||
|
@ -39,18 +39,29 @@ import (
|
||||
|
||||
// UpgradeAwareHandler is a handler for proxy requests that may require an upgrade
|
||||
type UpgradeAwareHandler struct {
|
||||
// UpgradeRequired will reject non-upgrade connections if true.
|
||||
UpgradeRequired bool
|
||||
Location *url.URL
|
||||
// Location is the location of the upstream proxy. It is used as the location to Dial on the upstream server
|
||||
// for upgrade requests unless UseRequestLocationOnUpgrade is true.
|
||||
Location *url.URL
|
||||
// Transport provides an optional round tripper to use to proxy. If nil, the default proxy transport is used
|
||||
Transport http.RoundTripper
|
||||
// UpgradeTransport, if specified, will be used as the backend transport when upgrade requests are provided.
|
||||
// This allows clients to disable HTTP/2.
|
||||
UpgradeTransport http.RoundTripper
|
||||
// WrapTransport indicates whether the provided Transport should be wrapped with default proxy transport behavior (URL rewriting, X-Forwarded-* header setting)
|
||||
WrapTransport bool
|
||||
// InterceptRedirects determines whether the proxy should sniff backend responses for redirects,
|
||||
// following them as necessary.
|
||||
InterceptRedirects bool
|
||||
FlushInterval time.Duration
|
||||
MaxBytesPerSec int64
|
||||
Responder ErrorResponder
|
||||
// UseRequestLocation will use the incoming request URL when talking to the backend server.
|
||||
UseRequestLocation bool
|
||||
// FlushInterval controls how often the standard HTTP proxy will flush content from the upstream.
|
||||
FlushInterval time.Duration
|
||||
// MaxBytesPerSec controls the maximum rate for an upstream connection. No rate is imposed if the value is zero.
|
||||
MaxBytesPerSec int64
|
||||
// Responder is passed errors that occur while setting up proxying.
|
||||
Responder ErrorResponder
|
||||
}
|
||||
|
||||
const defaultFlushInterval = 200 * time.Millisecond
|
||||
@ -58,9 +69,27 @@ const defaultFlushInterval = 200 * time.Millisecond
|
||||
// ErrorResponder abstracts error reporting to the proxy handler to remove the need to hardcode a particular
|
||||
// error format.
|
||||
type ErrorResponder interface {
|
||||
Error(w http.ResponseWriter, req *http.Request, err error)
|
||||
}
|
||||
|
||||
// SimpleErrorResponder is the legacy implementation of ErrorResponder for callers that only
|
||||
// service a single request/response per proxy.
|
||||
type SimpleErrorResponder interface {
|
||||
Error(err error)
|
||||
}
|
||||
|
||||
func NewErrorResponder(r SimpleErrorResponder) ErrorResponder {
|
||||
return simpleResponder{r}
|
||||
}
|
||||
|
||||
type simpleResponder struct {
|
||||
responder SimpleErrorResponder
|
||||
}
|
||||
|
||||
func (r simpleResponder) Error(w http.ResponseWriter, req *http.Request, err error) {
|
||||
r.responder.Error(err)
|
||||
}
|
||||
|
||||
// NewUpgradeAwareHandler creates a new proxy handler with a default flush interval. Responder is required for returning
|
||||
// errors to the caller.
|
||||
func NewUpgradeAwareHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder ErrorResponder) *UpgradeAwareHandler {
|
||||
@ -83,7 +112,7 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
|
||||
return
|
||||
}
|
||||
if h.UpgradeRequired {
|
||||
h.Responder.Error(errors.NewBadRequest("Upgrade request required"))
|
||||
h.Responder.Error(w, req, errors.NewBadRequest("Upgrade request required"))
|
||||
return
|
||||
}
|
||||
|
||||
@ -117,7 +146,9 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
|
||||
// WithContext creates a shallow clone of the request with the new context.
|
||||
newReq := req.WithContext(context.Background())
|
||||
newReq.Header = utilnet.CloneHeader(req.Header)
|
||||
newReq.URL = &loc
|
||||
if !h.UseRequestLocation {
|
||||
newReq.URL = &loc
|
||||
}
|
||||
|
||||
proxy := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: h.Location.Scheme, Host: h.Location.Host})
|
||||
proxy.Transport = h.Transport
|
||||
@ -128,6 +159,7 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
|
||||
// tryUpgrade returns true if the request was handled.
|
||||
func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Request) bool {
|
||||
if !httpstream.IsUpgradeRequest(req) {
|
||||
glog.V(6).Infof("Request was not an upgrade")
|
||||
return false
|
||||
}
|
||||
|
||||
@ -137,18 +169,28 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
|
||||
err error
|
||||
)
|
||||
|
||||
location := *h.Location
|
||||
if h.UseRequestLocation {
|
||||
location = *req.URL
|
||||
location.Scheme = h.Location.Scheme
|
||||
location.Host = h.Location.Host
|
||||
}
|
||||
|
||||
clone := utilnet.CloneRequest(req)
|
||||
// Only append X-Forwarded-For in the upgrade path, since httputil.NewSingleHostReverseProxy
|
||||
// handles this in the non-upgrade path.
|
||||
utilnet.AppendForwardedForHeader(clone)
|
||||
if h.InterceptRedirects {
|
||||
backendConn, rawResponse, err = utilnet.ConnectWithRedirects(req.Method, h.Location, clone.Header, req.Body, h)
|
||||
glog.V(6).Infof("Connecting to backend proxy (intercepting redirects) %s\n Headers: %v", &location, clone.Header)
|
||||
backendConn, rawResponse, err = utilnet.ConnectWithRedirects(req.Method, &location, clone.Header, req.Body, utilnet.DialerFunc(h.DialForUpgrade))
|
||||
} else {
|
||||
clone.URL = h.Location
|
||||
backendConn, err = h.Dial(clone)
|
||||
glog.V(6).Infof("Connecting to backend proxy (direct dial) %s\n Headers: %v", &location, clone.Header)
|
||||
clone.URL = &location
|
||||
backendConn, err = h.DialForUpgrade(clone)
|
||||
}
|
||||
if err != nil {
|
||||
h.Responder.Error(err)
|
||||
glog.V(6).Infof("Proxy connection error: %v", err)
|
||||
h.Responder.Error(w, req, err)
|
||||
return true
|
||||
}
|
||||
defer backendConn.Close()
|
||||
@ -157,18 +199,21 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
|
||||
// hijacking should be the last step in the upgrade.
|
||||
requestHijacker, ok := w.(http.Hijacker)
|
||||
if !ok {
|
||||
h.Responder.Error(fmt.Errorf("request connection cannot be hijacked: %T", w))
|
||||
glog.V(6).Infof("Unable to hijack response writer: %T", w)
|
||||
h.Responder.Error(w, req, fmt.Errorf("request connection cannot be hijacked: %T", w))
|
||||
return true
|
||||
}
|
||||
requestHijackedConn, _, err := requestHijacker.Hijack()
|
||||
if err != nil {
|
||||
h.Responder.Error(fmt.Errorf("error hijacking request connection: %v", err))
|
||||
glog.V(6).Infof("Unable to hijack response: %v", err)
|
||||
h.Responder.Error(w, req, fmt.Errorf("error hijacking connection: %v", err))
|
||||
return true
|
||||
}
|
||||
defer requestHijackedConn.Close()
|
||||
|
||||
// Forward raw response bytes back to client.
|
||||
if len(rawResponse) > 0 {
|
||||
glog.V(6).Infof("Writing %d bytes to hijacked connection", len(rawResponse))
|
||||
if _, err = requestHijackedConn.Write(rawResponse); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Error proxying response from backend to client: %v", err))
|
||||
}
|
||||
@ -210,9 +255,20 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
|
||||
return true
|
||||
}
|
||||
|
||||
// Dial dials the backend at req.URL and writes req to it.
|
||||
func (h *UpgradeAwareHandler) Dial(req *http.Request) (net.Conn, error) {
|
||||
conn, err := DialURL(req.URL, h.Transport)
|
||||
return dial(req, h.Transport)
|
||||
}
|
||||
|
||||
func (h *UpgradeAwareHandler) DialForUpgrade(req *http.Request) (net.Conn, error) {
|
||||
if h.UpgradeTransport != nil {
|
||||
return dial(req, h.UpgradeTransport)
|
||||
}
|
||||
return dial(req, h.Transport)
|
||||
}
|
||||
|
||||
// dial dials the backend at req.URL and writes req to it.
|
||||
func dial(req *http.Request, transport http.RoundTripper) (net.Conn, error) {
|
||||
conn, err := DialURL(req.URL, transport)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error dialing backend: %v", err)
|
||||
}
|
||||
|
@ -55,18 +55,14 @@ type fakeResponder struct {
|
||||
w http.ResponseWriter
|
||||
}
|
||||
|
||||
func (r *fakeResponder) Error(err error) {
|
||||
func (r *fakeResponder) Error(w http.ResponseWriter, req *http.Request, err error) {
|
||||
if r.called {
|
||||
r.t.Errorf("Error responder called again!\nprevious error: %v\nnew error: %v", r.err, err)
|
||||
}
|
||||
|
||||
if r.w != nil {
|
||||
r.w.WriteHeader(fakeStatusCode)
|
||||
_, writeErr := r.w.Write([]byte(err.Error()))
|
||||
assert.NoError(r.t, writeErr)
|
||||
} else {
|
||||
r.t.Logf("No ResponseWriter set")
|
||||
}
|
||||
w.WriteHeader(fakeStatusCode)
|
||||
_, writeErr := w.Write([]byte(err.Error()))
|
||||
assert.NoError(r.t, writeErr)
|
||||
|
||||
r.called = true
|
||||
r.err = err
|
||||
@ -459,7 +455,7 @@ type noErrorsAllowed struct {
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
func (r *noErrorsAllowed) Error(err error) {
|
||||
func (r *noErrorsAllowed) Error(w http.ResponseWriter, req *http.Request, err error) {
|
||||
r.t.Error(err)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user