From aaebd67a63f01a35c21665f507e0fe9caec6b013 Mon Sep 17 00:00:00 2001 From: David Eads Date: Wed, 28 Aug 2019 14:56:24 -0400 Subject: [PATCH] add a timeout for proxying discovery requests --- .../pkg/util/proxy/upgradeaware.go | 9 +++- .../kube-aggregator/pkg/apiserver/BUILD | 1 + .../pkg/apiserver/handler_proxy.go | 44 ++++++++++++++++--- .../pkg/apiserver/handler_proxy_test.go | 44 ++++++++++++++++++- 4 files changed, 89 insertions(+), 9 deletions(-) diff --git a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go index ad8b160959e..4ce687393dd 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go @@ -223,7 +223,14 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request } // WithContext creates a shallow clone of the request with the new context. - newReq := req.WithContext(context.Background()) + ctx := context.Background() + // if the original request has a deadline, we should honor that deadline for our proxied request + if deadline, ok := req.Context().Deadline(); ok { + var cancelFn context.CancelFunc + ctx, cancelFn = context.WithDeadline(ctx, deadline) + defer cancelFn() + } + newReq := req.WithContext(ctx) newReq.Header = utilnet.CloneHeader(req.Header) if !h.UseRequestLocation { newReq.URL = &loc diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD index 4795cff0deb..fdeac0d8e4a 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD @@ -18,6 +18,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/proxy:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go index 0fd1a5da58f..c5a559835ae 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go @@ -18,10 +18,11 @@ package apiserver import ( "context" - "k8s.io/klog" "net/http" "net/url" + "strings" "sync/atomic" + "time" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/httpstream" @@ -35,11 +36,16 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" restclient "k8s.io/client-go/rest" "k8s.io/client-go/transport" + "k8s.io/klog" apiregistrationv1api "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" ) -const aggregatorComponent string = "aggregator" +const ( + aggregatorComponent string = "aggregator" + + aggregatedDiscoveryTimeout = 5 * time.Second +) // proxyHandler provides a http.Handler which will proxy traffic to locations // specified by items implementing Redirector. @@ -140,11 +146,8 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { location.Path = req.URL.Path location.RawQuery = req.URL.Query().Encode() - // WithContext creates a shallow clone of the request with the new context. - newReq := req.WithContext(context.Background()) - newReq.Header = utilnet.CloneHeader(req.Header) - newReq.URL = location - newReq.Host = location.Host + newReq, cancelFn := newRequestForProxy(location, req) + defer cancelFn() if handlingInfo.proxyRoundTripper == nil { proxyError(w, req, "", http.StatusNotFound) @@ -171,6 +174,33 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { handler.ServeHTTP(w, newReq) } +// newRequestForProxy returns a shallow copy of the original request with a context that may include a timeout for discovery requests +func newRequestForProxy(location *url.URL, req *http.Request) (*http.Request, context.CancelFunc) { + newCtx := context.Background() + cancelFn := func() {} + + // if the original request has a deadline, we should honor that deadline for our proxied request + if deadline, ok := req.Context().Deadline(); ok { + newCtx, cancelFn = context.WithDeadline(newCtx, deadline) + + // trim leading and trailing slashes. Then "/apis/group/version" requests are for discovery, so if we have exactly three + // segments that we are going to proxy, we have a discovery request. + } else if len(strings.Split(strings.Trim(req.URL.Path, "/"), "/")) == 3 { + // discovery requests are used by kubectl and others to determine which resources a server has. This is a cheap call that + // should be fast for every aggregated apiserver. Latency for aggregation is expected to be low (as for all extensions) + // so forcing a short timeout here helps responsiveness of all clients. + newCtx, cancelFn = context.WithTimeout(newCtx, aggregatedDiscoveryTimeout) + } + + // WithContext creates a shallow clone of the request with the new context. + newReq := req.WithContext(newCtx) + newReq.Header = utilnet.CloneHeader(req.Header) + newReq.URL = location + newReq.Host = location.Host + + return newReq, cancelFn +} + // maybeWrapForConnectionUpgrades wraps the roundtripper for upgrades. The bool indicates if it was wrapped func maybeWrapForConnectionUpgrades(restConfig *restclient.Config, rt http.RoundTripper, req *http.Request) (http.RoundTripper, bool, error) { if !httpstream.IsUpgradeRequest(req) { diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go index 761f30a18a9..70ce3e8c630 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go @@ -20,7 +20,6 @@ import ( "crypto/tls" "fmt" "io/ioutil" - "k8s.io/utils/pointer" "net/http" "net/http/httptest" "net/http/httputil" @@ -33,10 +32,12 @@ import ( "golang.org/x/net/websocket" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/proxy" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/authentication/user" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + "k8s.io/utils/pointer" ) type targetHTTPHandler struct { @@ -523,3 +524,44 @@ gR0TAoGACFOvhl8txfbkwLeuNeunyOPL7J4nIccthgd2ioFOr3HTou6wzN++vYTa a3OF9jH5Z7m6X1rrwn6J1+Gw9sBme38/GeGXHigsBI/8WaTvyuppyVIXOVPoTvVf VYsTwo5YgV1HzDkV+BNmBCw1GYcGXAElhJI+dCsgQuuU6TKzgl8= -----END RSA PRIVATE KEY-----`) + +func TestGetContextForNewRequest(t *testing.T) { + done := make(chan struct{}) + server := httptest.NewTLSServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) { + <-done // never return so that we're certain to return base on timeout + })) + defer server.Close() + defer close(done) + + proxyServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + location, err := url.Parse(server.URL) + if err != nil { + t.Fatal(err) + } + location.Path = req.URL.Path + + newReq, cancelFn := newRequestForProxy(location, req) + defer cancelFn() + + theproxy := proxy.NewUpgradeAwareHandler(location, server.Client().Transport, true, false, &responder{w: w}) + theproxy.ServeHTTP(w, newReq) + })) + defer proxyServer.Close() + + // normal clients will not be setting a timeout, don't set one here. Our proxy logic should construct this for us + resp, err := proxyServer.Client().Get(proxyServer.URL + "/apis/group/version") + if err != nil { + t.Fatal(err) + } + if resp.StatusCode != http.StatusServiceUnavailable { + t.Error(err) + } + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + if !strings.Contains(string(body), "Error trying to reach service: 'context deadline exceeded'") { + t.Error(string(body)) + } + +}