From aaebd67a63f01a35c21665f507e0fe9caec6b013 Mon Sep 17 00:00:00 2001 From: David Eads Date: Wed, 28 Aug 2019 14:56:24 -0400 Subject: [PATCH 1/5] add a timeout for proxying discovery requests --- .../pkg/util/proxy/upgradeaware.go | 9 +++- .../kube-aggregator/pkg/apiserver/BUILD | 1 + .../pkg/apiserver/handler_proxy.go | 44 ++++++++++++++++--- .../pkg/apiserver/handler_proxy_test.go | 44 ++++++++++++++++++- 4 files changed, 89 insertions(+), 9 deletions(-) diff --git a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go index ad8b160959e..4ce687393dd 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go @@ -223,7 +223,14 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request } // WithContext creates a shallow clone of the request with the new context. - newReq := req.WithContext(context.Background()) + ctx := context.Background() + // if the original request has a deadline, we should honor that deadline for our proxied request + if deadline, ok := req.Context().Deadline(); ok { + var cancelFn context.CancelFunc + ctx, cancelFn = context.WithDeadline(ctx, deadline) + defer cancelFn() + } + newReq := req.WithContext(ctx) newReq.Header = utilnet.CloneHeader(req.Header) if !h.UseRequestLocation { newReq.URL = &loc diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD index 4795cff0deb..fdeac0d8e4a 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD @@ -18,6 +18,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/proxy:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go index 0fd1a5da58f..c5a559835ae 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go @@ -18,10 +18,11 @@ package apiserver import ( "context" - "k8s.io/klog" "net/http" "net/url" + "strings" "sync/atomic" + "time" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/httpstream" @@ -35,11 +36,16 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" restclient "k8s.io/client-go/rest" "k8s.io/client-go/transport" + "k8s.io/klog" apiregistrationv1api "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" ) -const aggregatorComponent string = "aggregator" +const ( + aggregatorComponent string = "aggregator" + + aggregatedDiscoveryTimeout = 5 * time.Second +) // proxyHandler provides a http.Handler which will proxy traffic to locations // specified by items implementing Redirector. @@ -140,11 +146,8 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { location.Path = req.URL.Path location.RawQuery = req.URL.Query().Encode() - // WithContext creates a shallow clone of the request with the new context. - newReq := req.WithContext(context.Background()) - newReq.Header = utilnet.CloneHeader(req.Header) - newReq.URL = location - newReq.Host = location.Host + newReq, cancelFn := newRequestForProxy(location, req) + defer cancelFn() if handlingInfo.proxyRoundTripper == nil { proxyError(w, req, "", http.StatusNotFound) @@ -171,6 +174,33 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { handler.ServeHTTP(w, newReq) } +// newRequestForProxy returns a shallow copy of the original request with a context that may include a timeout for discovery requests +func newRequestForProxy(location *url.URL, req *http.Request) (*http.Request, context.CancelFunc) { + newCtx := context.Background() + cancelFn := func() {} + + // if the original request has a deadline, we should honor that deadline for our proxied request + if deadline, ok := req.Context().Deadline(); ok { + newCtx, cancelFn = context.WithDeadline(newCtx, deadline) + + // trim leading and trailing slashes. Then "/apis/group/version" requests are for discovery, so if we have exactly three + // segments that we are going to proxy, we have a discovery request. + } else if len(strings.Split(strings.Trim(req.URL.Path, "/"), "/")) == 3 { + // discovery requests are used by kubectl and others to determine which resources a server has. This is a cheap call that + // should be fast for every aggregated apiserver. Latency for aggregation is expected to be low (as for all extensions) + // so forcing a short timeout here helps responsiveness of all clients. + newCtx, cancelFn = context.WithTimeout(newCtx, aggregatedDiscoveryTimeout) + } + + // WithContext creates a shallow clone of the request with the new context. + newReq := req.WithContext(newCtx) + newReq.Header = utilnet.CloneHeader(req.Header) + newReq.URL = location + newReq.Host = location.Host + + return newReq, cancelFn +} + // maybeWrapForConnectionUpgrades wraps the roundtripper for upgrades. The bool indicates if it was wrapped func maybeWrapForConnectionUpgrades(restConfig *restclient.Config, rt http.RoundTripper, req *http.Request) (http.RoundTripper, bool, error) { if !httpstream.IsUpgradeRequest(req) { diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go index 761f30a18a9..70ce3e8c630 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go @@ -20,7 +20,6 @@ import ( "crypto/tls" "fmt" "io/ioutil" - "k8s.io/utils/pointer" "net/http" "net/http/httptest" "net/http/httputil" @@ -33,10 +32,12 @@ import ( "golang.org/x/net/websocket" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/proxy" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/authentication/user" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + "k8s.io/utils/pointer" ) type targetHTTPHandler struct { @@ -523,3 +524,44 @@ gR0TAoGACFOvhl8txfbkwLeuNeunyOPL7J4nIccthgd2ioFOr3HTou6wzN++vYTa a3OF9jH5Z7m6X1rrwn6J1+Gw9sBme38/GeGXHigsBI/8WaTvyuppyVIXOVPoTvVf VYsTwo5YgV1HzDkV+BNmBCw1GYcGXAElhJI+dCsgQuuU6TKzgl8= -----END RSA PRIVATE KEY-----`) + +func TestGetContextForNewRequest(t *testing.T) { + done := make(chan struct{}) + server := httptest.NewTLSServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) { + <-done // never return so that we're certain to return base on timeout + })) + defer server.Close() + defer close(done) + + proxyServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + location, err := url.Parse(server.URL) + if err != nil { + t.Fatal(err) + } + location.Path = req.URL.Path + + newReq, cancelFn := newRequestForProxy(location, req) + defer cancelFn() + + theproxy := proxy.NewUpgradeAwareHandler(location, server.Client().Transport, true, false, &responder{w: w}) + theproxy.ServeHTTP(w, newReq) + })) + defer proxyServer.Close() + + // normal clients will not be setting a timeout, don't set one here. Our proxy logic should construct this for us + resp, err := proxyServer.Client().Get(proxyServer.URL + "/apis/group/version") + if err != nil { + t.Fatal(err) + } + if resp.StatusCode != http.StatusServiceUnavailable { + t.Error(err) + } + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + if !strings.Contains(string(body), "Error trying to reach service: 'context deadline exceeded'") { + t.Error(string(body)) + } + +} From 2f1f997c08e68742bc1363af90e85a103ce6a3ea Mon Sep 17 00:00:00 2001 From: David Eads Date: Thu, 29 Aug 2019 13:28:38 -0400 Subject: [PATCH 2/5] add env var to allow disabling the aggregated discovery timeout --- .../pkg/apiserver/handler_proxy.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go index c5a559835ae..05923bd3cbb 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go @@ -20,6 +20,8 @@ import ( "context" "net/http" "net/url" + "os" + "strconv" "strings" "sync/atomic" "time" @@ -29,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/util/httpstream/spdy" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/proxy" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" endpointmetrics "k8s.io/apiserver/pkg/endpoints/metrics" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" @@ -47,6 +50,19 @@ const ( aggregatedDiscoveryTimeout = 5 * time.Second ) +var ( + // TODO this should be unconditionally true once we remove the env var override + enableAggregatedDiscoveryTimeout = true +) + +func init() { + disableAggregatedDiscoveryTimeout, err := strconv.ParseBool(os.Getenv("DEPRECATED_DISABLE_AGGREGATOR_DISCOVERY_TIMEOUT")) + if err != nil { + utilruntime.HandleError(err) + } + enableAggregatedDiscoveryTimeout = !disableAggregatedDiscoveryTimeout +} + // proxyHandler provides a http.Handler which will proxy traffic to locations // specified by items implementing Redirector. type proxyHandler struct { @@ -185,7 +201,7 @@ func newRequestForProxy(location *url.URL, req *http.Request) (*http.Request, co // trim leading and trailing slashes. Then "/apis/group/version" requests are for discovery, so if we have exactly three // segments that we are going to proxy, we have a discovery request. - } else if len(strings.Split(strings.Trim(req.URL.Path, "/"), "/")) == 3 { + } else if enableAggregatedDiscoveryTimeout && len(strings.Split(strings.Trim(req.URL.Path, "/"), "/")) == 3 { // discovery requests are used by kubectl and others to determine which resources a server has. This is a cheap call that // should be fast for every aggregated apiserver. Latency for aggregation is expected to be low (as for all extensions) // so forcing a short timeout here helps responsiveness of all clients. From 5521bf27c589090ae1c0872bc0adbf05d6e65a0f Mon Sep 17 00:00:00 2001 From: David Eads Date: Thu, 29 Aug 2019 16:49:36 -0400 Subject: [PATCH 3/5] add temporary feature gate to allow disabling aggregated discovery timeout --- cmd/kube-apiserver/app/aggregator.go | 12 +++--- pkg/features/kube_features.go | 9 +++++ .../pkg/apiserver/apiserver.go | 38 +++++++++++-------- .../pkg/apiserver/handler_proxy.go | 22 ++--------- .../pkg/apiserver/handler_proxy_test.go | 2 +- 5 files changed, 43 insertions(+), 40 deletions(-) diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go index 7e360811f34..c19e5f248bb 100644 --- a/cmd/kube-apiserver/app/aggregator.go +++ b/cmd/kube-apiserver/app/aggregator.go @@ -41,7 +41,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" kubeexternalinformers "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" - "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1" aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver" @@ -50,6 +50,7 @@ import ( informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1" "k8s.io/kube-aggregator/pkg/controllers/autoregister" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" + kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/master/controller/crdregistration" ) @@ -109,10 +110,11 @@ func createAggregatorConfig( SharedInformerFactory: externalInformers, }, ExtraConfig: aggregatorapiserver.ExtraConfig{ - ProxyClientCert: certBytes, - ProxyClientKey: keyBytes, - ServiceResolver: serviceResolver, - ProxyTransport: proxyTransport, + ProxyClientCert: certBytes, + ProxyClientKey: keyBytes, + ServiceResolver: serviceResolver, + ProxyTransport: proxyTransport, + EnableAggregatedDiscoveryTimeout: utilfeature.DefaultFeatureGate.Enabled(kubefeatures.EnableAggregatedDiscoveryTimeout), }, } diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 60522f4226d..a45a1e667e7 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -495,6 +495,13 @@ const ( // // Enables the startupProbe in kubelet worker. StartupProbe featuregate.Feature = "StartupProbe" + + // owner @deads2k + // deprecated: v1.16 + // + // Enable the aggregated discovery timeout to ensure client responsiveness. Note this feature is present + // only for backward compatibility, it will be removed in the 1.17 release. + EnableAggregatedDiscoveryTimeout featuregate.Feature = "EnableAggregatedDiscoveryTimeout" ) func init() { @@ -597,6 +604,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS apiextensionsfeatures.CustomResourcePublishOpenAPI: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, apiextensionsfeatures.CustomResourceDefaulting: {Default: true, PreRelease: featuregate.Beta}, + EnableAggregatedDiscoveryTimeout: {Default: true, PreRelease: featuregate.Deprecated}, + // features that enable backwards compatibility but are scheduled to be removed // ... HPAScaleToZero: {Default: false, PreRelease: featuregate.Alpha}, diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index ec233788466..1804266787f 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -71,6 +71,8 @@ type ExtraConfig struct { // Mechanism by which the Aggregator will resolve services. Required. ServiceResolver ServiceResolver + + EnableAggregatedDiscoveryTimeout bool } // Config represents the configuration needed to create an APIAggregator. @@ -132,6 +134,8 @@ type APIAggregator struct { // openAPIAggregationController downloads and merges OpenAPI specs. openAPIAggregationController *openapicontroller.AggregationController + + enableAggregatedDiscoveryTimeout bool } // Complete fills in any fields not set that are required to have valid data. It's mutating the receiver. @@ -172,17 +176,18 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg ) s := &APIAggregator{ - GenericAPIServer: genericServer, - delegateHandler: delegationTarget.UnprotectedHandler(), - proxyClientCert: c.ExtraConfig.ProxyClientCert, - proxyClientKey: c.ExtraConfig.ProxyClientKey, - proxyTransport: c.ExtraConfig.ProxyTransport, - proxyHandlers: map[string]*proxyHandler{}, - handledGroups: sets.String{}, - lister: informerFactory.Apiregistration().V1().APIServices().Lister(), - APIRegistrationInformers: informerFactory, - serviceResolver: c.ExtraConfig.ServiceResolver, - openAPIConfig: openAPIConfig, + GenericAPIServer: genericServer, + delegateHandler: delegationTarget.UnprotectedHandler(), + proxyClientCert: c.ExtraConfig.ProxyClientCert, + proxyClientKey: c.ExtraConfig.ProxyClientKey, + proxyTransport: c.ExtraConfig.ProxyTransport, + proxyHandlers: map[string]*proxyHandler{}, + handledGroups: sets.String{}, + lister: informerFactory.Apiregistration().V1().APIServices().Lister(), + APIRegistrationInformers: informerFactory, + serviceResolver: c.ExtraConfig.ServiceResolver, + openAPIConfig: openAPIConfig, + enableAggregatedDiscoveryTimeout: c.ExtraConfig.EnableAggregatedDiscoveryTimeout, } apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter) @@ -286,11 +291,12 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error { // register the proxy handler proxyHandler := &proxyHandler{ - localDelegate: s.delegateHandler, - proxyClientCert: s.proxyClientCert, - proxyClientKey: s.proxyClientKey, - proxyTransport: s.proxyTransport, - serviceResolver: s.serviceResolver, + localDelegate: s.delegateHandler, + proxyClientCert: s.proxyClientCert, + proxyClientKey: s.proxyClientKey, + proxyTransport: s.proxyTransport, + serviceResolver: s.serviceResolver, + enableAggregatedDiscoveryTimeout: s.enableAggregatedDiscoveryTimeout, } proxyHandler.updateAPIService(apiService) if s.openAPIAggregationController != nil { diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go index 05923bd3cbb..6dbe553d2ad 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go @@ -20,8 +20,6 @@ import ( "context" "net/http" "net/url" - "os" - "strconv" "strings" "sync/atomic" "time" @@ -31,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/util/httpstream/spdy" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/proxy" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" endpointmetrics "k8s.io/apiserver/pkg/endpoints/metrics" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" @@ -50,19 +47,6 @@ const ( aggregatedDiscoveryTimeout = 5 * time.Second ) -var ( - // TODO this should be unconditionally true once we remove the env var override - enableAggregatedDiscoveryTimeout = true -) - -func init() { - disableAggregatedDiscoveryTimeout, err := strconv.ParseBool(os.Getenv("DEPRECATED_DISABLE_AGGREGATOR_DISCOVERY_TIMEOUT")) - if err != nil { - utilruntime.HandleError(err) - } - enableAggregatedDiscoveryTimeout = !disableAggregatedDiscoveryTimeout -} - // proxyHandler provides a http.Handler which will proxy traffic to locations // specified by items implementing Redirector. type proxyHandler struct { @@ -79,6 +63,8 @@ type proxyHandler struct { serviceResolver ServiceResolver handlingInfo atomic.Value + + enableAggregatedDiscoveryTimeout bool } type proxyHandlingInfo struct { @@ -162,7 +148,7 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { location.Path = req.URL.Path location.RawQuery = req.URL.Query().Encode() - newReq, cancelFn := newRequestForProxy(location, req) + newReq, cancelFn := newRequestForProxy(location, req, r.enableAggregatedDiscoveryTimeout) defer cancelFn() if handlingInfo.proxyRoundTripper == nil { @@ -191,7 +177,7 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { } // newRequestForProxy returns a shallow copy of the original request with a context that may include a timeout for discovery requests -func newRequestForProxy(location *url.URL, req *http.Request) (*http.Request, context.CancelFunc) { +func newRequestForProxy(location *url.URL, req *http.Request, enableAggregatedDiscoveryTimeout bool) (*http.Request, context.CancelFunc) { newCtx := context.Background() cancelFn := func() {} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go index 70ce3e8c630..5df58b170e7 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go @@ -540,7 +540,7 @@ func TestGetContextForNewRequest(t *testing.T) { } location.Path = req.URL.Path - newReq, cancelFn := newRequestForProxy(location, req) + newReq, cancelFn := newRequestForProxy(location, req, true) defer cancelFn() theproxy := proxy.NewUpgradeAwareHandler(location, server.Client().Transport, true, false, &responder{w: w}) From 275f5cf5a0aae36e92388f60b794a111bd6a9889 Mon Sep 17 00:00:00 2001 From: David Eads Date: Thu, 29 Aug 2019 16:37:06 -0400 Subject: [PATCH 4/5] use the same context for aggregated and proxy requests --- .../apimachinery/pkg/util/proxy/upgradeaware.go | 12 ++---------- .../kube-aggregator/pkg/apiserver/handler_proxy.go | 14 +++++--------- 2 files changed, 7 insertions(+), 19 deletions(-) diff --git a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go index 4ce687393dd..fcdc76a0529 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go @@ -19,7 +19,6 @@ package proxy import ( "bufio" "bytes" - "context" "fmt" "io" "io/ioutil" @@ -222,15 +221,8 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request h.Transport = h.defaultProxyTransport(req.URL, h.Transport) } - // WithContext creates a shallow clone of the request with the new context. - ctx := context.Background() - // if the original request has a deadline, we should honor that deadline for our proxied request - if deadline, ok := req.Context().Deadline(); ok { - var cancelFn context.CancelFunc - ctx, cancelFn = context.WithDeadline(ctx, deadline) - defer cancelFn() - } - newReq := req.WithContext(ctx) + // WithContext creates a shallow clone of the request with the same context. + newReq := req.WithContext(req.Context()) newReq.Header = utilnet.CloneHeader(req.Header) if !h.UseRequestLocation { newReq.URL = &loc diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go index 6dbe553d2ad..9576cdd84e9 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go @@ -178,23 +178,19 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { // newRequestForProxy returns a shallow copy of the original request with a context that may include a timeout for discovery requests func newRequestForProxy(location *url.URL, req *http.Request, enableAggregatedDiscoveryTimeout bool) (*http.Request, context.CancelFunc) { - newCtx := context.Background() + newCtx := req.Context() cancelFn := func() {} - // if the original request has a deadline, we should honor that deadline for our proxied request - if deadline, ok := req.Context().Deadline(); ok { - newCtx, cancelFn = context.WithDeadline(newCtx, deadline) - - // trim leading and trailing slashes. Then "/apis/group/version" requests are for discovery, so if we have exactly three - // segments that we are going to proxy, we have a discovery request. - } else if enableAggregatedDiscoveryTimeout && len(strings.Split(strings.Trim(req.URL.Path, "/"), "/")) == 3 { + // trim leading and trailing slashes. Then "/apis/group/version" requests are for discovery, so if we have exactly three + // segments that we are going to proxy, we have a discovery request. + if enableAggregatedDiscoveryTimeout && len(strings.Split(strings.Trim(req.URL.Path, "/"), "/")) == 3 { // discovery requests are used by kubectl and others to determine which resources a server has. This is a cheap call that // should be fast for every aggregated apiserver. Latency for aggregation is expected to be low (as for all extensions) // so forcing a short timeout here helps responsiveness of all clients. newCtx, cancelFn = context.WithTimeout(newCtx, aggregatedDiscoveryTimeout) } - // WithContext creates a shallow clone of the request with the new context. + // WithContext creates a shallow clone of the request with the same context. newReq := req.WithContext(newCtx) newReq.Header = utilnet.CloneHeader(req.Header) newReq.URL = location From c24a36610eaf3a93a1f3ffe7a43dd660ee6a345d Mon Sep 17 00:00:00 2001 From: David Eads Date: Thu, 29 Aug 2019 17:02:23 -0400 Subject: [PATCH 5/5] use the existing request info --- .../pkg/apiserver/handler_proxy.go | 16 +++++++++------- .../pkg/apiserver/handler_proxy_test.go | 3 ++- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go index 9576cdd84e9..9d5cc5f0479 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go @@ -181,13 +181,15 @@ func newRequestForProxy(location *url.URL, req *http.Request, enableAggregatedDi newCtx := req.Context() cancelFn := func() {} - // trim leading and trailing slashes. Then "/apis/group/version" requests are for discovery, so if we have exactly three - // segments that we are going to proxy, we have a discovery request. - if enableAggregatedDiscoveryTimeout && len(strings.Split(strings.Trim(req.URL.Path, "/"), "/")) == 3 { - // discovery requests are used by kubectl and others to determine which resources a server has. This is a cheap call that - // should be fast for every aggregated apiserver. Latency for aggregation is expected to be low (as for all extensions) - // so forcing a short timeout here helps responsiveness of all clients. - newCtx, cancelFn = context.WithTimeout(newCtx, aggregatedDiscoveryTimeout) + if requestInfo, ok := genericapirequest.RequestInfoFrom(req.Context()); ok { + // trim leading and trailing slashes. Then "/apis/group/version" requests are for discovery, so if we have exactly three + // segments that we are going to proxy, we have a discovery request. + if enableAggregatedDiscoveryTimeout && !requestInfo.IsResourceRequest && len(strings.Split(strings.Trim(requestInfo.Path, "/"), "/")) == 3 { + // discovery requests are used by kubectl and others to determine which resources a server has. This is a cheap call that + // should be fast for every aggregated apiserver. Latency for aggregation is expected to be low (as for all extensions) + // so forcing a short timeout here helps responsiveness of all clients. + newCtx, cancelFn = context.WithTimeout(newCtx, aggregatedDiscoveryTimeout) + } } // WithContext creates a shallow clone of the request with the same context. diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go index 5df58b170e7..f9390923c09 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go @@ -540,7 +540,8 @@ func TestGetContextForNewRequest(t *testing.T) { } location.Path = req.URL.Path - newReq, cancelFn := newRequestForProxy(location, req, true) + nestedReq := req.WithContext(genericapirequest.WithRequestInfo(req.Context(), &genericapirequest.RequestInfo{Path: req.URL.Path})) + newReq, cancelFn := newRequestForProxy(location, nestedReq, true) defer cancelFn() theproxy := proxy.NewUpgradeAwareHandler(location, server.Client().Transport, true, false, &responder{w: w})