mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
add a timeout for proxying discovery requests
This commit is contained in:
parent
c4f39177e0
commit
aaebd67a63
@ -223,7 +223,14 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
|
||||
}
|
||||
|
||||
// WithContext creates a shallow clone of the request with the new context.
|
||||
newReq := req.WithContext(context.Background())
|
||||
ctx := context.Background()
|
||||
// if the original request has a deadline, we should honor that deadline for our proxied request
|
||||
if deadline, ok := req.Context().Deadline(); ok {
|
||||
var cancelFn context.CancelFunc
|
||||
ctx, cancelFn = context.WithDeadline(ctx, deadline)
|
||||
defer cancelFn()
|
||||
}
|
||||
newReq := req.WithContext(ctx)
|
||||
newReq.Header = utilnet.CloneHeader(req.Header)
|
||||
if !h.UseRequestLocation {
|
||||
newReq.URL = &loc
|
||||
|
@ -18,6 +18,7 @@ go_test(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/proxy:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||
|
@ -18,10 +18,11 @@ package apiserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"k8s.io/klog"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/httpstream"
|
||||
@ -35,11 +36,16 @@ import (
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/transport"
|
||||
"k8s.io/klog"
|
||||
apiregistrationv1api "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
||||
apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
|
||||
)
|
||||
|
||||
const aggregatorComponent string = "aggregator"
|
||||
const (
|
||||
aggregatorComponent string = "aggregator"
|
||||
|
||||
aggregatedDiscoveryTimeout = 5 * time.Second
|
||||
)
|
||||
|
||||
// proxyHandler provides a http.Handler which will proxy traffic to locations
|
||||
// specified by items implementing Redirector.
|
||||
@ -140,11 +146,8 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
location.Path = req.URL.Path
|
||||
location.RawQuery = req.URL.Query().Encode()
|
||||
|
||||
// WithContext creates a shallow clone of the request with the new context.
|
||||
newReq := req.WithContext(context.Background())
|
||||
newReq.Header = utilnet.CloneHeader(req.Header)
|
||||
newReq.URL = location
|
||||
newReq.Host = location.Host
|
||||
newReq, cancelFn := newRequestForProxy(location, req)
|
||||
defer cancelFn()
|
||||
|
||||
if handlingInfo.proxyRoundTripper == nil {
|
||||
proxyError(w, req, "", http.StatusNotFound)
|
||||
@ -171,6 +174,33 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
handler.ServeHTTP(w, newReq)
|
||||
}
|
||||
|
||||
// newRequestForProxy returns a shallow copy of the original request with a context that may include a timeout for discovery requests
|
||||
func newRequestForProxy(location *url.URL, req *http.Request) (*http.Request, context.CancelFunc) {
|
||||
newCtx := context.Background()
|
||||
cancelFn := func() {}
|
||||
|
||||
// if the original request has a deadline, we should honor that deadline for our proxied request
|
||||
if deadline, ok := req.Context().Deadline(); ok {
|
||||
newCtx, cancelFn = context.WithDeadline(newCtx, deadline)
|
||||
|
||||
// trim leading and trailing slashes. Then "/apis/group/version" requests are for discovery, so if we have exactly three
|
||||
// segments that we are going to proxy, we have a discovery request.
|
||||
} else if len(strings.Split(strings.Trim(req.URL.Path, "/"), "/")) == 3 {
|
||||
// discovery requests are used by kubectl and others to determine which resources a server has. This is a cheap call that
|
||||
// should be fast for every aggregated apiserver. Latency for aggregation is expected to be low (as for all extensions)
|
||||
// so forcing a short timeout here helps responsiveness of all clients.
|
||||
newCtx, cancelFn = context.WithTimeout(newCtx, aggregatedDiscoveryTimeout)
|
||||
}
|
||||
|
||||
// WithContext creates a shallow clone of the request with the new context.
|
||||
newReq := req.WithContext(newCtx)
|
||||
newReq.Header = utilnet.CloneHeader(req.Header)
|
||||
newReq.URL = location
|
||||
newReq.Host = location.Host
|
||||
|
||||
return newReq, cancelFn
|
||||
}
|
||||
|
||||
// maybeWrapForConnectionUpgrades wraps the roundtripper for upgrades. The bool indicates if it was wrapped
|
||||
func maybeWrapForConnectionUpgrades(restConfig *restclient.Config, rt http.RoundTripper, req *http.Request) (http.RoundTripper, bool, error) {
|
||||
if !httpstream.IsUpgradeRequest(req) {
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"k8s.io/utils/pointer"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/http/httputil"
|
||||
@ -33,10 +32,12 @@ import (
|
||||
"golang.org/x/net/websocket"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/proxy"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
||||
"k8s.io/utils/pointer"
|
||||
)
|
||||
|
||||
type targetHTTPHandler struct {
|
||||
@ -523,3 +524,44 @@ gR0TAoGACFOvhl8txfbkwLeuNeunyOPL7J4nIccthgd2ioFOr3HTou6wzN++vYTa
|
||||
a3OF9jH5Z7m6X1rrwn6J1+Gw9sBme38/GeGXHigsBI/8WaTvyuppyVIXOVPoTvVf
|
||||
VYsTwo5YgV1HzDkV+BNmBCw1GYcGXAElhJI+dCsgQuuU6TKzgl8=
|
||||
-----END RSA PRIVATE KEY-----`)
|
||||
|
||||
func TestGetContextForNewRequest(t *testing.T) {
|
||||
done := make(chan struct{})
|
||||
server := httptest.NewTLSServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
|
||||
<-done // never return so that we're certain to return base on timeout
|
||||
}))
|
||||
defer server.Close()
|
||||
defer close(done)
|
||||
|
||||
proxyServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
location, err := url.Parse(server.URL)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
location.Path = req.URL.Path
|
||||
|
||||
newReq, cancelFn := newRequestForProxy(location, req)
|
||||
defer cancelFn()
|
||||
|
||||
theproxy := proxy.NewUpgradeAwareHandler(location, server.Client().Transport, true, false, &responder{w: w})
|
||||
theproxy.ServeHTTP(w, newReq)
|
||||
}))
|
||||
defer proxyServer.Close()
|
||||
|
||||
// normal clients will not be setting a timeout, don't set one here. Our proxy logic should construct this for us
|
||||
resp, err := proxyServer.Client().Get(proxyServer.URL + "/apis/group/version")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if resp.StatusCode != http.StatusServiceUnavailable {
|
||||
t.Error(err)
|
||||
}
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !strings.Contains(string(body), "Error trying to reach service: 'context deadline exceeded'") {
|
||||
t.Error(string(body))
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user