diff --git a/build/visible_to/BUILD b/build/visible_to/BUILD index ccba17fd2e1..df805b5a3e3 100644 --- a/build/visible_to/BUILD +++ b/build/visible_to/BUILD @@ -360,6 +360,7 @@ package_group( packages = [ "//pkg/kubectl", "//pkg/kubectl/cmd", + "//pkg/kubectl/proxy", ], ) diff --git a/pkg/kubectl/BUILD b/pkg/kubectl/BUILD index 9be20c52b5c..b2c14d17622 100644 --- a/pkg/kubectl/BUILD +++ b/pkg/kubectl/BUILD @@ -18,7 +18,6 @@ go_test( "generate_test.go", "kubectl_test.go", "namespace_test.go", - "proxy_server_test.go", "quota_test.go", "resource_filter_test.go", "rolebinding_test.go", @@ -96,7 +95,6 @@ go_library( "kubectl.go", "namespace.go", "pdb.go", - "proxy_server.go", "quota.go", "resource_filter.go", "rolebinding.go", @@ -194,6 +192,7 @@ filegroup( "//pkg/kubectl/cmd:all-srcs", "//pkg/kubectl/metricsutil:all-srcs", "//pkg/kubectl/plugins:all-srcs", + "//pkg/kubectl/proxy:all-srcs", "//pkg/kubectl/resource:all-srcs", "//pkg/kubectl/testing:all-srcs", "//pkg/kubectl/util:all-srcs", diff --git a/pkg/kubectl/cmd/BUILD b/pkg/kubectl/cmd/BUILD index 2f3d2196528..a062a31ad7f 100644 --- a/pkg/kubectl/cmd/BUILD +++ b/pkg/kubectl/cmd/BUILD @@ -91,6 +91,7 @@ go_library( "//pkg/kubectl/cmd/util/openapi:go_default_library", "//pkg/kubectl/metricsutil:go_default_library", "//pkg/kubectl/plugins:go_default_library", + "//pkg/kubectl/proxy:go_default_library", "//pkg/kubectl/resource:go_default_library", "//pkg/kubectl/util:go_default_library", "//pkg/kubectl/util/term:go_default_library", diff --git a/pkg/kubectl/cmd/proxy.go b/pkg/kubectl/cmd/proxy.go index 667841d6618..979d8f8aa23 100644 --- a/pkg/kubectl/cmd/proxy.go +++ b/pkg/kubectl/cmd/proxy.go @@ -26,9 +26,9 @@ import ( "github.com/golang/glog" "github.com/spf13/cobra" - "k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/kubectl/cmd/templates" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" + "k8s.io/kubernetes/pkg/kubectl/proxy" "k8s.io/kubernetes/pkg/util/i18n" ) @@ -83,10 +83,10 @@ func NewCmdProxy(f cmdutil.Factory, out io.Writer) *cobra.Command { cmd.Flags().StringP("www", "w", "", "Also serve static files from the given directory under the specified prefix.") cmd.Flags().StringP("www-prefix", "P", "/static/", "Prefix to serve static files under, if static file directory is specified.") cmd.Flags().StringP("api-prefix", "", "/", "Prefix to serve the proxied API under.") - cmd.Flags().String("accept-paths", kubectl.DefaultPathAcceptRE, "Regular expression for paths that the proxy should accept.") - cmd.Flags().String("reject-paths", kubectl.DefaultPathRejectRE, "Regular expression for paths that the proxy should reject. Paths specified here will be rejected even accepted by --accept-paths.") - cmd.Flags().String("accept-hosts", kubectl.DefaultHostAcceptRE, "Regular expression for hosts that the proxy should accept.") - cmd.Flags().String("reject-methods", kubectl.DefaultMethodRejectRE, "Regular expression for HTTP methods that the proxy should reject (example --reject-methods='POST,PUT,PATCH'). ") + cmd.Flags().String("accept-paths", proxy.DefaultPathAcceptRE, "Regular expression for paths that the proxy should accept.") + cmd.Flags().String("reject-paths", proxy.DefaultPathRejectRE, "Regular expression for paths that the proxy should reject. Paths specified here will be rejected even accepted by --accept-paths.") + cmd.Flags().String("accept-hosts", proxy.DefaultHostAcceptRE, "Regular expression for hosts that the proxy should accept.") + cmd.Flags().String("reject-methods", proxy.DefaultMethodRejectRE, "Regular expression for HTTP methods that the proxy should reject (example --reject-methods='POST,PUT,PATCH'). ") cmd.Flags().IntP("port", "p", defaultPort, "The port on which to run the proxy. Set to 0 to pick a random port.") cmd.Flags().StringP("address", "", "127.0.0.1", "The IP address on which to serve on.") cmd.Flags().Bool("disable-filter", false, "If true, disable request filtering in the proxy. This is dangerous, and can leave you vulnerable to XSRF attacks, when used with an accessible port.") @@ -126,11 +126,11 @@ func RunProxy(f cmdutil.Factory, out io.Writer, cmd *cobra.Command) error { if !strings.HasSuffix(apiProxyPrefix, "/") { apiProxyPrefix += "/" } - filter := &kubectl.FilterServer{ - AcceptPaths: kubectl.MakeRegexpArrayOrDie(cmdutil.GetFlagString(cmd, "accept-paths")), - RejectPaths: kubectl.MakeRegexpArrayOrDie(cmdutil.GetFlagString(cmd, "reject-paths")), - AcceptHosts: kubectl.MakeRegexpArrayOrDie(cmdutil.GetFlagString(cmd, "accept-hosts")), - RejectMethods: kubectl.MakeRegexpArrayOrDie(cmdutil.GetFlagString(cmd, "reject-methods")), + filter := &proxy.FilterServer{ + AcceptPaths: proxy.MakeRegexpArrayOrDie(cmdutil.GetFlagString(cmd, "accept-paths")), + RejectPaths: proxy.MakeRegexpArrayOrDie(cmdutil.GetFlagString(cmd, "reject-paths")), + AcceptHosts: proxy.MakeRegexpArrayOrDie(cmdutil.GetFlagString(cmd, "accept-hosts")), + RejectMethods: proxy.MakeRegexpArrayOrDie(cmdutil.GetFlagString(cmd, "reject-methods")), } if cmdutil.GetFlagBool(cmd, "disable-filter") { if path == "" { @@ -139,7 +139,7 @@ func RunProxy(f cmdutil.Factory, out io.Writer, cmd *cobra.Command) error { filter = nil } - server, err := kubectl.NewProxyServer(staticDir, apiProxyPrefix, staticPrefix, filter, clientConfig) + server, err := proxy.NewServer(staticDir, apiProxyPrefix, staticPrefix, filter, clientConfig) // Separate listening from serving so we can report the bound port // when it is chosen by os (eg: port == 0) @@ -152,7 +152,7 @@ func RunProxy(f cmdutil.Factory, out io.Writer, cmd *cobra.Command) error { if err != nil { glog.Fatal(err) } - fmt.Fprintf(out, "Starting to serve on %s", l.Addr().String()) + fmt.Fprintf(out, "Starting to serve on %s\n", l.Addr().String()) glog.Fatal(server.ServeOnListener(l)) return nil } diff --git a/pkg/kubectl/proxy/BUILD b/pkg/kubectl/proxy/BUILD new file mode 100644 index 00000000000..53dee7eaef9 --- /dev/null +++ b/pkg/kubectl/proxy/BUILD @@ -0,0 +1,47 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_test( + name = "go_default_test", + srcs = ["proxy_server_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//vendor/k8s.io/apimachinery/pkg/util/proxy:go_default_library", + "//vendor/k8s.io/client-go/rest:go_default_library", + ], +) + +go_library( + name = "go_default_library", + srcs = ["proxy_server.go"], + tags = ["automanaged"], + deps = [ + "//pkg/kubectl/util:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/proxy:go_default_library", + "//vendor/k8s.io/client-go/rest:go_default_library", + "//vendor/k8s.io/client-go/transport:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/pkg/kubectl/proxy_server.go b/pkg/kubectl/proxy/proxy_server.go similarity index 69% rename from pkg/kubectl/proxy_server.go rename to pkg/kubectl/proxy/proxy_server.go index f1712287ee9..3e323c8dc41 100644 --- a/pkg/kubectl/proxy_server.go +++ b/pkg/kubectl/proxy/proxy_server.go @@ -14,13 +14,12 @@ See the License for the specific language governing permissions and limitations under the License. */ -package kubectl +package proxy import ( "fmt" "net" "net/http" - "net/http/httputil" "net/url" "os" "regexp" @@ -28,19 +27,26 @@ import ( "time" "github.com/golang/glog" - restclient "k8s.io/client-go/rest" + utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apimachinery/pkg/util/proxy" + "k8s.io/client-go/rest" + "k8s.io/client-go/transport" "k8s.io/kubernetes/pkg/kubectl/util" ) const ( - DefaultHostAcceptRE = "^localhost$,^127\\.0\\.0\\.1$,^\\[::1\\]$" - DefaultPathAcceptRE = "^.*" - DefaultPathRejectRE = "^/api/.*/pods/.*/exec,^/api/.*/pods/.*/attach" + // DefaultHostAcceptRE is the default value for which hosts to accept. + DefaultHostAcceptRE = "^localhost$,^127\\.0\\.0\\.1$,^\\[::1\\]$" + // DefaultPathAcceptRE is the default path to accept. + DefaultPathAcceptRE = "^.*" + // DefaultPathRejectRE is the default set of paths to reject. + DefaultPathRejectRE = "^/api/.*/pods/.*/exec,^/api/.*/pods/.*/attach" + // DefaultMethodRejectRE is the set of HTTP methods to reject by default. DefaultMethodRejectRE = "^$" ) var ( - // The reverse proxy will periodically flush the io writer at this frequency. + // ReverseProxyFlushInterval is the frequency to flush the reverse proxy. // Only matters for long poll connections like the one used to watch. With an // interval of 0 the reverse proxy will buffer content sent on any connection // with transfer-encoding=chunked. @@ -63,7 +69,7 @@ type FilterServer struct { delegate http.Handler } -// Splits a comma separated list of regexps into an array of Regexp objects. +// MakeRegexpArray splits a comma separated list of regexps into an array of Regexp objects. func MakeRegexpArray(str string) ([]*regexp.Regexp, error) { parts := strings.Split(str, ",") result := make([]*regexp.Regexp, len(parts)) @@ -77,6 +83,7 @@ func MakeRegexpArray(str string) ([]*regexp.Regexp, error) { return result, nil } +// MakeRegexpArrayOrDie creates an array of regular expression objects from a string or exits. func MakeRegexpArrayOrDie(str string) []*regexp.Regexp { result, err := MakeRegexpArray(str) if err != nil { @@ -137,15 +144,38 @@ func (f *FilterServer) ServeHTTP(rw http.ResponseWriter, req *http.Request) { rw.Write([]byte("

Unauthorized

")) } -// ProxyServer is a http.Handler which proxies Kubernetes APIs to remote API server. -type ProxyServer struct { +// Server is a http.Handler which proxies Kubernetes APIs to remote API server. +type Server struct { handler http.Handler } -// NewProxyServer creates and installs a new ProxyServer. -// It automatically registers the created ProxyServer to http.DefaultServeMux. +type responder struct{} + +func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) { + glog.Errorf("Error while proxying request: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) +} + +// makeUpgradeTransport creates a transport that explicitly bypasses HTTP2 support +// for proxy connections that must upgrade. +func makeUpgradeTransport(config *rest.Config) (http.RoundTripper, error) { + transportConfig, err := config.TransportConfig() + if err != nil { + return nil, err + } + tlsConfig, err := transport.TLSConfigFor(transportConfig) + if err != nil { + return nil, err + } + rt := utilnet.SetOldTransportDefaults(&http.Transport{ + TLSClientConfig: tlsConfig, + }) + return transport.HTTPWrappersForConfig(transportConfig, rt) +} + +// NewServer creates and installs a new Server. // 'filter', if non-nil, protects requests to the api only. -func NewProxyServer(filebase string, apiProxyPrefix string, staticPrefix string, filter *FilterServer, cfg *restclient.Config) (*ProxyServer, error) { +func NewServer(filebase string, apiProxyPrefix string, staticPrefix string, filter *FilterServer, cfg *rest.Config) (*Server, error) { host := cfg.Host if !strings.HasSuffix(host, "/") { host = host + "/" @@ -154,10 +184,20 @@ func NewProxyServer(filebase string, apiProxyPrefix string, staticPrefix string, if err != nil { return nil, err } - proxy := newProxy(target) - if proxy.Transport, err = restclient.TransportFor(cfg); err != nil { + + responder := &responder{} + transport, err := rest.TransportFor(cfg) + if err != nil { return nil, err } + upgradeTransport, err := makeUpgradeTransport(cfg) + if err != nil { + return nil, err + } + proxy := proxy.NewUpgradeAwareHandler(target, transport, false, false, responder) + proxy.UpgradeTransport = upgradeTransport + proxy.UseRequestLocation = true + proxyServer := http.Handler(proxy) if filter != nil { proxyServer = filter.HandlerFor(proxyServer) @@ -174,16 +214,16 @@ func NewProxyServer(filebase string, apiProxyPrefix string, staticPrefix string, // serving their working directory by default. mux.Handle(staticPrefix, newFileHandler(staticPrefix, filebase)) } - return &ProxyServer{handler: mux}, nil + return &Server{handler: mux}, nil } // Listen is a simple wrapper around net.Listen. -func (s *ProxyServer) Listen(address string, port int) (net.Listener, error) { +func (s *Server) Listen(address string, port int) (net.Listener, error) { return net.Listen("tcp", fmt.Sprintf("%s:%d", address, port)) } // ListenUnix does net.Listen for a unix socket -func (s *ProxyServer) ListenUnix(path string) (net.Listener, error) { +func (s *Server) ListenUnix(path string) (net.Listener, error) { // Remove any socket, stale or not, but fall through for other files fi, err := os.Stat(path) if err == nil && (fi.Mode()&os.ModeSocket) != 0 { @@ -196,23 +236,14 @@ func (s *ProxyServer) ListenUnix(path string) (net.Listener, error) { return l, err } -// Serve starts the server using given listener, loops forever. -func (s *ProxyServer) ServeOnListener(l net.Listener) error { +// ServeOnListener starts the server using given listener, loops forever. +func (s *Server) ServeOnListener(l net.Listener) error { server := http.Server{ Handler: s.handler, } return server.Serve(l) } -func newProxy(target *url.URL) *httputil.ReverseProxy { - director := func(req *http.Request) { - req.URL.Scheme = target.Scheme - req.URL.Host = target.Host - req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path) - } - return &httputil.ReverseProxy{Director: director, FlushInterval: ReverseProxyFlushInterval} -} - func newFileHandler(prefix, base string) http.Handler { return http.StripPrefix(prefix, http.FileServer(http.Dir(base))) } diff --git a/pkg/kubectl/proxy_server_test.go b/pkg/kubectl/proxy/proxy_server_test.go similarity index 96% rename from pkg/kubectl/proxy_server_test.go rename to pkg/kubectl/proxy/proxy_server_test.go index 394259070a7..4a85501e2c1 100644 --- a/pkg/kubectl/proxy_server_test.go +++ b/pkg/kubectl/proxy/proxy_server_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package kubectl +package proxy import ( "fmt" @@ -27,7 +27,8 @@ import ( "strings" "testing" - restclient "k8s.io/client-go/rest" + "k8s.io/apimachinery/pkg/util/proxy" + "k8s.io/client-go/rest" ) func TestAccept(t *testing.T) { @@ -340,6 +341,12 @@ func TestFileServing(t *testing.T) { } } +func newProxy(target *url.URL) http.Handler { + p := proxy.NewUpgradeAwareHandler(target, http.DefaultTransport, false, false, &responder{}) + p.UseRequestLocation = true + return p +} + func TestAPIRequests(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { b, err := ioutil.ReadAll(r.Body) @@ -353,6 +360,7 @@ func TestAPIRequests(t *testing.T) { // httptest.NewServer should always generate a valid URL. target, _ := url.Parse(ts.URL) + target.Path = "/" proxy := newProxy(target) tests := []struct{ method, body string }{ @@ -404,13 +412,13 @@ func TestPathHandling(t *testing.T) { {"/custom/", "/custom/api/v1/pods/", "/api/v1/pods/"}, } - cc := &restclient.Config{ + cc := &rest.Config{ Host: ts.URL, } for _, item := range table { func() { - p, err := NewProxyServer("", item.prefix, "/not/used/for/this/test", nil, cc) + p, err := NewServer("", item.prefix, "/not/used/for/this/test", nil, cc) if err != nil { t.Fatalf("%#v: %v", item, err) } diff --git a/pkg/kubelet/util/BUILD b/pkg/kubelet/util/BUILD index abfb8491465..b8942ee512c 100644 --- a/pkg/kubelet/util/BUILD +++ b/pkg/kubelet/util/BUILD @@ -8,6 +8,14 @@ load( "go_test", ) +go_test( + name = "go_default_test", + srcs = ["util_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = ["//vendor/github.com/stretchr/testify/assert:go_default_library"], +) + go_library( name = "go_default_library", srcs = [ @@ -23,14 +31,6 @@ go_library( ], ) -go_test( - name = "go_default_test", - srcs = ["util_test.go"], - library = ":go_default_library", - tags = ["automanaged"], - deps = ["//vendor/github.com/stretchr/testify/assert:go_default_library"], -) - filegroup( name = "package-srcs", srcs = glob(["**"]), diff --git a/pkg/registry/core/node/rest/proxy.go b/pkg/registry/core/node/rest/proxy.go index 7535acade1a..5da6392c60a 100644 --- a/pkg/registry/core/node/rest/proxy.go +++ b/pkg/registry/core/node/rest/proxy.go @@ -76,7 +76,7 @@ func (r *ProxyREST) Connect(ctx genericapirequest.Context, id string, opts runti } func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) *proxy.UpgradeAwareHandler { - handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, responder) + handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder)) handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec return handler } diff --git a/pkg/registry/core/pod/rest/subresources.go b/pkg/registry/core/pod/rest/subresources.go index ec6f7ac4857..76e789a5a6f 100644 --- a/pkg/registry/core/pod/rest/subresources.go +++ b/pkg/registry/core/pod/rest/subresources.go @@ -192,7 +192,7 @@ func (r *PortForwardREST) Connect(ctx genericapirequest.Context, name string, op } func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired, interceptRedirects bool, responder rest.Responder) *proxy.UpgradeAwareHandler { - handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, responder) + handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder)) handler.InterceptRedirects = interceptRedirects && utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StreamingProxyRedirects) handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec return handler diff --git a/pkg/registry/core/service/proxy.go b/pkg/registry/core/service/proxy.go index 7875e58a69a..4b24847478e 100644 --- a/pkg/registry/core/service/proxy.go +++ b/pkg/registry/core/service/proxy.go @@ -72,7 +72,7 @@ func (r *ProxyREST) Connect(ctx genericapirequest.Context, id string, opts runti } func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) *proxy.UpgradeAwareHandler { - handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, responder) + handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder)) handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec return handler } diff --git a/staging/src/k8s.io/apimachinery/pkg/util/net/http.go b/staging/src/k8s.io/apimachinery/pkg/util/net/http.go index adb80813be2..a48f16543c9 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/net/http.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/net/http.go @@ -277,6 +277,13 @@ func NewProxierWithNoProxyCIDR(delegate func(req *http.Request) (*url.URL, error } } +// DialerFunc implements Dialer for the provided function. +type DialerFunc func(req *http.Request) (net.Conn, error) + +func (fn DialerFunc) Dial(req *http.Request) (net.Conn, error) { + return fn(req) +} + // Dialer dials a host and writes a request to it. type Dialer interface { // Dial connects to the host specified by req's URL, writes the request to the connection, and diff --git a/staging/src/k8s.io/apimachinery/pkg/util/proxy/dial.go b/staging/src/k8s.io/apimachinery/pkg/util/proxy/dial.go index e7373190940..3da7e965f53 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/proxy/dial.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/proxy/dial.go @@ -32,7 +32,10 @@ import ( func DialURL(url *url.URL, transport http.RoundTripper) (net.Conn, error) { dialAddr := netutil.CanonicalAddr(url) - dialer, _ := utilnet.DialerFor(transport) + dialer, err := utilnet.DialerFor(transport) + if err != nil { + glog.V(5).Infof("Unable to unwrap transport %T to get dialer: %v", transport, err) + } switch url.Scheme { case "http": @@ -45,7 +48,10 @@ func DialURL(url *url.URL, transport http.RoundTripper) (net.Conn, error) { var tlsConfig *tls.Config var tlsConn *tls.Conn var err error - tlsConfig, _ = utilnet.TLSClientConfig(transport) + tlsConfig, err = utilnet.TLSClientConfig(transport) + if err != nil { + glog.V(5).Infof("Unable to unwrap transport %T to get at TLS config: %v", transport, err) + } if dialer != nil { // We have a dialer; use it to open the connection, then diff --git a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go index ff04578e29a..d443915d889 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go @@ -39,18 +39,29 @@ import ( // UpgradeAwareHandler is a handler for proxy requests that may require an upgrade type UpgradeAwareHandler struct { + // UpgradeRequired will reject non-upgrade connections if true. UpgradeRequired bool - Location *url.URL + // Location is the location of the upstream proxy. It is used as the location to Dial on the upstream server + // for upgrade requests unless UseRequestLocationOnUpgrade is true. + Location *url.URL // Transport provides an optional round tripper to use to proxy. If nil, the default proxy transport is used Transport http.RoundTripper + // UpgradeTransport, if specified, will be used as the backend transport when upgrade requests are provided. + // This allows clients to disable HTTP/2. + UpgradeTransport http.RoundTripper // WrapTransport indicates whether the provided Transport should be wrapped with default proxy transport behavior (URL rewriting, X-Forwarded-* header setting) WrapTransport bool // InterceptRedirects determines whether the proxy should sniff backend responses for redirects, // following them as necessary. InterceptRedirects bool - FlushInterval time.Duration - MaxBytesPerSec int64 - Responder ErrorResponder + // UseRequestLocation will use the incoming request URL when talking to the backend server. + UseRequestLocation bool + // FlushInterval controls how often the standard HTTP proxy will flush content from the upstream. + FlushInterval time.Duration + // MaxBytesPerSec controls the maximum rate for an upstream connection. No rate is imposed if the value is zero. + MaxBytesPerSec int64 + // Responder is passed errors that occur while setting up proxying. + Responder ErrorResponder } const defaultFlushInterval = 200 * time.Millisecond @@ -58,9 +69,27 @@ const defaultFlushInterval = 200 * time.Millisecond // ErrorResponder abstracts error reporting to the proxy handler to remove the need to hardcode a particular // error format. type ErrorResponder interface { + Error(w http.ResponseWriter, req *http.Request, err error) +} + +// SimpleErrorResponder is the legacy implementation of ErrorResponder for callers that only +// service a single request/response per proxy. +type SimpleErrorResponder interface { Error(err error) } +func NewErrorResponder(r SimpleErrorResponder) ErrorResponder { + return simpleResponder{r} +} + +type simpleResponder struct { + responder SimpleErrorResponder +} + +func (r simpleResponder) Error(w http.ResponseWriter, req *http.Request, err error) { + r.responder.Error(err) +} + // NewUpgradeAwareHandler creates a new proxy handler with a default flush interval. Responder is required for returning // errors to the caller. func NewUpgradeAwareHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder ErrorResponder) *UpgradeAwareHandler { @@ -83,7 +112,7 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request return } if h.UpgradeRequired { - h.Responder.Error(errors.NewBadRequest("Upgrade request required")) + h.Responder.Error(w, req, errors.NewBadRequest("Upgrade request required")) return } @@ -117,7 +146,9 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request // WithContext creates a shallow clone of the request with the new context. newReq := req.WithContext(context.Background()) newReq.Header = utilnet.CloneHeader(req.Header) - newReq.URL = &loc + if !h.UseRequestLocation { + newReq.URL = &loc + } proxy := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: h.Location.Scheme, Host: h.Location.Host}) proxy.Transport = h.Transport @@ -128,6 +159,7 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request // tryUpgrade returns true if the request was handled. func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Request) bool { if !httpstream.IsUpgradeRequest(req) { + glog.V(6).Infof("Request was not an upgrade") return false } @@ -137,18 +169,28 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques err error ) + location := *h.Location + if h.UseRequestLocation { + location = *req.URL + location.Scheme = h.Location.Scheme + location.Host = h.Location.Host + } + clone := utilnet.CloneRequest(req) // Only append X-Forwarded-For in the upgrade path, since httputil.NewSingleHostReverseProxy // handles this in the non-upgrade path. utilnet.AppendForwardedForHeader(clone) if h.InterceptRedirects { - backendConn, rawResponse, err = utilnet.ConnectWithRedirects(req.Method, h.Location, clone.Header, req.Body, h) + glog.V(6).Infof("Connecting to backend proxy (intercepting redirects) %s\n Headers: %v", &location, clone.Header) + backendConn, rawResponse, err = utilnet.ConnectWithRedirects(req.Method, &location, clone.Header, req.Body, utilnet.DialerFunc(h.DialForUpgrade)) } else { - clone.URL = h.Location - backendConn, err = h.Dial(clone) + glog.V(6).Infof("Connecting to backend proxy (direct dial) %s\n Headers: %v", &location, clone.Header) + clone.URL = &location + backendConn, err = h.DialForUpgrade(clone) } if err != nil { - h.Responder.Error(err) + glog.V(6).Infof("Proxy connection error: %v", err) + h.Responder.Error(w, req, err) return true } defer backendConn.Close() @@ -157,18 +199,21 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques // hijacking should be the last step in the upgrade. requestHijacker, ok := w.(http.Hijacker) if !ok { - h.Responder.Error(fmt.Errorf("request connection cannot be hijacked: %T", w)) + glog.V(6).Infof("Unable to hijack response writer: %T", w) + h.Responder.Error(w, req, fmt.Errorf("request connection cannot be hijacked: %T", w)) return true } requestHijackedConn, _, err := requestHijacker.Hijack() if err != nil { - h.Responder.Error(fmt.Errorf("error hijacking request connection: %v", err)) + glog.V(6).Infof("Unable to hijack response: %v", err) + h.Responder.Error(w, req, fmt.Errorf("error hijacking connection: %v", err)) return true } defer requestHijackedConn.Close() // Forward raw response bytes back to client. if len(rawResponse) > 0 { + glog.V(6).Infof("Writing %d bytes to hijacked connection", len(rawResponse)) if _, err = requestHijackedConn.Write(rawResponse); err != nil { utilruntime.HandleError(fmt.Errorf("Error proxying response from backend to client: %v", err)) } @@ -210,9 +255,20 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques return true } -// Dial dials the backend at req.URL and writes req to it. func (h *UpgradeAwareHandler) Dial(req *http.Request) (net.Conn, error) { - conn, err := DialURL(req.URL, h.Transport) + return dial(req, h.Transport) +} + +func (h *UpgradeAwareHandler) DialForUpgrade(req *http.Request) (net.Conn, error) { + if h.UpgradeTransport != nil { + return dial(req, h.UpgradeTransport) + } + return dial(req, h.Transport) +} + +// dial dials the backend at req.URL and writes req to it. +func dial(req *http.Request, transport http.RoundTripper) (net.Conn, error) { + conn, err := DialURL(req.URL, transport) if err != nil { return nil, fmt.Errorf("error dialing backend: %v", err) } diff --git a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware_test.go b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware_test.go index 3e6f11237d4..985d2208548 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware_test.go @@ -55,18 +55,14 @@ type fakeResponder struct { w http.ResponseWriter } -func (r *fakeResponder) Error(err error) { +func (r *fakeResponder) Error(w http.ResponseWriter, req *http.Request, err error) { if r.called { r.t.Errorf("Error responder called again!\nprevious error: %v\nnew error: %v", r.err, err) } - if r.w != nil { - r.w.WriteHeader(fakeStatusCode) - _, writeErr := r.w.Write([]byte(err.Error())) - assert.NoError(r.t, writeErr) - } else { - r.t.Logf("No ResponseWriter set") - } + w.WriteHeader(fakeStatusCode) + _, writeErr := w.Write([]byte(err.Error())) + assert.NoError(r.t, writeErr) r.called = true r.err = err @@ -459,7 +455,7 @@ type noErrorsAllowed struct { t *testing.T } -func (r *noErrorsAllowed) Error(err error) { +func (r *noErrorsAllowed) Error(w http.ResponseWriter, req *http.Request, err error) { r.t.Error(err) } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go index 945051582df..7c179ab7cf3 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go @@ -181,7 +181,7 @@ func (r *responder) Object(statusCode int, obj runtime.Object) { responsewriters.WriteRawJSON(statusCode, obj, r.w) } -func (r *responder) Error(err error) { +func (r *responder) Error(_ http.ResponseWriter, _ *http.Request, err error) { http.Error(r.w, err.Error(), http.StatusInternalServerError) } diff --git a/test/e2e/kubectl/kubectl.go b/test/e2e/kubectl/kubectl.go index 8f15cfdf7b6..0a5d54d1073 100644 --- a/test/e2e/kubectl/kubectl.go +++ b/test/e2e/kubectl/kubectl.go @@ -437,9 +437,6 @@ var _ = SIGDescribe("Kubectl client", func() { }) It("should support exec through an HTTP proxy", func() { - // Note: We are skipping local since we want to verify an apiserver with HTTPS. - // At this time local only supports plain HTTP. - framework.SkipIfProviderIs("local") // Fail if the variable isn't set if framework.TestContext.Host == "" { framework.Failf("--host variable must be set to the full URI to the api server on e2e run.") @@ -473,6 +470,32 @@ var _ = SIGDescribe("Kubectl client", func() { } }) + It("should support exec through kubectl proxy", func() { + // Fail if the variable isn't set + if framework.TestContext.Host == "" { + framework.Failf("--host variable must be set to the full URI to the api server on e2e run.") + } + + By("Starting kubectl proxy") + port, proxyCmd, err := startProxyServer() + framework.ExpectNoError(err) + defer framework.TryKill(proxyCmd) + + //proxyLogs.Reset() + host := fmt.Sprintf("--server=http://127.0.0.1:%d", port) + By("Running kubectl via kubectl proxy using " + host) + output := framework.NewKubectlCommand( + host, fmt.Sprintf("--namespace=%s", ns), + "exec", "nginx", "echo", "running", "in", "container", + ).ExecOrDie() + + // Verify we got the normal output captured by the exec server + expectedExecOutput := "running in container\n" + if output != expectedExecOutput { + framework.Failf("Unexpected kubectl exec output. Wanted %q, got %q", expectedExecOutput, output) + } + }) + It("should return command exit codes", func() { framework.SkipUnlessKubectlVersionGTE(kubectlContainerExitCodeVersion) nsFlag := fmt.Sprintf("--namespace=%v", ns) @@ -1758,7 +1781,7 @@ func getAPIVersions(apiEndpoint string) (*metav1.APIVersions, error) { func startProxyServer() (int, *exec.Cmd, error) { // Specifying port 0 indicates we want the os to pick a random port. - cmd := framework.KubectlCmd("proxy", "-p", "0") + cmd := framework.KubectlCmd("proxy", "-p", "0", "--disable-filter") stdout, stderr, err := framework.StartCmdAndStreamOutput(cmd) if err != nil { return -1, nil, err