mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 10:19:50 +00:00
Merge pull request #82146 from deads2k/agg-discovery-timeout-2
add a timeout for proxying discovery requests
This commit is contained in:
commit
a7ac8d4c0b
@ -41,7 +41,7 @@ import (
|
|||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
kubeexternalinformers "k8s.io/client-go/informers"
|
kubeexternalinformers "k8s.io/client-go/informers"
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
||||||
v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
|
v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
|
||||||
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
|
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
|
||||||
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
|
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
|
||||||
@ -50,6 +50,7 @@ import (
|
|||||||
informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
|
informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
|
||||||
"k8s.io/kube-aggregator/pkg/controllers/autoregister"
|
"k8s.io/kube-aggregator/pkg/controllers/autoregister"
|
||||||
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
|
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
|
||||||
|
kubefeatures "k8s.io/kubernetes/pkg/features"
|
||||||
"k8s.io/kubernetes/pkg/master/controller/crdregistration"
|
"k8s.io/kubernetes/pkg/master/controller/crdregistration"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -113,6 +114,7 @@ func createAggregatorConfig(
|
|||||||
ProxyClientKey: keyBytes,
|
ProxyClientKey: keyBytes,
|
||||||
ServiceResolver: serviceResolver,
|
ServiceResolver: serviceResolver,
|
||||||
ProxyTransport: proxyTransport,
|
ProxyTransport: proxyTransport,
|
||||||
|
EnableAggregatedDiscoveryTimeout: utilfeature.DefaultFeatureGate.Enabled(kubefeatures.EnableAggregatedDiscoveryTimeout),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -496,6 +496,13 @@ const (
|
|||||||
//
|
//
|
||||||
// Enables the startupProbe in kubelet worker.
|
// Enables the startupProbe in kubelet worker.
|
||||||
StartupProbe featuregate.Feature = "StartupProbe"
|
StartupProbe featuregate.Feature = "StartupProbe"
|
||||||
|
|
||||||
|
// owner @deads2k
|
||||||
|
// deprecated: v1.16
|
||||||
|
//
|
||||||
|
// Enable the aggregated discovery timeout to ensure client responsiveness. Note this feature is present
|
||||||
|
// only for backward compatibility, it will be removed in the 1.17 release.
|
||||||
|
EnableAggregatedDiscoveryTimeout featuregate.Feature = "EnableAggregatedDiscoveryTimeout"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -598,6 +605,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
|
|||||||
apiextensionsfeatures.CustomResourcePublishOpenAPI: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
|
apiextensionsfeatures.CustomResourcePublishOpenAPI: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
|
||||||
apiextensionsfeatures.CustomResourceDefaulting: {Default: true, PreRelease: featuregate.Beta},
|
apiextensionsfeatures.CustomResourceDefaulting: {Default: true, PreRelease: featuregate.Beta},
|
||||||
|
|
||||||
|
EnableAggregatedDiscoveryTimeout: {Default: true, PreRelease: featuregate.Deprecated},
|
||||||
|
|
||||||
// features that enable backwards compatibility but are scheduled to be removed
|
// features that enable backwards compatibility but are scheduled to be removed
|
||||||
// ...
|
// ...
|
||||||
HPAScaleToZero: {Default: false, PreRelease: featuregate.Alpha},
|
HPAScaleToZero: {Default: false, PreRelease: featuregate.Alpha},
|
||||||
|
@ -19,7 +19,6 @@ package proxy
|
|||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
@ -222,8 +221,8 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
|
|||||||
h.Transport = h.defaultProxyTransport(req.URL, h.Transport)
|
h.Transport = h.defaultProxyTransport(req.URL, h.Transport)
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithContext creates a shallow clone of the request with the new context.
|
// WithContext creates a shallow clone of the request with the same context.
|
||||||
newReq := req.WithContext(context.Background())
|
newReq := req.WithContext(req.Context())
|
||||||
newReq.Header = utilnet.CloneHeader(req.Header)
|
newReq.Header = utilnet.CloneHeader(req.Header)
|
||||||
if !h.UseRequestLocation {
|
if !h.UseRequestLocation {
|
||||||
newReq.URL = &loc
|
newReq.URL = &loc
|
||||||
|
@ -18,6 +18,7 @@ go_test(
|
|||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/proxy:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||||
|
@ -71,6 +71,8 @@ type ExtraConfig struct {
|
|||||||
|
|
||||||
// Mechanism by which the Aggregator will resolve services. Required.
|
// Mechanism by which the Aggregator will resolve services. Required.
|
||||||
ServiceResolver ServiceResolver
|
ServiceResolver ServiceResolver
|
||||||
|
|
||||||
|
EnableAggregatedDiscoveryTimeout bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config represents the configuration needed to create an APIAggregator.
|
// Config represents the configuration needed to create an APIAggregator.
|
||||||
@ -132,6 +134,8 @@ type APIAggregator struct {
|
|||||||
|
|
||||||
// openAPIAggregationController downloads and merges OpenAPI specs.
|
// openAPIAggregationController downloads and merges OpenAPI specs.
|
||||||
openAPIAggregationController *openapicontroller.AggregationController
|
openAPIAggregationController *openapicontroller.AggregationController
|
||||||
|
|
||||||
|
enableAggregatedDiscoveryTimeout bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
|
// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
|
||||||
@ -183,6 +187,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
|||||||
APIRegistrationInformers: informerFactory,
|
APIRegistrationInformers: informerFactory,
|
||||||
serviceResolver: c.ExtraConfig.ServiceResolver,
|
serviceResolver: c.ExtraConfig.ServiceResolver,
|
||||||
openAPIConfig: openAPIConfig,
|
openAPIConfig: openAPIConfig,
|
||||||
|
enableAggregatedDiscoveryTimeout: c.ExtraConfig.EnableAggregatedDiscoveryTimeout,
|
||||||
}
|
}
|
||||||
|
|
||||||
apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter)
|
apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter)
|
||||||
@ -291,6 +296,7 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
|
|||||||
proxyClientKey: s.proxyClientKey,
|
proxyClientKey: s.proxyClientKey,
|
||||||
proxyTransport: s.proxyTransport,
|
proxyTransport: s.proxyTransport,
|
||||||
serviceResolver: s.serviceResolver,
|
serviceResolver: s.serviceResolver,
|
||||||
|
enableAggregatedDiscoveryTimeout: s.enableAggregatedDiscoveryTimeout,
|
||||||
}
|
}
|
||||||
proxyHandler.updateAPIService(apiService)
|
proxyHandler.updateAPIService(apiService)
|
||||||
if s.openAPIAggregationController != nil {
|
if s.openAPIAggregationController != nil {
|
||||||
|
@ -18,10 +18,11 @@ package apiserver
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"k8s.io/klog"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/httpstream"
|
"k8s.io/apimachinery/pkg/util/httpstream"
|
||||||
@ -35,11 +36,16 @@ import (
|
|||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
restclient "k8s.io/client-go/rest"
|
restclient "k8s.io/client-go/rest"
|
||||||
"k8s.io/client-go/transport"
|
"k8s.io/client-go/transport"
|
||||||
|
"k8s.io/klog"
|
||||||
apiregistrationv1api "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
apiregistrationv1api "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
||||||
apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
|
apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
|
||||||
)
|
)
|
||||||
|
|
||||||
const aggregatorComponent string = "aggregator"
|
const (
|
||||||
|
aggregatorComponent string = "aggregator"
|
||||||
|
|
||||||
|
aggregatedDiscoveryTimeout = 5 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
// proxyHandler provides a http.Handler which will proxy traffic to locations
|
// proxyHandler provides a http.Handler which will proxy traffic to locations
|
||||||
// specified by items implementing Redirector.
|
// specified by items implementing Redirector.
|
||||||
@ -57,6 +63,8 @@ type proxyHandler struct {
|
|||||||
serviceResolver ServiceResolver
|
serviceResolver ServiceResolver
|
||||||
|
|
||||||
handlingInfo atomic.Value
|
handlingInfo atomic.Value
|
||||||
|
|
||||||
|
enableAggregatedDiscoveryTimeout bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type proxyHandlingInfo struct {
|
type proxyHandlingInfo struct {
|
||||||
@ -140,11 +148,8 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||||||
location.Path = req.URL.Path
|
location.Path = req.URL.Path
|
||||||
location.RawQuery = req.URL.Query().Encode()
|
location.RawQuery = req.URL.Query().Encode()
|
||||||
|
|
||||||
// WithContext creates a shallow clone of the request with the new context.
|
newReq, cancelFn := newRequestForProxy(location, req, r.enableAggregatedDiscoveryTimeout)
|
||||||
newReq := req.WithContext(context.Background())
|
defer cancelFn()
|
||||||
newReq.Header = utilnet.CloneHeader(req.Header)
|
|
||||||
newReq.URL = location
|
|
||||||
newReq.Host = location.Host
|
|
||||||
|
|
||||||
if handlingInfo.proxyRoundTripper == nil {
|
if handlingInfo.proxyRoundTripper == nil {
|
||||||
proxyError(w, req, "", http.StatusNotFound)
|
proxyError(w, req, "", http.StatusNotFound)
|
||||||
@ -171,6 +176,31 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||||||
handler.ServeHTTP(w, newReq)
|
handler.ServeHTTP(w, newReq)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// newRequestForProxy returns a shallow copy of the original request with a context that may include a timeout for discovery requests
|
||||||
|
func newRequestForProxy(location *url.URL, req *http.Request, enableAggregatedDiscoveryTimeout bool) (*http.Request, context.CancelFunc) {
|
||||||
|
newCtx := req.Context()
|
||||||
|
cancelFn := func() {}
|
||||||
|
|
||||||
|
if requestInfo, ok := genericapirequest.RequestInfoFrom(req.Context()); ok {
|
||||||
|
// trim leading and trailing slashes. Then "/apis/group/version" requests are for discovery, so if we have exactly three
|
||||||
|
// segments that we are going to proxy, we have a discovery request.
|
||||||
|
if enableAggregatedDiscoveryTimeout && !requestInfo.IsResourceRequest && len(strings.Split(strings.Trim(requestInfo.Path, "/"), "/")) == 3 {
|
||||||
|
// discovery requests are used by kubectl and others to determine which resources a server has. This is a cheap call that
|
||||||
|
// should be fast for every aggregated apiserver. Latency for aggregation is expected to be low (as for all extensions)
|
||||||
|
// so forcing a short timeout here helps responsiveness of all clients.
|
||||||
|
newCtx, cancelFn = context.WithTimeout(newCtx, aggregatedDiscoveryTimeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithContext creates a shallow clone of the request with the same context.
|
||||||
|
newReq := req.WithContext(newCtx)
|
||||||
|
newReq.Header = utilnet.CloneHeader(req.Header)
|
||||||
|
newReq.URL = location
|
||||||
|
newReq.Host = location.Host
|
||||||
|
|
||||||
|
return newReq, cancelFn
|
||||||
|
}
|
||||||
|
|
||||||
// maybeWrapForConnectionUpgrades wraps the roundtripper for upgrades. The bool indicates if it was wrapped
|
// maybeWrapForConnectionUpgrades wraps the roundtripper for upgrades. The bool indicates if it was wrapped
|
||||||
func maybeWrapForConnectionUpgrades(restConfig *restclient.Config, rt http.RoundTripper, req *http.Request) (http.RoundTripper, bool, error) {
|
func maybeWrapForConnectionUpgrades(restConfig *restclient.Config, rt http.RoundTripper, req *http.Request) (http.RoundTripper, bool, error) {
|
||||||
if !httpstream.IsUpgradeRequest(req) {
|
if !httpstream.IsUpgradeRequest(req) {
|
||||||
|
@ -20,7 +20,6 @@ import (
|
|||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"k8s.io/utils/pointer"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"net/http/httputil"
|
"net/http/httputil"
|
||||||
@ -33,10 +32,12 @@ import (
|
|||||||
"golang.org/x/net/websocket"
|
"golang.org/x/net/websocket"
|
||||||
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/util/proxy"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/apiserver/pkg/authentication/user"
|
"k8s.io/apiserver/pkg/authentication/user"
|
||||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
||||||
|
"k8s.io/utils/pointer"
|
||||||
)
|
)
|
||||||
|
|
||||||
type targetHTTPHandler struct {
|
type targetHTTPHandler struct {
|
||||||
@ -523,3 +524,45 @@ gR0TAoGACFOvhl8txfbkwLeuNeunyOPL7J4nIccthgd2ioFOr3HTou6wzN++vYTa
|
|||||||
a3OF9jH5Z7m6X1rrwn6J1+Gw9sBme38/GeGXHigsBI/8WaTvyuppyVIXOVPoTvVf
|
a3OF9jH5Z7m6X1rrwn6J1+Gw9sBme38/GeGXHigsBI/8WaTvyuppyVIXOVPoTvVf
|
||||||
VYsTwo5YgV1HzDkV+BNmBCw1GYcGXAElhJI+dCsgQuuU6TKzgl8=
|
VYsTwo5YgV1HzDkV+BNmBCw1GYcGXAElhJI+dCsgQuuU6TKzgl8=
|
||||||
-----END RSA PRIVATE KEY-----`)
|
-----END RSA PRIVATE KEY-----`)
|
||||||
|
|
||||||
|
func TestGetContextForNewRequest(t *testing.T) {
|
||||||
|
done := make(chan struct{})
|
||||||
|
server := httptest.NewTLSServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
|
||||||
|
<-done // never return so that we're certain to return base on timeout
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
defer close(done)
|
||||||
|
|
||||||
|
proxyServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
location, err := url.Parse(server.URL)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
location.Path = req.URL.Path
|
||||||
|
|
||||||
|
nestedReq := req.WithContext(genericapirequest.WithRequestInfo(req.Context(), &genericapirequest.RequestInfo{Path: req.URL.Path}))
|
||||||
|
newReq, cancelFn := newRequestForProxy(location, nestedReq, true)
|
||||||
|
defer cancelFn()
|
||||||
|
|
||||||
|
theproxy := proxy.NewUpgradeAwareHandler(location, server.Client().Transport, true, false, &responder{w: w})
|
||||||
|
theproxy.ServeHTTP(w, newReq)
|
||||||
|
}))
|
||||||
|
defer proxyServer.Close()
|
||||||
|
|
||||||
|
// normal clients will not be setting a timeout, don't set one here. Our proxy logic should construct this for us
|
||||||
|
resp, err := proxyServer.Client().Get(proxyServer.URL + "/apis/group/version")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if resp.StatusCode != http.StatusServiceUnavailable {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if !strings.Contains(string(body), "Error trying to reach service: 'context deadline exceeded'") {
|
||||||
|
t.Error(string(body))
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user