From 8a3b4d81e6c3a74fa1afa5fd17d3bf42ba1e856d Mon Sep 17 00:00:00 2001 From: deads2k Date: Fri, 21 Jul 2017 12:56:28 -0400 Subject: [PATCH] rate limiting should not affect system masters --- .../src/k8s.io/apiserver/pkg/server/config.go | 2 +- .../k8s.io/apiserver/pkg/server/filters/BUILD | 2 + .../pkg/server/filters/maxinflight.go | 13 +++ .../pkg/server/filters/maxinflight_test.go | 83 ++++++++++++++++++- 4 files changed, 96 insertions(+), 4 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 3d06f9ab0ef..a0bf338f5d5 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -468,6 +468,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler := genericapifilters.WithAuthorization(apiHandler, c.RequestContextMapper, c.Authorizer, c.Serializer) + handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.RequestContextMapper, c.LongRunningFunc) handler = genericapifilters.WithImpersonation(handler, c.RequestContextMapper, c.Authorizer, c.Serializer) if utilfeature.DefaultFeatureGate.Enabled(features.AdvancedAuditing) { handler = genericapifilters.WithAudit(handler, c.RequestContextMapper, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc) @@ -477,7 +478,6 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, c.Authenticator, genericapifilters.Unauthorized(c.RequestContextMapper, c.Serializer, c.SupportsBasicAuth)) handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true") handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc) - handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.RequestContextMapper, c.LongRunningFunc) handler = genericapifilters.WithRequestInfo(handler, NewRequestInfoResolver(c), c.RequestContextMapper) handler = apirequest.WithRequestContext(handler, c.RequestContextMapper) handler = genericfilters.WithPanicRecovery(handler) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD b/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD index 77e148ed7af..2db11df9f77 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD @@ -24,6 +24,7 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/filters:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", ], @@ -48,6 +49,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/httplog:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go index 346d7fbdac9..960d400d787 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints/metrics" apirequest "k8s.io/apiserver/pkg/endpoints/request" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" @@ -91,11 +92,23 @@ func WithMaxInFlightLimit( if c == nil { handler.ServeHTTP(w, r) } else { + select { case c <- true: defer func() { <-c }() handler.ServeHTTP(w, r) + default: + // at this point we're about to return a 429, BUT not all actors should be rate limited. A system:master is so powerful + // that he should always get an answer. It's a super-admin or a loopback connection. + if currUser, ok := apirequest.UserFrom(ctx); ok { + for _, group := range currUser.GetGroups() { + if group == user.SystemPrivilegedGroup { + handler.ServeHTTP(w, r) + return + } + } + } metrics.MonitorRequest(r, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, "", errors.StatusTooManyRequests, time.Now()) tooManyRequests(r, w) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight_test.go index eb5debfbaa1..79e185e818c 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight_test.go @@ -26,12 +26,12 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/authentication/user" apifilters "k8s.io/apiserver/pkg/endpoints/filters" apirequest "k8s.io/apiserver/pkg/endpoints/request" ) func createMaxInflightServer(callsWg, blockWg *sync.WaitGroup, disableCallsWg *bool, disableCallsWgMutex *sync.Mutex, nonMutating, mutating int) *httptest.Server { - longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy")) requestContextMapper := apirequest.NewRequestContextMapper() @@ -55,12 +55,30 @@ func createMaxInflightServer(callsWg, blockWg *sync.WaitGroup, disableCallsWg *b requestContextMapper, longRunningRequestCheck, ) + handler = withFakeUser(handler, requestContextMapper) handler = apifilters.WithRequestInfo(handler, requestInfoFactory, requestContextMapper) handler = apirequest.WithRequestContext(handler, requestContextMapper) return httptest.NewServer(handler) } +func withFakeUser(handler http.Handler, requestContextMapper apirequest.RequestContextMapper) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx, ok := requestContextMapper.Get(r) + if !ok { + handleError(w, r, fmt.Errorf("no context found for request, handler chain must be wrong")) + return + } + + if len(r.Header["Groups"]) > 0 { + requestContextMapper.Update(r, apirequest.WithUser(ctx, &user.DefaultInfo{ + Groups: r.Header["Groups"], + })) + } + handler.ServeHTTP(w, r) + }) +} + // Tests that MaxInFlightLimit works, i.e. // - "long" requests such as proxy or watch, identified by regexp are not accounted despite // hanging for the long time, @@ -228,8 +246,16 @@ func expectHTTPGet(url string, code int) error { } // We use POST as a sample mutating request. -func expectHTTPPost(url string, code int) error { - r, err := http.Post(url, "text/html", strings.NewReader("foo bar")) +func expectHTTPPost(url string, code int, groups ...string) error { + req, err := http.NewRequest(http.MethodPost, url, strings.NewReader("foo bar")) + if err != nil { + return err + } + for _, group := range groups { + req.Header.Add("Groups", group) + } + + r, err := http.DefaultClient.Do(req) if err != nil { return fmt.Errorf("unexpected error: %v", err) } @@ -238,3 +264,54 @@ func expectHTTPPost(url string, code int) error { } return nil } + +func TestMaxInFlightSkipsMasters(t *testing.T) { + const AllowedMutatingInflightRequestsNo = 3 + + calls := &sync.WaitGroup{} + calls.Add(AllowedMutatingInflightRequestsNo) + + responses := &sync.WaitGroup{} + responses.Add(AllowedMutatingInflightRequestsNo) + + // Block is used to keep requests in flight for as long as we need to. All requests will + // be unblocked at the same time. + block := &sync.WaitGroup{} + block.Add(1) + + waitForCalls := true + waitForCallsMutex := sync.Mutex{} + + server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo) + defer server.Close() + + // These should hang and be accounted, i.e. saturate the server + for i := 0; i < AllowedMutatingInflightRequestsNo; i++ { + // These should hang waiting on block... + go func() { + if err := expectHTTPPost(server.URL+"/foo/bar", http.StatusOK); err != nil { + t.Error(err) + } + responses.Done() + }() + } + // We wait for all calls to be received by the server + calls.Wait() + // Disable calls notifications in the server + // Disable calls notifications in the server + waitForCallsMutex.Lock() + waitForCalls = false + waitForCallsMutex.Unlock() + + // Do this multiple times to show that rate limit rejected requests don't block. + for i := 0; i < 2; i++ { + if err := expectHTTPPost(server.URL+"/dontwait", http.StatusOK, user.SystemPrivilegedGroup); err != nil { + t.Error(err) + } + } + + // Let all hanging requests finish + block.Done() + + responses.Wait() +}