mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
Merge pull request #49534 from smarterclayton/separate_proxy
Automatic merge from submit-queue Support exec/attach/portforward in `kubectl proxy` Use the UpgradeAwareProxy shared code in kubectl proxy. Provide a separate transport for those requests that does not have HTTP/2 enabled. Refactor the code to be a bit cleaner in places and to better separate changes. Fixes #32026 ```release-note `kubectl proxy` will now correctly handle the `exec`, `attach`, and `portforward` commands. You must pass `--disable-filter` to the command in order to allow these endpoints. ```
This commit is contained in:
commit
da549596c4
@ -360,6 +360,7 @@ package_group(
|
||||
packages = [
|
||||
"//pkg/kubectl",
|
||||
"//pkg/kubectl/cmd",
|
||||
"//pkg/kubectl/proxy",
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -18,7 +18,6 @@ go_test(
|
||||
"generate_test.go",
|
||||
"kubectl_test.go",
|
||||
"namespace_test.go",
|
||||
"proxy_server_test.go",
|
||||
"quota_test.go",
|
||||
"resource_filter_test.go",
|
||||
"rolebinding_test.go",
|
||||
@ -96,7 +95,6 @@ go_library(
|
||||
"kubectl.go",
|
||||
"namespace.go",
|
||||
"pdb.go",
|
||||
"proxy_server.go",
|
||||
"quota.go",
|
||||
"resource_filter.go",
|
||||
"rolebinding.go",
|
||||
@ -194,6 +192,7 @@ filegroup(
|
||||
"//pkg/kubectl/cmd:all-srcs",
|
||||
"//pkg/kubectl/metricsutil:all-srcs",
|
||||
"//pkg/kubectl/plugins:all-srcs",
|
||||
"//pkg/kubectl/proxy:all-srcs",
|
||||
"//pkg/kubectl/resource:all-srcs",
|
||||
"//pkg/kubectl/testing:all-srcs",
|
||||
"//pkg/kubectl/util:all-srcs",
|
||||
|
@ -91,6 +91,7 @@ go_library(
|
||||
"//pkg/kubectl/cmd/util/openapi:go_default_library",
|
||||
"//pkg/kubectl/metricsutil:go_default_library",
|
||||
"//pkg/kubectl/plugins:go_default_library",
|
||||
"//pkg/kubectl/proxy:go_default_library",
|
||||
"//pkg/kubectl/resource:go_default_library",
|
||||
"//pkg/kubectl/util:go_default_library",
|
||||
"//pkg/kubectl/util/term:go_default_library",
|
||||
|
@ -26,9 +26,9 @@ import (
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/spf13/cobra"
|
||||
"k8s.io/kubernetes/pkg/kubectl"
|
||||
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
|
||||
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
||||
"k8s.io/kubernetes/pkg/kubectl/proxy"
|
||||
"k8s.io/kubernetes/pkg/util/i18n"
|
||||
)
|
||||
|
||||
@ -83,10 +83,10 @@ func NewCmdProxy(f cmdutil.Factory, out io.Writer) *cobra.Command {
|
||||
cmd.Flags().StringP("www", "w", "", "Also serve static files from the given directory under the specified prefix.")
|
||||
cmd.Flags().StringP("www-prefix", "P", "/static/", "Prefix to serve static files under, if static file directory is specified.")
|
||||
cmd.Flags().StringP("api-prefix", "", "/", "Prefix to serve the proxied API under.")
|
||||
cmd.Flags().String("accept-paths", kubectl.DefaultPathAcceptRE, "Regular expression for paths that the proxy should accept.")
|
||||
cmd.Flags().String("reject-paths", kubectl.DefaultPathRejectRE, "Regular expression for paths that the proxy should reject. Paths specified here will be rejected even accepted by --accept-paths.")
|
||||
cmd.Flags().String("accept-hosts", kubectl.DefaultHostAcceptRE, "Regular expression for hosts that the proxy should accept.")
|
||||
cmd.Flags().String("reject-methods", kubectl.DefaultMethodRejectRE, "Regular expression for HTTP methods that the proxy should reject (example --reject-methods='POST,PUT,PATCH'). ")
|
||||
cmd.Flags().String("accept-paths", proxy.DefaultPathAcceptRE, "Regular expression for paths that the proxy should accept.")
|
||||
cmd.Flags().String("reject-paths", proxy.DefaultPathRejectRE, "Regular expression for paths that the proxy should reject. Paths specified here will be rejected even accepted by --accept-paths.")
|
||||
cmd.Flags().String("accept-hosts", proxy.DefaultHostAcceptRE, "Regular expression for hosts that the proxy should accept.")
|
||||
cmd.Flags().String("reject-methods", proxy.DefaultMethodRejectRE, "Regular expression for HTTP methods that the proxy should reject (example --reject-methods='POST,PUT,PATCH'). ")
|
||||
cmd.Flags().IntP("port", "p", defaultPort, "The port on which to run the proxy. Set to 0 to pick a random port.")
|
||||
cmd.Flags().StringP("address", "", "127.0.0.1", "The IP address on which to serve on.")
|
||||
cmd.Flags().Bool("disable-filter", false, "If true, disable request filtering in the proxy. This is dangerous, and can leave you vulnerable to XSRF attacks, when used with an accessible port.")
|
||||
@ -126,11 +126,11 @@ func RunProxy(f cmdutil.Factory, out io.Writer, cmd *cobra.Command) error {
|
||||
if !strings.HasSuffix(apiProxyPrefix, "/") {
|
||||
apiProxyPrefix += "/"
|
||||
}
|
||||
filter := &kubectl.FilterServer{
|
||||
AcceptPaths: kubectl.MakeRegexpArrayOrDie(cmdutil.GetFlagString(cmd, "accept-paths")),
|
||||
RejectPaths: kubectl.MakeRegexpArrayOrDie(cmdutil.GetFlagString(cmd, "reject-paths")),
|
||||
AcceptHosts: kubectl.MakeRegexpArrayOrDie(cmdutil.GetFlagString(cmd, "accept-hosts")),
|
||||
RejectMethods: kubectl.MakeRegexpArrayOrDie(cmdutil.GetFlagString(cmd, "reject-methods")),
|
||||
filter := &proxy.FilterServer{
|
||||
AcceptPaths: proxy.MakeRegexpArrayOrDie(cmdutil.GetFlagString(cmd, "accept-paths")),
|
||||
RejectPaths: proxy.MakeRegexpArrayOrDie(cmdutil.GetFlagString(cmd, "reject-paths")),
|
||||
AcceptHosts: proxy.MakeRegexpArrayOrDie(cmdutil.GetFlagString(cmd, "accept-hosts")),
|
||||
RejectMethods: proxy.MakeRegexpArrayOrDie(cmdutil.GetFlagString(cmd, "reject-methods")),
|
||||
}
|
||||
if cmdutil.GetFlagBool(cmd, "disable-filter") {
|
||||
if path == "" {
|
||||
@ -139,7 +139,7 @@ func RunProxy(f cmdutil.Factory, out io.Writer, cmd *cobra.Command) error {
|
||||
filter = nil
|
||||
}
|
||||
|
||||
server, err := kubectl.NewProxyServer(staticDir, apiProxyPrefix, staticPrefix, filter, clientConfig)
|
||||
server, err := proxy.NewServer(staticDir, apiProxyPrefix, staticPrefix, filter, clientConfig)
|
||||
|
||||
// Separate listening from serving so we can report the bound port
|
||||
// when it is chosen by os (eg: port == 0)
|
||||
@ -152,7 +152,7 @@ func RunProxy(f cmdutil.Factory, out io.Writer, cmd *cobra.Command) error {
|
||||
if err != nil {
|
||||
glog.Fatal(err)
|
||||
}
|
||||
fmt.Fprintf(out, "Starting to serve on %s", l.Addr().String())
|
||||
fmt.Fprintf(out, "Starting to serve on %s\n", l.Addr().String())
|
||||
glog.Fatal(server.ServeOnListener(l))
|
||||
return nil
|
||||
}
|
||||
|
47
pkg/kubectl/proxy/BUILD
Normal file
47
pkg/kubectl/proxy/BUILD
Normal file
@ -0,0 +1,47 @@
|
||||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
licenses(["notice"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
"go_test",
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["proxy_server_test.go"],
|
||||
library = ":go_default_library",
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/proxy:go_default_library",
|
||||
"//vendor/k8s.io/client-go/rest:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["proxy_server.go"],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/kubectl/util:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/proxy:go_default_library",
|
||||
"//vendor/k8s.io/client-go/rest:go_default_library",
|
||||
"//vendor/k8s.io/client-go/transport:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
)
|
@ -14,13 +14,12 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kubectl
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
"os"
|
||||
"regexp"
|
||||
@ -28,19 +27,26 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
"k8s.io/apimachinery/pkg/util/proxy"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/transport"
|
||||
"k8s.io/kubernetes/pkg/kubectl/util"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultHostAcceptRE = "^localhost$,^127\\.0\\.0\\.1$,^\\[::1\\]$"
|
||||
DefaultPathAcceptRE = "^.*"
|
||||
DefaultPathRejectRE = "^/api/.*/pods/.*/exec,^/api/.*/pods/.*/attach"
|
||||
// DefaultHostAcceptRE is the default value for which hosts to accept.
|
||||
DefaultHostAcceptRE = "^localhost$,^127\\.0\\.0\\.1$,^\\[::1\\]$"
|
||||
// DefaultPathAcceptRE is the default path to accept.
|
||||
DefaultPathAcceptRE = "^.*"
|
||||
// DefaultPathRejectRE is the default set of paths to reject.
|
||||
DefaultPathRejectRE = "^/api/.*/pods/.*/exec,^/api/.*/pods/.*/attach"
|
||||
// DefaultMethodRejectRE is the set of HTTP methods to reject by default.
|
||||
DefaultMethodRejectRE = "^$"
|
||||
)
|
||||
|
||||
var (
|
||||
// The reverse proxy will periodically flush the io writer at this frequency.
|
||||
// ReverseProxyFlushInterval is the frequency to flush the reverse proxy.
|
||||
// Only matters for long poll connections like the one used to watch. With an
|
||||
// interval of 0 the reverse proxy will buffer content sent on any connection
|
||||
// with transfer-encoding=chunked.
|
||||
@ -63,7 +69,7 @@ type FilterServer struct {
|
||||
delegate http.Handler
|
||||
}
|
||||
|
||||
// Splits a comma separated list of regexps into an array of Regexp objects.
|
||||
// MakeRegexpArray splits a comma separated list of regexps into an array of Regexp objects.
|
||||
func MakeRegexpArray(str string) ([]*regexp.Regexp, error) {
|
||||
parts := strings.Split(str, ",")
|
||||
result := make([]*regexp.Regexp, len(parts))
|
||||
@ -77,6 +83,7 @@ func MakeRegexpArray(str string) ([]*regexp.Regexp, error) {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// MakeRegexpArrayOrDie creates an array of regular expression objects from a string or exits.
|
||||
func MakeRegexpArrayOrDie(str string) []*regexp.Regexp {
|
||||
result, err := MakeRegexpArray(str)
|
||||
if err != nil {
|
||||
@ -137,15 +144,38 @@ func (f *FilterServer) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
||||
rw.Write([]byte("<h3>Unauthorized</h3>"))
|
||||
}
|
||||
|
||||
// ProxyServer is a http.Handler which proxies Kubernetes APIs to remote API server.
|
||||
type ProxyServer struct {
|
||||
// Server is a http.Handler which proxies Kubernetes APIs to remote API server.
|
||||
type Server struct {
|
||||
handler http.Handler
|
||||
}
|
||||
|
||||
// NewProxyServer creates and installs a new ProxyServer.
|
||||
// It automatically registers the created ProxyServer to http.DefaultServeMux.
|
||||
type responder struct{}
|
||||
|
||||
func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
|
||||
glog.Errorf("Error while proxying request: %v", err)
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
// makeUpgradeTransport creates a transport that explicitly bypasses HTTP2 support
|
||||
// for proxy connections that must upgrade.
|
||||
func makeUpgradeTransport(config *rest.Config) (http.RoundTripper, error) {
|
||||
transportConfig, err := config.TransportConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tlsConfig, err := transport.TLSConfigFor(transportConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rt := utilnet.SetOldTransportDefaults(&http.Transport{
|
||||
TLSClientConfig: tlsConfig,
|
||||
})
|
||||
return transport.HTTPWrappersForConfig(transportConfig, rt)
|
||||
}
|
||||
|
||||
// NewServer creates and installs a new Server.
|
||||
// 'filter', if non-nil, protects requests to the api only.
|
||||
func NewProxyServer(filebase string, apiProxyPrefix string, staticPrefix string, filter *FilterServer, cfg *restclient.Config) (*ProxyServer, error) {
|
||||
func NewServer(filebase string, apiProxyPrefix string, staticPrefix string, filter *FilterServer, cfg *rest.Config) (*Server, error) {
|
||||
host := cfg.Host
|
||||
if !strings.HasSuffix(host, "/") {
|
||||
host = host + "/"
|
||||
@ -154,10 +184,20 @@ func NewProxyServer(filebase string, apiProxyPrefix string, staticPrefix string,
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
proxy := newProxy(target)
|
||||
if proxy.Transport, err = restclient.TransportFor(cfg); err != nil {
|
||||
|
||||
responder := &responder{}
|
||||
transport, err := rest.TransportFor(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
upgradeTransport, err := makeUpgradeTransport(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
proxy := proxy.NewUpgradeAwareHandler(target, transport, false, false, responder)
|
||||
proxy.UpgradeTransport = upgradeTransport
|
||||
proxy.UseRequestLocation = true
|
||||
|
||||
proxyServer := http.Handler(proxy)
|
||||
if filter != nil {
|
||||
proxyServer = filter.HandlerFor(proxyServer)
|
||||
@ -174,16 +214,16 @@ func NewProxyServer(filebase string, apiProxyPrefix string, staticPrefix string,
|
||||
// serving their working directory by default.
|
||||
mux.Handle(staticPrefix, newFileHandler(staticPrefix, filebase))
|
||||
}
|
||||
return &ProxyServer{handler: mux}, nil
|
||||
return &Server{handler: mux}, nil
|
||||
}
|
||||
|
||||
// Listen is a simple wrapper around net.Listen.
|
||||
func (s *ProxyServer) Listen(address string, port int) (net.Listener, error) {
|
||||
func (s *Server) Listen(address string, port int) (net.Listener, error) {
|
||||
return net.Listen("tcp", fmt.Sprintf("%s:%d", address, port))
|
||||
}
|
||||
|
||||
// ListenUnix does net.Listen for a unix socket
|
||||
func (s *ProxyServer) ListenUnix(path string) (net.Listener, error) {
|
||||
func (s *Server) ListenUnix(path string) (net.Listener, error) {
|
||||
// Remove any socket, stale or not, but fall through for other files
|
||||
fi, err := os.Stat(path)
|
||||
if err == nil && (fi.Mode()&os.ModeSocket) != 0 {
|
||||
@ -196,23 +236,14 @@ func (s *ProxyServer) ListenUnix(path string) (net.Listener, error) {
|
||||
return l, err
|
||||
}
|
||||
|
||||
// Serve starts the server using given listener, loops forever.
|
||||
func (s *ProxyServer) ServeOnListener(l net.Listener) error {
|
||||
// ServeOnListener starts the server using given listener, loops forever.
|
||||
func (s *Server) ServeOnListener(l net.Listener) error {
|
||||
server := http.Server{
|
||||
Handler: s.handler,
|
||||
}
|
||||
return server.Serve(l)
|
||||
}
|
||||
|
||||
func newProxy(target *url.URL) *httputil.ReverseProxy {
|
||||
director := func(req *http.Request) {
|
||||
req.URL.Scheme = target.Scheme
|
||||
req.URL.Host = target.Host
|
||||
req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
|
||||
}
|
||||
return &httputil.ReverseProxy{Director: director, FlushInterval: ReverseProxyFlushInterval}
|
||||
}
|
||||
|
||||
func newFileHandler(prefix, base string) http.Handler {
|
||||
return http.StripPrefix(prefix, http.FileServer(http.Dir(base)))
|
||||
}
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kubectl
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@ -27,7 +27,8 @@ import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/apimachinery/pkg/util/proxy"
|
||||
"k8s.io/client-go/rest"
|
||||
)
|
||||
|
||||
func TestAccept(t *testing.T) {
|
||||
@ -340,6 +341,12 @@ func TestFileServing(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func newProxy(target *url.URL) http.Handler {
|
||||
p := proxy.NewUpgradeAwareHandler(target, http.DefaultTransport, false, false, &responder{})
|
||||
p.UseRequestLocation = true
|
||||
return p
|
||||
}
|
||||
|
||||
func TestAPIRequests(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
b, err := ioutil.ReadAll(r.Body)
|
||||
@ -353,6 +360,7 @@ func TestAPIRequests(t *testing.T) {
|
||||
|
||||
// httptest.NewServer should always generate a valid URL.
|
||||
target, _ := url.Parse(ts.URL)
|
||||
target.Path = "/"
|
||||
proxy := newProxy(target)
|
||||
|
||||
tests := []struct{ method, body string }{
|
||||
@ -404,13 +412,13 @@ func TestPathHandling(t *testing.T) {
|
||||
{"/custom/", "/custom/api/v1/pods/", "/api/v1/pods/"},
|
||||
}
|
||||
|
||||
cc := &restclient.Config{
|
||||
cc := &rest.Config{
|
||||
Host: ts.URL,
|
||||
}
|
||||
|
||||
for _, item := range table {
|
||||
func() {
|
||||
p, err := NewProxyServer("", item.prefix, "/not/used/for/this/test", nil, cc)
|
||||
p, err := NewServer("", item.prefix, "/not/used/for/this/test", nil, cc)
|
||||
if err != nil {
|
||||
t.Fatalf("%#v: %v", item, err)
|
||||
}
|
@ -8,6 +8,14 @@ load(
|
||||
"go_test",
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["util_test.go"],
|
||||
library = ":go_default_library",
|
||||
tags = ["automanaged"],
|
||||
deps = ["//vendor/github.com/stretchr/testify/assert:go_default_library"],
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
@ -23,14 +31,6 @@ go_library(
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["util_test.go"],
|
||||
library = ":go_default_library",
|
||||
tags = ["automanaged"],
|
||||
deps = ["//vendor/github.com/stretchr/testify/assert:go_default_library"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
|
@ -76,7 +76,7 @@ func (r *ProxyREST) Connect(ctx genericapirequest.Context, id string, opts runti
|
||||
}
|
||||
|
||||
func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) *proxy.UpgradeAwareHandler {
|
||||
handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, responder)
|
||||
handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder))
|
||||
handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
|
||||
return handler
|
||||
}
|
||||
|
@ -192,7 +192,7 @@ func (r *PortForwardREST) Connect(ctx genericapirequest.Context, name string, op
|
||||
}
|
||||
|
||||
func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired, interceptRedirects bool, responder rest.Responder) *proxy.UpgradeAwareHandler {
|
||||
handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, responder)
|
||||
handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder))
|
||||
handler.InterceptRedirects = interceptRedirects && utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StreamingProxyRedirects)
|
||||
handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
|
||||
return handler
|
||||
|
@ -72,7 +72,7 @@ func (r *ProxyREST) Connect(ctx genericapirequest.Context, id string, opts runti
|
||||
}
|
||||
|
||||
func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) *proxy.UpgradeAwareHandler {
|
||||
handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, responder)
|
||||
handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder))
|
||||
handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
|
||||
return handler
|
||||
}
|
||||
|
@ -277,6 +277,13 @@ func NewProxierWithNoProxyCIDR(delegate func(req *http.Request) (*url.URL, error
|
||||
}
|
||||
}
|
||||
|
||||
// DialerFunc implements Dialer for the provided function.
|
||||
type DialerFunc func(req *http.Request) (net.Conn, error)
|
||||
|
||||
func (fn DialerFunc) Dial(req *http.Request) (net.Conn, error) {
|
||||
return fn(req)
|
||||
}
|
||||
|
||||
// Dialer dials a host and writes a request to it.
|
||||
type Dialer interface {
|
||||
// Dial connects to the host specified by req's URL, writes the request to the connection, and
|
||||
|
@ -32,7 +32,10 @@ import (
|
||||
func DialURL(url *url.URL, transport http.RoundTripper) (net.Conn, error) {
|
||||
dialAddr := netutil.CanonicalAddr(url)
|
||||
|
||||
dialer, _ := utilnet.DialerFor(transport)
|
||||
dialer, err := utilnet.DialerFor(transport)
|
||||
if err != nil {
|
||||
glog.V(5).Infof("Unable to unwrap transport %T to get dialer: %v", transport, err)
|
||||
}
|
||||
|
||||
switch url.Scheme {
|
||||
case "http":
|
||||
@ -45,7 +48,10 @@ func DialURL(url *url.URL, transport http.RoundTripper) (net.Conn, error) {
|
||||
var tlsConfig *tls.Config
|
||||
var tlsConn *tls.Conn
|
||||
var err error
|
||||
tlsConfig, _ = utilnet.TLSClientConfig(transport)
|
||||
tlsConfig, err = utilnet.TLSClientConfig(transport)
|
||||
if err != nil {
|
||||
glog.V(5).Infof("Unable to unwrap transport %T to get at TLS config: %v", transport, err)
|
||||
}
|
||||
|
||||
if dialer != nil {
|
||||
// We have a dialer; use it to open the connection, then
|
||||
|
@ -39,18 +39,29 @@ import (
|
||||
|
||||
// UpgradeAwareHandler is a handler for proxy requests that may require an upgrade
|
||||
type UpgradeAwareHandler struct {
|
||||
// UpgradeRequired will reject non-upgrade connections if true.
|
||||
UpgradeRequired bool
|
||||
Location *url.URL
|
||||
// Location is the location of the upstream proxy. It is used as the location to Dial on the upstream server
|
||||
// for upgrade requests unless UseRequestLocationOnUpgrade is true.
|
||||
Location *url.URL
|
||||
// Transport provides an optional round tripper to use to proxy. If nil, the default proxy transport is used
|
||||
Transport http.RoundTripper
|
||||
// UpgradeTransport, if specified, will be used as the backend transport when upgrade requests are provided.
|
||||
// This allows clients to disable HTTP/2.
|
||||
UpgradeTransport http.RoundTripper
|
||||
// WrapTransport indicates whether the provided Transport should be wrapped with default proxy transport behavior (URL rewriting, X-Forwarded-* header setting)
|
||||
WrapTransport bool
|
||||
// InterceptRedirects determines whether the proxy should sniff backend responses for redirects,
|
||||
// following them as necessary.
|
||||
InterceptRedirects bool
|
||||
FlushInterval time.Duration
|
||||
MaxBytesPerSec int64
|
||||
Responder ErrorResponder
|
||||
// UseRequestLocation will use the incoming request URL when talking to the backend server.
|
||||
UseRequestLocation bool
|
||||
// FlushInterval controls how often the standard HTTP proxy will flush content from the upstream.
|
||||
FlushInterval time.Duration
|
||||
// MaxBytesPerSec controls the maximum rate for an upstream connection. No rate is imposed if the value is zero.
|
||||
MaxBytesPerSec int64
|
||||
// Responder is passed errors that occur while setting up proxying.
|
||||
Responder ErrorResponder
|
||||
}
|
||||
|
||||
const defaultFlushInterval = 200 * time.Millisecond
|
||||
@ -58,9 +69,27 @@ const defaultFlushInterval = 200 * time.Millisecond
|
||||
// ErrorResponder abstracts error reporting to the proxy handler to remove the need to hardcode a particular
|
||||
// error format.
|
||||
type ErrorResponder interface {
|
||||
Error(w http.ResponseWriter, req *http.Request, err error)
|
||||
}
|
||||
|
||||
// SimpleErrorResponder is the legacy implementation of ErrorResponder for callers that only
|
||||
// service a single request/response per proxy.
|
||||
type SimpleErrorResponder interface {
|
||||
Error(err error)
|
||||
}
|
||||
|
||||
func NewErrorResponder(r SimpleErrorResponder) ErrorResponder {
|
||||
return simpleResponder{r}
|
||||
}
|
||||
|
||||
type simpleResponder struct {
|
||||
responder SimpleErrorResponder
|
||||
}
|
||||
|
||||
func (r simpleResponder) Error(w http.ResponseWriter, req *http.Request, err error) {
|
||||
r.responder.Error(err)
|
||||
}
|
||||
|
||||
// NewUpgradeAwareHandler creates a new proxy handler with a default flush interval. Responder is required for returning
|
||||
// errors to the caller.
|
||||
func NewUpgradeAwareHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder ErrorResponder) *UpgradeAwareHandler {
|
||||
@ -83,7 +112,7 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
|
||||
return
|
||||
}
|
||||
if h.UpgradeRequired {
|
||||
h.Responder.Error(errors.NewBadRequest("Upgrade request required"))
|
||||
h.Responder.Error(w, req, errors.NewBadRequest("Upgrade request required"))
|
||||
return
|
||||
}
|
||||
|
||||
@ -117,7 +146,9 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
|
||||
// WithContext creates a shallow clone of the request with the new context.
|
||||
newReq := req.WithContext(context.Background())
|
||||
newReq.Header = utilnet.CloneHeader(req.Header)
|
||||
newReq.URL = &loc
|
||||
if !h.UseRequestLocation {
|
||||
newReq.URL = &loc
|
||||
}
|
||||
|
||||
proxy := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: h.Location.Scheme, Host: h.Location.Host})
|
||||
proxy.Transport = h.Transport
|
||||
@ -128,6 +159,7 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
|
||||
// tryUpgrade returns true if the request was handled.
|
||||
func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Request) bool {
|
||||
if !httpstream.IsUpgradeRequest(req) {
|
||||
glog.V(6).Infof("Request was not an upgrade")
|
||||
return false
|
||||
}
|
||||
|
||||
@ -137,18 +169,28 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
|
||||
err error
|
||||
)
|
||||
|
||||
location := *h.Location
|
||||
if h.UseRequestLocation {
|
||||
location = *req.URL
|
||||
location.Scheme = h.Location.Scheme
|
||||
location.Host = h.Location.Host
|
||||
}
|
||||
|
||||
clone := utilnet.CloneRequest(req)
|
||||
// Only append X-Forwarded-For in the upgrade path, since httputil.NewSingleHostReverseProxy
|
||||
// handles this in the non-upgrade path.
|
||||
utilnet.AppendForwardedForHeader(clone)
|
||||
if h.InterceptRedirects {
|
||||
backendConn, rawResponse, err = utilnet.ConnectWithRedirects(req.Method, h.Location, clone.Header, req.Body, h)
|
||||
glog.V(6).Infof("Connecting to backend proxy (intercepting redirects) %s\n Headers: %v", &location, clone.Header)
|
||||
backendConn, rawResponse, err = utilnet.ConnectWithRedirects(req.Method, &location, clone.Header, req.Body, utilnet.DialerFunc(h.DialForUpgrade))
|
||||
} else {
|
||||
clone.URL = h.Location
|
||||
backendConn, err = h.Dial(clone)
|
||||
glog.V(6).Infof("Connecting to backend proxy (direct dial) %s\n Headers: %v", &location, clone.Header)
|
||||
clone.URL = &location
|
||||
backendConn, err = h.DialForUpgrade(clone)
|
||||
}
|
||||
if err != nil {
|
||||
h.Responder.Error(err)
|
||||
glog.V(6).Infof("Proxy connection error: %v", err)
|
||||
h.Responder.Error(w, req, err)
|
||||
return true
|
||||
}
|
||||
defer backendConn.Close()
|
||||
@ -157,18 +199,21 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
|
||||
// hijacking should be the last step in the upgrade.
|
||||
requestHijacker, ok := w.(http.Hijacker)
|
||||
if !ok {
|
||||
h.Responder.Error(fmt.Errorf("request connection cannot be hijacked: %T", w))
|
||||
glog.V(6).Infof("Unable to hijack response writer: %T", w)
|
||||
h.Responder.Error(w, req, fmt.Errorf("request connection cannot be hijacked: %T", w))
|
||||
return true
|
||||
}
|
||||
requestHijackedConn, _, err := requestHijacker.Hijack()
|
||||
if err != nil {
|
||||
h.Responder.Error(fmt.Errorf("error hijacking request connection: %v", err))
|
||||
glog.V(6).Infof("Unable to hijack response: %v", err)
|
||||
h.Responder.Error(w, req, fmt.Errorf("error hijacking connection: %v", err))
|
||||
return true
|
||||
}
|
||||
defer requestHijackedConn.Close()
|
||||
|
||||
// Forward raw response bytes back to client.
|
||||
if len(rawResponse) > 0 {
|
||||
glog.V(6).Infof("Writing %d bytes to hijacked connection", len(rawResponse))
|
||||
if _, err = requestHijackedConn.Write(rawResponse); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Error proxying response from backend to client: %v", err))
|
||||
}
|
||||
@ -210,9 +255,20 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
|
||||
return true
|
||||
}
|
||||
|
||||
// Dial dials the backend at req.URL and writes req to it.
|
||||
func (h *UpgradeAwareHandler) Dial(req *http.Request) (net.Conn, error) {
|
||||
conn, err := DialURL(req.URL, h.Transport)
|
||||
return dial(req, h.Transport)
|
||||
}
|
||||
|
||||
func (h *UpgradeAwareHandler) DialForUpgrade(req *http.Request) (net.Conn, error) {
|
||||
if h.UpgradeTransport != nil {
|
||||
return dial(req, h.UpgradeTransport)
|
||||
}
|
||||
return dial(req, h.Transport)
|
||||
}
|
||||
|
||||
// dial dials the backend at req.URL and writes req to it.
|
||||
func dial(req *http.Request, transport http.RoundTripper) (net.Conn, error) {
|
||||
conn, err := DialURL(req.URL, transport)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error dialing backend: %v", err)
|
||||
}
|
||||
|
@ -55,18 +55,14 @@ type fakeResponder struct {
|
||||
w http.ResponseWriter
|
||||
}
|
||||
|
||||
func (r *fakeResponder) Error(err error) {
|
||||
func (r *fakeResponder) Error(w http.ResponseWriter, req *http.Request, err error) {
|
||||
if r.called {
|
||||
r.t.Errorf("Error responder called again!\nprevious error: %v\nnew error: %v", r.err, err)
|
||||
}
|
||||
|
||||
if r.w != nil {
|
||||
r.w.WriteHeader(fakeStatusCode)
|
||||
_, writeErr := r.w.Write([]byte(err.Error()))
|
||||
assert.NoError(r.t, writeErr)
|
||||
} else {
|
||||
r.t.Logf("No ResponseWriter set")
|
||||
}
|
||||
w.WriteHeader(fakeStatusCode)
|
||||
_, writeErr := w.Write([]byte(err.Error()))
|
||||
assert.NoError(r.t, writeErr)
|
||||
|
||||
r.called = true
|
||||
r.err = err
|
||||
@ -459,7 +455,7 @@ type noErrorsAllowed struct {
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
func (r *noErrorsAllowed) Error(err error) {
|
||||
func (r *noErrorsAllowed) Error(w http.ResponseWriter, req *http.Request, err error) {
|
||||
r.t.Error(err)
|
||||
}
|
||||
|
||||
|
@ -181,7 +181,7 @@ func (r *responder) Object(statusCode int, obj runtime.Object) {
|
||||
responsewriters.WriteRawJSON(statusCode, obj, r.w)
|
||||
}
|
||||
|
||||
func (r *responder) Error(err error) {
|
||||
func (r *responder) Error(_ http.ResponseWriter, _ *http.Request, err error) {
|
||||
http.Error(r.w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
|
@ -437,9 +437,6 @@ var _ = SIGDescribe("Kubectl client", func() {
|
||||
})
|
||||
|
||||
It("should support exec through an HTTP proxy", func() {
|
||||
// Note: We are skipping local since we want to verify an apiserver with HTTPS.
|
||||
// At this time local only supports plain HTTP.
|
||||
framework.SkipIfProviderIs("local")
|
||||
// Fail if the variable isn't set
|
||||
if framework.TestContext.Host == "" {
|
||||
framework.Failf("--host variable must be set to the full URI to the api server on e2e run.")
|
||||
@ -473,6 +470,32 @@ var _ = SIGDescribe("Kubectl client", func() {
|
||||
}
|
||||
})
|
||||
|
||||
It("should support exec through kubectl proxy", func() {
|
||||
// Fail if the variable isn't set
|
||||
if framework.TestContext.Host == "" {
|
||||
framework.Failf("--host variable must be set to the full URI to the api server on e2e run.")
|
||||
}
|
||||
|
||||
By("Starting kubectl proxy")
|
||||
port, proxyCmd, err := startProxyServer()
|
||||
framework.ExpectNoError(err)
|
||||
defer framework.TryKill(proxyCmd)
|
||||
|
||||
//proxyLogs.Reset()
|
||||
host := fmt.Sprintf("--server=http://127.0.0.1:%d", port)
|
||||
By("Running kubectl via kubectl proxy using " + host)
|
||||
output := framework.NewKubectlCommand(
|
||||
host, fmt.Sprintf("--namespace=%s", ns),
|
||||
"exec", "nginx", "echo", "running", "in", "container",
|
||||
).ExecOrDie()
|
||||
|
||||
// Verify we got the normal output captured by the exec server
|
||||
expectedExecOutput := "running in container\n"
|
||||
if output != expectedExecOutput {
|
||||
framework.Failf("Unexpected kubectl exec output. Wanted %q, got %q", expectedExecOutput, output)
|
||||
}
|
||||
})
|
||||
|
||||
It("should return command exit codes", func() {
|
||||
framework.SkipUnlessKubectlVersionGTE(kubectlContainerExitCodeVersion)
|
||||
nsFlag := fmt.Sprintf("--namespace=%v", ns)
|
||||
@ -1758,7 +1781,7 @@ func getAPIVersions(apiEndpoint string) (*metav1.APIVersions, error) {
|
||||
|
||||
func startProxyServer() (int, *exec.Cmd, error) {
|
||||
// Specifying port 0 indicates we want the os to pick a random port.
|
||||
cmd := framework.KubectlCmd("proxy", "-p", "0")
|
||||
cmd := framework.KubectlCmd("proxy", "-p", "0", "--disable-filter")
|
||||
stdout, stderr, err := framework.StartCmdAndStreamOutput(cmd)
|
||||
if err != nil {
|
||||
return -1, nil, err
|
||||
|
Loading…
Reference in New Issue
Block a user