diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go index 7e360811f34..c19e5f248bb 100644 --- a/cmd/kube-apiserver/app/aggregator.go +++ b/cmd/kube-apiserver/app/aggregator.go @@ -41,7 +41,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" kubeexternalinformers "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" - "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1" aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver" @@ -50,6 +50,7 @@ import ( informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1" "k8s.io/kube-aggregator/pkg/controllers/autoregister" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" + kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/master/controller/crdregistration" ) @@ -109,10 +110,11 @@ func createAggregatorConfig( SharedInformerFactory: externalInformers, }, ExtraConfig: aggregatorapiserver.ExtraConfig{ - ProxyClientCert: certBytes, - ProxyClientKey: keyBytes, - ServiceResolver: serviceResolver, - ProxyTransport: proxyTransport, + ProxyClientCert: certBytes, + ProxyClientKey: keyBytes, + ServiceResolver: serviceResolver, + ProxyTransport: proxyTransport, + EnableAggregatedDiscoveryTimeout: utilfeature.DefaultFeatureGate.Enabled(kubefeatures.EnableAggregatedDiscoveryTimeout), }, } diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 3769e103d37..a4dfbdd879d 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -496,6 +496,13 @@ const ( // // Enables the startupProbe in kubelet worker. StartupProbe featuregate.Feature = "StartupProbe" + + // owner @deads2k + // deprecated: v1.16 + // + // Enable the aggregated discovery timeout to ensure client responsiveness. Note this feature is present + // only for backward compatibility, it will be removed in the 1.17 release. + EnableAggregatedDiscoveryTimeout featuregate.Feature = "EnableAggregatedDiscoveryTimeout" ) func init() { @@ -598,6 +605,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS apiextensionsfeatures.CustomResourcePublishOpenAPI: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, apiextensionsfeatures.CustomResourceDefaulting: {Default: true, PreRelease: featuregate.Beta}, + EnableAggregatedDiscoveryTimeout: {Default: true, PreRelease: featuregate.Deprecated}, + // features that enable backwards compatibility but are scheduled to be removed // ... HPAScaleToZero: {Default: false, PreRelease: featuregate.Alpha}, diff --git a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go index ad8b160959e..fcdc76a0529 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go @@ -19,7 +19,6 @@ package proxy import ( "bufio" "bytes" - "context" "fmt" "io" "io/ioutil" @@ -222,8 +221,8 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request h.Transport = h.defaultProxyTransport(req.URL, h.Transport) } - // WithContext creates a shallow clone of the request with the new context. - newReq := req.WithContext(context.Background()) + // WithContext creates a shallow clone of the request with the same context. + newReq := req.WithContext(req.Context()) newReq.Header = utilnet.CloneHeader(req.Header) if !h.UseRequestLocation { newReq.URL = &loc diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD index 4795cff0deb..fdeac0d8e4a 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD @@ -18,6 +18,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/proxy:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index ec233788466..1804266787f 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -71,6 +71,8 @@ type ExtraConfig struct { // Mechanism by which the Aggregator will resolve services. Required. ServiceResolver ServiceResolver + + EnableAggregatedDiscoveryTimeout bool } // Config represents the configuration needed to create an APIAggregator. @@ -132,6 +134,8 @@ type APIAggregator struct { // openAPIAggregationController downloads and merges OpenAPI specs. openAPIAggregationController *openapicontroller.AggregationController + + enableAggregatedDiscoveryTimeout bool } // Complete fills in any fields not set that are required to have valid data. It's mutating the receiver. @@ -172,17 +176,18 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg ) s := &APIAggregator{ - GenericAPIServer: genericServer, - delegateHandler: delegationTarget.UnprotectedHandler(), - proxyClientCert: c.ExtraConfig.ProxyClientCert, - proxyClientKey: c.ExtraConfig.ProxyClientKey, - proxyTransport: c.ExtraConfig.ProxyTransport, - proxyHandlers: map[string]*proxyHandler{}, - handledGroups: sets.String{}, - lister: informerFactory.Apiregistration().V1().APIServices().Lister(), - APIRegistrationInformers: informerFactory, - serviceResolver: c.ExtraConfig.ServiceResolver, - openAPIConfig: openAPIConfig, + GenericAPIServer: genericServer, + delegateHandler: delegationTarget.UnprotectedHandler(), + proxyClientCert: c.ExtraConfig.ProxyClientCert, + proxyClientKey: c.ExtraConfig.ProxyClientKey, + proxyTransport: c.ExtraConfig.ProxyTransport, + proxyHandlers: map[string]*proxyHandler{}, + handledGroups: sets.String{}, + lister: informerFactory.Apiregistration().V1().APIServices().Lister(), + APIRegistrationInformers: informerFactory, + serviceResolver: c.ExtraConfig.ServiceResolver, + openAPIConfig: openAPIConfig, + enableAggregatedDiscoveryTimeout: c.ExtraConfig.EnableAggregatedDiscoveryTimeout, } apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter) @@ -286,11 +291,12 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error { // register the proxy handler proxyHandler := &proxyHandler{ - localDelegate: s.delegateHandler, - proxyClientCert: s.proxyClientCert, - proxyClientKey: s.proxyClientKey, - proxyTransport: s.proxyTransport, - serviceResolver: s.serviceResolver, + localDelegate: s.delegateHandler, + proxyClientCert: s.proxyClientCert, + proxyClientKey: s.proxyClientKey, + proxyTransport: s.proxyTransport, + serviceResolver: s.serviceResolver, + enableAggregatedDiscoveryTimeout: s.enableAggregatedDiscoveryTimeout, } proxyHandler.updateAPIService(apiService) if s.openAPIAggregationController != nil { diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go index 0fd1a5da58f..9d5cc5f0479 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go @@ -18,10 +18,11 @@ package apiserver import ( "context" - "k8s.io/klog" "net/http" "net/url" + "strings" "sync/atomic" + "time" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/httpstream" @@ -35,11 +36,16 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" restclient "k8s.io/client-go/rest" "k8s.io/client-go/transport" + "k8s.io/klog" apiregistrationv1api "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" ) -const aggregatorComponent string = "aggregator" +const ( + aggregatorComponent string = "aggregator" + + aggregatedDiscoveryTimeout = 5 * time.Second +) // proxyHandler provides a http.Handler which will proxy traffic to locations // specified by items implementing Redirector. @@ -57,6 +63,8 @@ type proxyHandler struct { serviceResolver ServiceResolver handlingInfo atomic.Value + + enableAggregatedDiscoveryTimeout bool } type proxyHandlingInfo struct { @@ -140,11 +148,8 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { location.Path = req.URL.Path location.RawQuery = req.URL.Query().Encode() - // WithContext creates a shallow clone of the request with the new context. - newReq := req.WithContext(context.Background()) - newReq.Header = utilnet.CloneHeader(req.Header) - newReq.URL = location - newReq.Host = location.Host + newReq, cancelFn := newRequestForProxy(location, req, r.enableAggregatedDiscoveryTimeout) + defer cancelFn() if handlingInfo.proxyRoundTripper == nil { proxyError(w, req, "", http.StatusNotFound) @@ -171,6 +176,31 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { handler.ServeHTTP(w, newReq) } +// newRequestForProxy returns a shallow copy of the original request with a context that may include a timeout for discovery requests +func newRequestForProxy(location *url.URL, req *http.Request, enableAggregatedDiscoveryTimeout bool) (*http.Request, context.CancelFunc) { + newCtx := req.Context() + cancelFn := func() {} + + if requestInfo, ok := genericapirequest.RequestInfoFrom(req.Context()); ok { + // trim leading and trailing slashes. Then "/apis/group/version" requests are for discovery, so if we have exactly three + // segments that we are going to proxy, we have a discovery request. + if enableAggregatedDiscoveryTimeout && !requestInfo.IsResourceRequest && len(strings.Split(strings.Trim(requestInfo.Path, "/"), "/")) == 3 { + // discovery requests are used by kubectl and others to determine which resources a server has. This is a cheap call that + // should be fast for every aggregated apiserver. Latency for aggregation is expected to be low (as for all extensions) + // so forcing a short timeout here helps responsiveness of all clients. + newCtx, cancelFn = context.WithTimeout(newCtx, aggregatedDiscoveryTimeout) + } + } + + // WithContext creates a shallow clone of the request with the same context. + newReq := req.WithContext(newCtx) + newReq.Header = utilnet.CloneHeader(req.Header) + newReq.URL = location + newReq.Host = location.Host + + return newReq, cancelFn +} + // maybeWrapForConnectionUpgrades wraps the roundtripper for upgrades. The bool indicates if it was wrapped func maybeWrapForConnectionUpgrades(restConfig *restclient.Config, rt http.RoundTripper, req *http.Request) (http.RoundTripper, bool, error) { if !httpstream.IsUpgradeRequest(req) { diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go index 761f30a18a9..f9390923c09 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go @@ -20,7 +20,6 @@ import ( "crypto/tls" "fmt" "io/ioutil" - "k8s.io/utils/pointer" "net/http" "net/http/httptest" "net/http/httputil" @@ -33,10 +32,12 @@ import ( "golang.org/x/net/websocket" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/proxy" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/authentication/user" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + "k8s.io/utils/pointer" ) type targetHTTPHandler struct { @@ -523,3 +524,45 @@ gR0TAoGACFOvhl8txfbkwLeuNeunyOPL7J4nIccthgd2ioFOr3HTou6wzN++vYTa a3OF9jH5Z7m6X1rrwn6J1+Gw9sBme38/GeGXHigsBI/8WaTvyuppyVIXOVPoTvVf VYsTwo5YgV1HzDkV+BNmBCw1GYcGXAElhJI+dCsgQuuU6TKzgl8= -----END RSA PRIVATE KEY-----`) + +func TestGetContextForNewRequest(t *testing.T) { + done := make(chan struct{}) + server := httptest.NewTLSServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) { + <-done // never return so that we're certain to return base on timeout + })) + defer server.Close() + defer close(done) + + proxyServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + location, err := url.Parse(server.URL) + if err != nil { + t.Fatal(err) + } + location.Path = req.URL.Path + + nestedReq := req.WithContext(genericapirequest.WithRequestInfo(req.Context(), &genericapirequest.RequestInfo{Path: req.URL.Path})) + newReq, cancelFn := newRequestForProxy(location, nestedReq, true) + defer cancelFn() + + theproxy := proxy.NewUpgradeAwareHandler(location, server.Client().Transport, true, false, &responder{w: w}) + theproxy.ServeHTTP(w, newReq) + })) + defer proxyServer.Close() + + // normal clients will not be setting a timeout, don't set one here. Our proxy logic should construct this for us + resp, err := proxyServer.Client().Get(proxyServer.URL + "/apis/group/version") + if err != nil { + t.Fatal(err) + } + if resp.StatusCode != http.StatusServiceUnavailable { + t.Error(err) + } + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + if !strings.Contains(string(body), "Error trying to reach service: 'context deadline exceeded'") { + t.Error(string(body)) + } + +}