From 4762acbd1ef9666ca31a0b32e9e7d319ae3943e4 Mon Sep 17 00:00:00 2001 From: gmarek Date: Wed, 2 Nov 2016 14:35:16 +0100 Subject: [PATCH] Split inflight requests into read-only and mutating groups --- hack/verify-flags/known-flags.txt | 1 + pkg/genericapiserver/config.go | 10 +- pkg/genericapiserver/filters/BUILD | 8 +- pkg/genericapiserver/filters/maxinflight.go | 68 ++++++- .../filters/maxinflight_test.go | 179 ++++++++++++++---- .../options/server_run_options.go | 90 +++++---- 6 files changed, 263 insertions(+), 93 deletions(-) diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index 3e4f162b3d7..e7872d05972 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -363,6 +363,7 @@ max-connection-bytes-per-sec max-log-age max-log-backups max-log-size +max-mutating-requests-inflight max-open-files max-outgoing-burst max-outgoing-qps diff --git a/pkg/genericapiserver/config.go b/pkg/genericapiserver/config.go index e19f8419a0b..5bd2940761f 100644 --- a/pkg/genericapiserver/config.go +++ b/pkg/genericapiserver/config.go @@ -141,8 +141,11 @@ type Config struct { OpenAPIConfig *common.Config // MaxRequestsInFlight is the maximum number of parallel non-long-running requests. Every further - // request has to wait. + // request has to wait. Applies only to non-mutating requests. MaxRequestsInFlight int + // MaxMutatingRequestsInFlight is the maximum number of parallel mutating requests. Every further + // request has to wait. + MaxMutatingRequestsInFlight int // Predicate which is true for paths of long-running http requests LongRunningFunc genericfilters.LongRunningRequestCheck @@ -320,6 +323,7 @@ func (c *Config) ApplyOptions(options *options.ServerRunOptions) *Config { c.EnableSwaggerUI = options.EnableSwaggerUI c.ExternalAddress = options.ExternalHost c.MaxRequestsInFlight = options.MaxRequestsInFlight + c.MaxMutatingRequestsInFlight = options.MaxMutatingRequestsInFlight c.MinRequestTimeout = options.MinRequestTimeout c.PublicAddress = options.AdvertiseAddress @@ -483,10 +487,10 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) (secure, insec generic := func(handler http.Handler) http.Handler { handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true") handler = genericfilters.WithPanicRecovery(handler, c.RequestContextMapper) + handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc) + handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.RequestContextMapper, c.LongRunningFunc) handler = apiserverfilters.WithRequestInfo(handler, NewRequestInfoResolver(c), c.RequestContextMapper) handler = api.WithRequestContext(handler, c.RequestContextMapper) - handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc) - handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.LongRunningFunc) return handler } audit := func(handler http.Handler) http.Handler { diff --git a/pkg/genericapiserver/filters/BUILD b/pkg/genericapiserver/filters/BUILD index 8e1cf74bda7..1fcf8369ffb 100644 --- a/pkg/genericapiserver/filters/BUILD +++ b/pkg/genericapiserver/filters/BUILD @@ -28,6 +28,7 @@ go_library( "//pkg/httplog:go_default_library", "//pkg/util:go_default_library", "//pkg/util/runtime:go_default_library", + "//pkg/util/sets:go_default_library", "//vendor:github.com/golang/glog", ], ) @@ -41,5 +42,10 @@ go_test( ], library = "go_default_library", tags = ["automanaged"], - deps = ["//pkg/api/errors:go_default_library"], + deps = [ + "//pkg/api:go_default_library", + "//pkg/api/errors:go_default_library", + "//pkg/apiserver/filters:go_default_library", + "//pkg/apiserver/request:go_default_library", + ], ) diff --git a/pkg/genericapiserver/filters/maxinflight.go b/pkg/genericapiserver/filters/maxinflight.go index 12f3cfc176b..30a337fb285 100644 --- a/pkg/genericapiserver/filters/maxinflight.go +++ b/pkg/genericapiserver/filters/maxinflight.go @@ -17,34 +17,86 @@ limitations under the License. package filters import ( + "fmt" "net/http" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/apiserver/request" "k8s.io/kubernetes/pkg/httplog" + "k8s.io/kubernetes/pkg/util/sets" + + "github.com/golang/glog" ) // Constant for the retry-after interval on rate limiting. // TODO: maybe make this dynamic? or user-adjustable? const retryAfter = "1" +var nonMutatingRequestVerbs = sets.NewString("get", "list", "watch") + +func handleError(w http.ResponseWriter, r *http.Request, err error) { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "Internal Server Error: %#v", r.RequestURI) + glog.Errorf(err.Error()) +} + // WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel. -func WithMaxInFlightLimit(handler http.Handler, limit int, longRunningRequestCheck LongRunningRequestCheck) http.Handler { - if limit == 0 { +func WithMaxInFlightLimit( + handler http.Handler, + nonMutatingLimit int, + mutatingLimit int, + requestContextMapper api.RequestContextMapper, + longRunningRequestCheck LongRunningRequestCheck, +) http.Handler { + if nonMutatingLimit == 0 && mutatingLimit == 0 { return handler } - c := make(chan bool, limit) + var nonMutatingChan chan bool + var mutatingChan chan bool + if nonMutatingLimit != 0 { + nonMutatingChan = make(chan bool, nonMutatingLimit) + } + if mutatingLimit != 0 { + mutatingChan = make(chan bool, mutatingLimit) + } + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // TODO: migrate to use requestInfo instead of having custom request parser. if longRunningRequestCheck(r) { // Skip tracking long running events. handler.ServeHTTP(w, r) return } - select { - case c <- true: - defer func() { <-c }() + + ctx, ok := requestContextMapper.Get(r) + if !ok { + handleError(w, r, fmt.Errorf("no context found for request, handler chain must be wrong")) + return + } + requestInfo, ok := request.RequestInfoFrom(ctx) + if !ok { + handleError(w, r, fmt.Errorf("no RequestInfo found in context, handler chain must be wrong")) + return + } + + var c chan bool + if !nonMutatingRequestVerbs.Has(requestInfo.Verb) { + c = mutatingChan + } else { + c = nonMutatingChan + } + + if c == nil { handler.ServeHTTP(w, r) - default: - tooManyRequests(r, w) + } else { + select { + case c <- true: + defer func() { <-c }() + handler.ServeHTTP(w, r) + default: + tooManyRequests(r, w) + } } }) } diff --git a/pkg/genericapiserver/filters/maxinflight_test.go b/pkg/genericapiserver/filters/maxinflight_test.go index d5b2bed5bed..97c272aebc4 100644 --- a/pkg/genericapiserver/filters/maxinflight_test.go +++ b/pkg/genericapiserver/filters/maxinflight_test.go @@ -25,9 +25,46 @@ import ( "sync" "testing" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" + apiserverfilters "k8s.io/kubernetes/pkg/apiserver/filters" + "k8s.io/kubernetes/pkg/apiserver/request" ) +func createMaxInflightServer(callsWg, blockWg *sync.WaitGroup, disableCallsWg *bool, disableCallsWgMutex *sync.Mutex, nonMutating, mutating int) *httptest.Server { + + // notAccountedPathsRegexp specifies paths requests to which we don't account into + // requests in flight. + notAccountedPathsRegexp := regexp.MustCompile(".*\\/watch") + longRunningRequestCheck := BasicLongRunningRequestCheck(notAccountedPathsRegexp, map[string]string{"watch": "true"}) + + requestContextMapper := api.NewRequestContextMapper() + requestInfoFactory := &request.RequestInfoFactory{} + handler := WithMaxInFlightLimit( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // A short, accounted request that does not wait for block WaitGroup. + if strings.Contains(r.URL.Path, "dontwait") { + return + } + disableCallsWgMutex.Lock() + waitForCalls := *disableCallsWg + disableCallsWgMutex.Unlock() + if waitForCalls { + callsWg.Done() + } + blockWg.Wait() + }), + nonMutating, + mutating, + requestContextMapper, + longRunningRequestCheck, + ) + handler = apiserverfilters.WithRequestInfo(handler, requestInfoFactory, requestContextMapper) + handler = api.WithRequestContext(handler, requestContextMapper) + + return httptest.NewServer(handler) +} + // Tests that MaxInFlightLimit works, i.e. // - "long" requests such as proxy or watch, identified by regexp are not accounted despite // hanging for the long time, @@ -36,69 +73,53 @@ import ( // - subsequent "short" requests are rejected instantly with appropriate error, // - subsequent "long" requests are handled normally, // - we correctly recover after some "short" requests finish, i.e. we can process new ones. -func TestMaxInFlight(t *testing.T) { - const AllowedInflightRequestsNo = 3 - - // notAccountedPathsRegexp specifies paths requests to which we don't account into - // requests in flight. - notAccountedPathsRegexp := regexp.MustCompile(".*\\/watch") - longRunningRequestCheck := BasicLongRunningRequestCheck(notAccountedPathsRegexp, map[string]string{"watch": "true"}) +func TestMaxInFlightNonMutating(t *testing.T) { + const AllowedNonMutatingInflightRequestsNo = 3 // Calls is used to wait until all server calls are received. We are sending - // AllowedInflightRequestsNo of 'long' not-accounted requests and the same number of + // AllowedNonMutatingInflightRequestsNo of 'long' not-accounted requests and the same number of // 'short' accounted ones. calls := &sync.WaitGroup{} - calls.Add(AllowedInflightRequestsNo * 2) + calls.Add(AllowedNonMutatingInflightRequestsNo * 2) // Responses is used to wait until all responses are // received. This prevents some async requests getting EOF // errors from prematurely closing the server - responses := sync.WaitGroup{} - responses.Add(AllowedInflightRequestsNo * 2) + responses := &sync.WaitGroup{} + responses.Add(AllowedNonMutatingInflightRequestsNo * 2) // Block is used to keep requests in flight for as long as we need to. All requests will // be unblocked at the same time. - block := sync.WaitGroup{} + block := &sync.WaitGroup{} block.Add(1) - server := httptest.NewServer( - WithMaxInFlightLimit( - http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // A short, accounted request that does not wait for block WaitGroup. - if strings.Contains(r.URL.Path, "dontwait") { - return - } - if calls != nil { - calls.Done() - } - block.Wait() - }), - AllowedInflightRequestsNo, - longRunningRequestCheck, - ), - ) + waitForCalls := true + waitForCallsMutex := sync.Mutex{} + + server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, AllowedNonMutatingInflightRequestsNo, 1) defer server.Close() // These should hang, but not affect accounting. use a query param match - for i := 0; i < AllowedInflightRequestsNo; i++ { + for i := 0; i < AllowedNonMutatingInflightRequestsNo; i++ { // These should hang waiting on block... go func() { - if err := expectHTTP(server.URL+"/foo/bar?watch=true", http.StatusOK); err != nil { + if err := expectHTTPGet(server.URL+"/foo/bar?watch=true", http.StatusOK); err != nil { t.Error(err) } responses.Done() }() } + // Check that sever is not saturated by not-accounted calls - if err := expectHTTP(server.URL+"/dontwait", http.StatusOK); err != nil { + if err := expectHTTPGet(server.URL+"/dontwait", http.StatusOK); err != nil { t.Error(err) } // These should hang and be accounted, i.e. saturate the server - for i := 0; i < AllowedInflightRequestsNo; i++ { + for i := 0; i < AllowedNonMutatingInflightRequestsNo; i++ { // These should hang waiting on block... go func() { - if err := expectHTTP(server.URL, http.StatusOK); err != nil { + if err := expectHTTPGet(server.URL, http.StatusOK); err != nil { t.Error(err) } responses.Done() @@ -107,16 +128,23 @@ func TestMaxInFlight(t *testing.T) { // We wait for all calls to be received by the server calls.Wait() // Disable calls notifications in the server - calls = nil + waitForCallsMutex.Lock() + waitForCalls = false + waitForCallsMutex.Unlock() - // Do this multiple times to show that it rate limit rejected requests don't block. + // Do this multiple times to show that rate limit rejected requests don't block. for i := 0; i < 2; i++ { - if err := expectHTTP(server.URL, errors.StatusTooManyRequests); err != nil { + if err := expectHTTPGet(server.URL, errors.StatusTooManyRequests); err != nil { t.Error(err) } } // Validate that non-accounted URLs still work. use a path regex match - if err := expectHTTP(server.URL+"/dontwait/watch", http.StatusOK); err != nil { + if err := expectHTTPGet(server.URL+"/dontwait/watch", http.StatusOK); err != nil { + t.Error(err) + } + + // We should allow a single mutating request. + if err := expectHTTPPost(server.URL+"/dontwait", http.StatusOK); err != nil { t.Error(err) } @@ -126,12 +154,73 @@ func TestMaxInFlight(t *testing.T) { // Show that we recover from being blocked up. // Too avoid flakyness we need to wait until at least one of the requests really finishes. responses.Wait() - if err := expectHTTP(server.URL, http.StatusOK); err != nil { + if err := expectHTTPGet(server.URL, http.StatusOK); err != nil { t.Error(err) } } -func expectHTTP(url string, code int) error { +func TestMaxInFlightMutating(t *testing.T) { + const AllowedMutatingInflightRequestsNo = 3 + + calls := &sync.WaitGroup{} + calls.Add(AllowedMutatingInflightRequestsNo) + + responses := &sync.WaitGroup{} + responses.Add(AllowedMutatingInflightRequestsNo) + + // Block is used to keep requests in flight for as long as we need to. All requests will + // be unblocked at the same time. + block := &sync.WaitGroup{} + block.Add(1) + + waitForCalls := true + waitForCallsMutex := sync.Mutex{} + + server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo) + defer server.Close() + + // These should hang and be accounted, i.e. saturate the server + for i := 0; i < AllowedMutatingInflightRequestsNo; i++ { + // These should hang waiting on block... + go func() { + if err := expectHTTPPost(server.URL+"/foo/bar", http.StatusOK); err != nil { + t.Error(err) + } + responses.Done() + }() + } + // We wait for all calls to be received by the server + calls.Wait() + // Disable calls notifications in the server + // Disable calls notifications in the server + waitForCallsMutex.Lock() + waitForCalls = false + waitForCallsMutex.Unlock() + + // Do this multiple times to show that rate limit rejected requests don't block. + for i := 0; i < 2; i++ { + if err := expectHTTPPost(server.URL+"/foo/bar/", errors.StatusTooManyRequests); err != nil { + t.Error(err) + } + } + // Validate that non-mutating URLs still work. use a path regex match + if err := expectHTTPGet(server.URL+"/dontwait", http.StatusOK); err != nil { + t.Error(err) + } + + // Let all hanging requests finish + block.Done() + + // Show that we recover from being blocked up. + // Too avoid flakyness we need to wait until at least one of the requests really finishes. + responses.Wait() + if err := expectHTTPPost(server.URL+"/foo/bar", http.StatusOK); err != nil { + t.Error(err) + } +} + +// We use GET as a sample non-mutating request. +func expectHTTPGet(url string, code int) error { r, err := http.Get(url) if err != nil { return fmt.Errorf("unexpected error: %v", err) @@ -141,3 +230,15 @@ func expectHTTP(url string, code int) error { } return nil } + +// We use POST as a sample mutating request. +func expectHTTPPost(url string, code int) error { + r, err := http.Post(url, "text/html", strings.NewReader("foo bar")) + if err != nil { + return fmt.Errorf("unexpected error: %v", err) + } + if r.StatusCode != code { + return fmt.Errorf("unexpected response: %v", r.StatusCode) + } + return nil +} diff --git a/pkg/genericapiserver/options/server_run_options.go b/pkg/genericapiserver/options/server_run_options.go index 51b96add8fc..f16bcc7b527 100644 --- a/pkg/genericapiserver/options/server_run_options.go +++ b/pkg/genericapiserver/options/server_run_options.go @@ -44,31 +44,32 @@ type ServerRunOptions struct { AdmissionControlConfigFile string AdvertiseAddress net.IP - CloudConfigFile string - CloudProvider string - CorsAllowedOriginList []string - DefaultStorageMediaType string - DeleteCollectionWorkers int - AuditLogPath string - AuditLogMaxAge int - AuditLogMaxBackups int - AuditLogMaxSize int - EnableGarbageCollection bool - EnableProfiling bool - EnableContentionProfiling bool - EnableSwaggerUI bool - EnableWatchCache bool - ExternalHost string - KubernetesServiceNodePort int - LongRunningRequestRE string - MasterCount int - MasterServiceNamespace string - MaxRequestsInFlight int - MinRequestTimeout int - RuntimeConfig config.ConfigurationMap - ServiceClusterIPRange net.IPNet // TODO: make this a list - ServiceNodePortRange utilnet.PortRange - StorageVersions string + CloudConfigFile string + CloudProvider string + CorsAllowedOriginList []string + DefaultStorageMediaType string + DeleteCollectionWorkers int + AuditLogPath string + AuditLogMaxAge int + AuditLogMaxBackups int + AuditLogMaxSize int + EnableGarbageCollection bool + EnableProfiling bool + EnableContentionProfiling bool + EnableSwaggerUI bool + EnableWatchCache bool + ExternalHost string + KubernetesServiceNodePort int + LongRunningRequestRE string + MasterCount int + MasterServiceNamespace string + MaxRequestsInFlight int + MaxMutatingRequestsInFlight int + MinRequestTimeout int + RuntimeConfig config.ConfigurationMap + ServiceClusterIPRange net.IPNet // TODO: make this a list + ServiceNodePortRange utilnet.PortRange + StorageVersions string // The default values for StorageVersions. StorageVersions overrides // these; you can change this if you want to change the defaults (e.g., // for testing). This is not actually exposed as a flag. @@ -80,22 +81,23 @@ type ServerRunOptions struct { func NewServerRunOptions() *ServerRunOptions { return &ServerRunOptions{ - AdmissionControl: "AlwaysAdmit", - DefaultStorageMediaType: "application/json", - DefaultStorageVersions: registered.AllPreferredGroupVersions(), - DeleteCollectionWorkers: 1, - EnableGarbageCollection: true, - EnableProfiling: true, - EnableContentionProfiling: false, - EnableWatchCache: true, - LongRunningRequestRE: DefaultLongRunningRequestRE, - MasterCount: 1, - MasterServiceNamespace: api.NamespaceDefault, - MaxRequestsInFlight: 400, - MinRequestTimeout: 1800, - RuntimeConfig: make(config.ConfigurationMap), - ServiceNodePortRange: DefaultServiceNodePortRange, - StorageVersions: registered.AllPreferredGroupVersions(), + AdmissionControl: "AlwaysAdmit", + DefaultStorageMediaType: "application/json", + DefaultStorageVersions: registered.AllPreferredGroupVersions(), + DeleteCollectionWorkers: 1, + EnableGarbageCollection: true, + EnableProfiling: true, + EnableContentionProfiling: false, + EnableWatchCache: true, + LongRunningRequestRE: DefaultLongRunningRequestRE, + MasterCount: 1, + MasterServiceNamespace: api.NamespaceDefault, + MaxRequestsInFlight: 400, + MaxMutatingRequestsInFlight: 200, + MinRequestTimeout: 1800, + RuntimeConfig: make(config.ConfigurationMap), + ServiceNodePortRange: DefaultServiceNodePortRange, + StorageVersions: registered.AllPreferredGroupVersions(), } } @@ -253,7 +255,11 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) { "DEPRECATED: the namespace from which the kubernetes master services should be injected into pods.") fs.IntVar(&s.MaxRequestsInFlight, "max-requests-inflight", s.MaxRequestsInFlight, ""+ - "The maximum number of requests in flight at a given time. When the server exceeds this, "+ + "The maximum number of non-mutating requests in flight at a given time. When the server exceeds this, "+ + "it rejects requests. Zero for no limit.") + + fs.IntVar(&s.MaxMutatingRequestsInFlight, "max-mutating-requests-inflight", s.MaxMutatingRequestsInFlight, ""+ + "The maximum number of mutating requests in flight at a given time. When the server exceeds this, "+ "it rejects requests. Zero for no limit.") fs.IntVar(&s.MinRequestTimeout, "min-request-timeout", s.MinRequestTimeout, ""+