Split inflight requests into read-only and mutating groups

This commit is contained in:
gmarek 2016-11-02 14:35:16 +01:00
parent 5658addb9b
commit 4762acbd1e
6 changed files with 263 additions and 93 deletions

View File

@ -363,6 +363,7 @@ max-connection-bytes-per-sec
max-log-age max-log-age
max-log-backups max-log-backups
max-log-size max-log-size
max-mutating-requests-inflight
max-open-files max-open-files
max-outgoing-burst max-outgoing-burst
max-outgoing-qps max-outgoing-qps

View File

@ -141,8 +141,11 @@ type Config struct {
OpenAPIConfig *common.Config OpenAPIConfig *common.Config
// MaxRequestsInFlight is the maximum number of parallel non-long-running requests. Every further // MaxRequestsInFlight is the maximum number of parallel non-long-running requests. Every further
// request has to wait. // request has to wait. Applies only to non-mutating requests.
MaxRequestsInFlight int MaxRequestsInFlight int
// MaxMutatingRequestsInFlight is the maximum number of parallel mutating requests. Every further
// request has to wait.
MaxMutatingRequestsInFlight int
// Predicate which is true for paths of long-running http requests // Predicate which is true for paths of long-running http requests
LongRunningFunc genericfilters.LongRunningRequestCheck LongRunningFunc genericfilters.LongRunningRequestCheck
@ -320,6 +323,7 @@ func (c *Config) ApplyOptions(options *options.ServerRunOptions) *Config {
c.EnableSwaggerUI = options.EnableSwaggerUI c.EnableSwaggerUI = options.EnableSwaggerUI
c.ExternalAddress = options.ExternalHost c.ExternalAddress = options.ExternalHost
c.MaxRequestsInFlight = options.MaxRequestsInFlight c.MaxRequestsInFlight = options.MaxRequestsInFlight
c.MaxMutatingRequestsInFlight = options.MaxMutatingRequestsInFlight
c.MinRequestTimeout = options.MinRequestTimeout c.MinRequestTimeout = options.MinRequestTimeout
c.PublicAddress = options.AdvertiseAddress c.PublicAddress = options.AdvertiseAddress
@ -483,10 +487,10 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) (secure, insec
generic := func(handler http.Handler) http.Handler { generic := func(handler http.Handler) http.Handler {
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true") handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
handler = genericfilters.WithPanicRecovery(handler, c.RequestContextMapper) handler = genericfilters.WithPanicRecovery(handler, c.RequestContextMapper)
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.RequestContextMapper, c.LongRunningFunc)
handler = apiserverfilters.WithRequestInfo(handler, NewRequestInfoResolver(c), c.RequestContextMapper) handler = apiserverfilters.WithRequestInfo(handler, NewRequestInfoResolver(c), c.RequestContextMapper)
handler = api.WithRequestContext(handler, c.RequestContextMapper) handler = api.WithRequestContext(handler, c.RequestContextMapper)
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.LongRunningFunc)
return handler return handler
} }
audit := func(handler http.Handler) http.Handler { audit := func(handler http.Handler) http.Handler {

View File

@ -28,6 +28,7 @@ go_library(
"//pkg/httplog:go_default_library", "//pkg/httplog:go_default_library",
"//pkg/util:go_default_library", "//pkg/util:go_default_library",
"//pkg/util/runtime:go_default_library", "//pkg/util/runtime:go_default_library",
"//pkg/util/sets:go_default_library",
"//vendor:github.com/golang/glog", "//vendor:github.com/golang/glog",
], ],
) )
@ -41,5 +42,10 @@ go_test(
], ],
library = "go_default_library", library = "go_default_library",
tags = ["automanaged"], tags = ["automanaged"],
deps = ["//pkg/api/errors:go_default_library"], deps = [
"//pkg/api:go_default_library",
"//pkg/api/errors:go_default_library",
"//pkg/apiserver/filters:go_default_library",
"//pkg/apiserver/request:go_default_library",
],
) )

View File

@ -17,34 +17,86 @@ limitations under the License.
package filters package filters
import ( import (
"fmt"
"net/http" "net/http"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/apiserver/request"
"k8s.io/kubernetes/pkg/httplog" "k8s.io/kubernetes/pkg/httplog"
"k8s.io/kubernetes/pkg/util/sets"
"github.com/golang/glog"
) )
// Constant for the retry-after interval on rate limiting. // Constant for the retry-after interval on rate limiting.
// TODO: maybe make this dynamic? or user-adjustable? // TODO: maybe make this dynamic? or user-adjustable?
const retryAfter = "1" const retryAfter = "1"
var nonMutatingRequestVerbs = sets.NewString("get", "list", "watch")
func handleError(w http.ResponseWriter, r *http.Request, err error) {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Internal Server Error: %#v", r.RequestURI)
glog.Errorf(err.Error())
}
// WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel. // WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel.
func WithMaxInFlightLimit(handler http.Handler, limit int, longRunningRequestCheck LongRunningRequestCheck) http.Handler { func WithMaxInFlightLimit(
if limit == 0 { handler http.Handler,
nonMutatingLimit int,
mutatingLimit int,
requestContextMapper api.RequestContextMapper,
longRunningRequestCheck LongRunningRequestCheck,
) http.Handler {
if nonMutatingLimit == 0 && mutatingLimit == 0 {
return handler return handler
} }
c := make(chan bool, limit) var nonMutatingChan chan bool
var mutatingChan chan bool
if nonMutatingLimit != 0 {
nonMutatingChan = make(chan bool, nonMutatingLimit)
}
if mutatingLimit != 0 {
mutatingChan = make(chan bool, mutatingLimit)
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// TODO: migrate to use requestInfo instead of having custom request parser.
if longRunningRequestCheck(r) { if longRunningRequestCheck(r) {
// Skip tracking long running events. // Skip tracking long running events.
handler.ServeHTTP(w, r) handler.ServeHTTP(w, r)
return return
} }
select {
case c <- true: ctx, ok := requestContextMapper.Get(r)
defer func() { <-c }() if !ok {
handleError(w, r, fmt.Errorf("no context found for request, handler chain must be wrong"))
return
}
requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
handleError(w, r, fmt.Errorf("no RequestInfo found in context, handler chain must be wrong"))
return
}
var c chan bool
if !nonMutatingRequestVerbs.Has(requestInfo.Verb) {
c = mutatingChan
} else {
c = nonMutatingChan
}
if c == nil {
handler.ServeHTTP(w, r) handler.ServeHTTP(w, r)
default: } else {
tooManyRequests(r, w) select {
case c <- true:
defer func() { <-c }()
handler.ServeHTTP(w, r)
default:
tooManyRequests(r, w)
}
} }
}) })
} }

View File

@ -25,9 +25,46 @@ import (
"sync" "sync"
"testing" "testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
apiserverfilters "k8s.io/kubernetes/pkg/apiserver/filters"
"k8s.io/kubernetes/pkg/apiserver/request"
) )
func createMaxInflightServer(callsWg, blockWg *sync.WaitGroup, disableCallsWg *bool, disableCallsWgMutex *sync.Mutex, nonMutating, mutating int) *httptest.Server {
// notAccountedPathsRegexp specifies paths requests to which we don't account into
// requests in flight.
notAccountedPathsRegexp := regexp.MustCompile(".*\\/watch")
longRunningRequestCheck := BasicLongRunningRequestCheck(notAccountedPathsRegexp, map[string]string{"watch": "true"})
requestContextMapper := api.NewRequestContextMapper()
requestInfoFactory := &request.RequestInfoFactory{}
handler := WithMaxInFlightLimit(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// A short, accounted request that does not wait for block WaitGroup.
if strings.Contains(r.URL.Path, "dontwait") {
return
}
disableCallsWgMutex.Lock()
waitForCalls := *disableCallsWg
disableCallsWgMutex.Unlock()
if waitForCalls {
callsWg.Done()
}
blockWg.Wait()
}),
nonMutating,
mutating,
requestContextMapper,
longRunningRequestCheck,
)
handler = apiserverfilters.WithRequestInfo(handler, requestInfoFactory, requestContextMapper)
handler = api.WithRequestContext(handler, requestContextMapper)
return httptest.NewServer(handler)
}
// Tests that MaxInFlightLimit works, i.e. // Tests that MaxInFlightLimit works, i.e.
// - "long" requests such as proxy or watch, identified by regexp are not accounted despite // - "long" requests such as proxy or watch, identified by regexp are not accounted despite
// hanging for the long time, // hanging for the long time,
@ -36,69 +73,53 @@ import (
// - subsequent "short" requests are rejected instantly with appropriate error, // - subsequent "short" requests are rejected instantly with appropriate error,
// - subsequent "long" requests are handled normally, // - subsequent "long" requests are handled normally,
// - we correctly recover after some "short" requests finish, i.e. we can process new ones. // - we correctly recover after some "short" requests finish, i.e. we can process new ones.
func TestMaxInFlight(t *testing.T) { func TestMaxInFlightNonMutating(t *testing.T) {
const AllowedInflightRequestsNo = 3 const AllowedNonMutatingInflightRequestsNo = 3
// notAccountedPathsRegexp specifies paths requests to which we don't account into
// requests in flight.
notAccountedPathsRegexp := regexp.MustCompile(".*\\/watch")
longRunningRequestCheck := BasicLongRunningRequestCheck(notAccountedPathsRegexp, map[string]string{"watch": "true"})
// Calls is used to wait until all server calls are received. We are sending // Calls is used to wait until all server calls are received. We are sending
// AllowedInflightRequestsNo of 'long' not-accounted requests and the same number of // AllowedNonMutatingInflightRequestsNo of 'long' not-accounted requests and the same number of
// 'short' accounted ones. // 'short' accounted ones.
calls := &sync.WaitGroup{} calls := &sync.WaitGroup{}
calls.Add(AllowedInflightRequestsNo * 2) calls.Add(AllowedNonMutatingInflightRequestsNo * 2)
// Responses is used to wait until all responses are // Responses is used to wait until all responses are
// received. This prevents some async requests getting EOF // received. This prevents some async requests getting EOF
// errors from prematurely closing the server // errors from prematurely closing the server
responses := sync.WaitGroup{} responses := &sync.WaitGroup{}
responses.Add(AllowedInflightRequestsNo * 2) responses.Add(AllowedNonMutatingInflightRequestsNo * 2)
// Block is used to keep requests in flight for as long as we need to. All requests will // Block is used to keep requests in flight for as long as we need to. All requests will
// be unblocked at the same time. // be unblocked at the same time.
block := sync.WaitGroup{} block := &sync.WaitGroup{}
block.Add(1) block.Add(1)
server := httptest.NewServer( waitForCalls := true
WithMaxInFlightLimit( waitForCallsMutex := sync.Mutex{}
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// A short, accounted request that does not wait for block WaitGroup. server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, AllowedNonMutatingInflightRequestsNo, 1)
if strings.Contains(r.URL.Path, "dontwait") {
return
}
if calls != nil {
calls.Done()
}
block.Wait()
}),
AllowedInflightRequestsNo,
longRunningRequestCheck,
),
)
defer server.Close() defer server.Close()
// These should hang, but not affect accounting. use a query param match // These should hang, but not affect accounting. use a query param match
for i := 0; i < AllowedInflightRequestsNo; i++ { for i := 0; i < AllowedNonMutatingInflightRequestsNo; i++ {
// These should hang waiting on block... // These should hang waiting on block...
go func() { go func() {
if err := expectHTTP(server.URL+"/foo/bar?watch=true", http.StatusOK); err != nil { if err := expectHTTPGet(server.URL+"/foo/bar?watch=true", http.StatusOK); err != nil {
t.Error(err) t.Error(err)
} }
responses.Done() responses.Done()
}() }()
} }
// Check that sever is not saturated by not-accounted calls // Check that sever is not saturated by not-accounted calls
if err := expectHTTP(server.URL+"/dontwait", http.StatusOK); err != nil { if err := expectHTTPGet(server.URL+"/dontwait", http.StatusOK); err != nil {
t.Error(err) t.Error(err)
} }
// These should hang and be accounted, i.e. saturate the server // These should hang and be accounted, i.e. saturate the server
for i := 0; i < AllowedInflightRequestsNo; i++ { for i := 0; i < AllowedNonMutatingInflightRequestsNo; i++ {
// These should hang waiting on block... // These should hang waiting on block...
go func() { go func() {
if err := expectHTTP(server.URL, http.StatusOK); err != nil { if err := expectHTTPGet(server.URL, http.StatusOK); err != nil {
t.Error(err) t.Error(err)
} }
responses.Done() responses.Done()
@ -107,16 +128,23 @@ func TestMaxInFlight(t *testing.T) {
// We wait for all calls to be received by the server // We wait for all calls to be received by the server
calls.Wait() calls.Wait()
// Disable calls notifications in the server // Disable calls notifications in the server
calls = nil waitForCallsMutex.Lock()
waitForCalls = false
waitForCallsMutex.Unlock()
// Do this multiple times to show that it rate limit rejected requests don't block. // Do this multiple times to show that rate limit rejected requests don't block.
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
if err := expectHTTP(server.URL, errors.StatusTooManyRequests); err != nil { if err := expectHTTPGet(server.URL, errors.StatusTooManyRequests); err != nil {
t.Error(err) t.Error(err)
} }
} }
// Validate that non-accounted URLs still work. use a path regex match // Validate that non-accounted URLs still work. use a path regex match
if err := expectHTTP(server.URL+"/dontwait/watch", http.StatusOK); err != nil { if err := expectHTTPGet(server.URL+"/dontwait/watch", http.StatusOK); err != nil {
t.Error(err)
}
// We should allow a single mutating request.
if err := expectHTTPPost(server.URL+"/dontwait", http.StatusOK); err != nil {
t.Error(err) t.Error(err)
} }
@ -126,12 +154,73 @@ func TestMaxInFlight(t *testing.T) {
// Show that we recover from being blocked up. // Show that we recover from being blocked up.
// Too avoid flakyness we need to wait until at least one of the requests really finishes. // Too avoid flakyness we need to wait until at least one of the requests really finishes.
responses.Wait() responses.Wait()
if err := expectHTTP(server.URL, http.StatusOK); err != nil { if err := expectHTTPGet(server.URL, http.StatusOK); err != nil {
t.Error(err) t.Error(err)
} }
} }
func expectHTTP(url string, code int) error { func TestMaxInFlightMutating(t *testing.T) {
const AllowedMutatingInflightRequestsNo = 3
calls := &sync.WaitGroup{}
calls.Add(AllowedMutatingInflightRequestsNo)
responses := &sync.WaitGroup{}
responses.Add(AllowedMutatingInflightRequestsNo)
// Block is used to keep requests in flight for as long as we need to. All requests will
// be unblocked at the same time.
block := &sync.WaitGroup{}
block.Add(1)
waitForCalls := true
waitForCallsMutex := sync.Mutex{}
server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
defer server.Close()
// These should hang and be accounted, i.e. saturate the server
for i := 0; i < AllowedMutatingInflightRequestsNo; i++ {
// These should hang waiting on block...
go func() {
if err := expectHTTPPost(server.URL+"/foo/bar", http.StatusOK); err != nil {
t.Error(err)
}
responses.Done()
}()
}
// We wait for all calls to be received by the server
calls.Wait()
// Disable calls notifications in the server
// Disable calls notifications in the server
waitForCallsMutex.Lock()
waitForCalls = false
waitForCallsMutex.Unlock()
// Do this multiple times to show that rate limit rejected requests don't block.
for i := 0; i < 2; i++ {
if err := expectHTTPPost(server.URL+"/foo/bar/", errors.StatusTooManyRequests); err != nil {
t.Error(err)
}
}
// Validate that non-mutating URLs still work. use a path regex match
if err := expectHTTPGet(server.URL+"/dontwait", http.StatusOK); err != nil {
t.Error(err)
}
// Let all hanging requests finish
block.Done()
// Show that we recover from being blocked up.
// Too avoid flakyness we need to wait until at least one of the requests really finishes.
responses.Wait()
if err := expectHTTPPost(server.URL+"/foo/bar", http.StatusOK); err != nil {
t.Error(err)
}
}
// We use GET as a sample non-mutating request.
func expectHTTPGet(url string, code int) error {
r, err := http.Get(url) r, err := http.Get(url)
if err != nil { if err != nil {
return fmt.Errorf("unexpected error: %v", err) return fmt.Errorf("unexpected error: %v", err)
@ -141,3 +230,15 @@ func expectHTTP(url string, code int) error {
} }
return nil return nil
} }
// We use POST as a sample mutating request.
func expectHTTPPost(url string, code int) error {
r, err := http.Post(url, "text/html", strings.NewReader("foo bar"))
if err != nil {
return fmt.Errorf("unexpected error: %v", err)
}
if r.StatusCode != code {
return fmt.Errorf("unexpected response: %v", r.StatusCode)
}
return nil
}

View File

@ -44,31 +44,32 @@ type ServerRunOptions struct {
AdmissionControlConfigFile string AdmissionControlConfigFile string
AdvertiseAddress net.IP AdvertiseAddress net.IP
CloudConfigFile string CloudConfigFile string
CloudProvider string CloudProvider string
CorsAllowedOriginList []string CorsAllowedOriginList []string
DefaultStorageMediaType string DefaultStorageMediaType string
DeleteCollectionWorkers int DeleteCollectionWorkers int
AuditLogPath string AuditLogPath string
AuditLogMaxAge int AuditLogMaxAge int
AuditLogMaxBackups int AuditLogMaxBackups int
AuditLogMaxSize int AuditLogMaxSize int
EnableGarbageCollection bool EnableGarbageCollection bool
EnableProfiling bool EnableProfiling bool
EnableContentionProfiling bool EnableContentionProfiling bool
EnableSwaggerUI bool EnableSwaggerUI bool
EnableWatchCache bool EnableWatchCache bool
ExternalHost string ExternalHost string
KubernetesServiceNodePort int KubernetesServiceNodePort int
LongRunningRequestRE string LongRunningRequestRE string
MasterCount int MasterCount int
MasterServiceNamespace string MasterServiceNamespace string
MaxRequestsInFlight int MaxRequestsInFlight int
MinRequestTimeout int MaxMutatingRequestsInFlight int
RuntimeConfig config.ConfigurationMap MinRequestTimeout int
ServiceClusterIPRange net.IPNet // TODO: make this a list RuntimeConfig config.ConfigurationMap
ServiceNodePortRange utilnet.PortRange ServiceClusterIPRange net.IPNet // TODO: make this a list
StorageVersions string ServiceNodePortRange utilnet.PortRange
StorageVersions string
// The default values for StorageVersions. StorageVersions overrides // The default values for StorageVersions. StorageVersions overrides
// these; you can change this if you want to change the defaults (e.g., // these; you can change this if you want to change the defaults (e.g.,
// for testing). This is not actually exposed as a flag. // for testing). This is not actually exposed as a flag.
@ -80,22 +81,23 @@ type ServerRunOptions struct {
func NewServerRunOptions() *ServerRunOptions { func NewServerRunOptions() *ServerRunOptions {
return &ServerRunOptions{ return &ServerRunOptions{
AdmissionControl: "AlwaysAdmit", AdmissionControl: "AlwaysAdmit",
DefaultStorageMediaType: "application/json", DefaultStorageMediaType: "application/json",
DefaultStorageVersions: registered.AllPreferredGroupVersions(), DefaultStorageVersions: registered.AllPreferredGroupVersions(),
DeleteCollectionWorkers: 1, DeleteCollectionWorkers: 1,
EnableGarbageCollection: true, EnableGarbageCollection: true,
EnableProfiling: true, EnableProfiling: true,
EnableContentionProfiling: false, EnableContentionProfiling: false,
EnableWatchCache: true, EnableWatchCache: true,
LongRunningRequestRE: DefaultLongRunningRequestRE, LongRunningRequestRE: DefaultLongRunningRequestRE,
MasterCount: 1, MasterCount: 1,
MasterServiceNamespace: api.NamespaceDefault, MasterServiceNamespace: api.NamespaceDefault,
MaxRequestsInFlight: 400, MaxRequestsInFlight: 400,
MinRequestTimeout: 1800, MaxMutatingRequestsInFlight: 200,
RuntimeConfig: make(config.ConfigurationMap), MinRequestTimeout: 1800,
ServiceNodePortRange: DefaultServiceNodePortRange, RuntimeConfig: make(config.ConfigurationMap),
StorageVersions: registered.AllPreferredGroupVersions(), ServiceNodePortRange: DefaultServiceNodePortRange,
StorageVersions: registered.AllPreferredGroupVersions(),
} }
} }
@ -253,7 +255,11 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
"DEPRECATED: the namespace from which the kubernetes master services should be injected into pods.") "DEPRECATED: the namespace from which the kubernetes master services should be injected into pods.")
fs.IntVar(&s.MaxRequestsInFlight, "max-requests-inflight", s.MaxRequestsInFlight, ""+ fs.IntVar(&s.MaxRequestsInFlight, "max-requests-inflight", s.MaxRequestsInFlight, ""+
"The maximum number of requests in flight at a given time. When the server exceeds this, "+ "The maximum number of non-mutating requests in flight at a given time. When the server exceeds this, "+
"it rejects requests. Zero for no limit.")
fs.IntVar(&s.MaxMutatingRequestsInFlight, "max-mutating-requests-inflight", s.MaxMutatingRequestsInFlight, ""+
"The maximum number of mutating requests in flight at a given time. When the server exceeds this, "+
"it rejects requests. Zero for no limit.") "it rejects requests. Zero for no limit.")
fs.IntVar(&s.MinRequestTimeout, "min-request-timeout", s.MinRequestTimeout, ""+ fs.IntVar(&s.MinRequestTimeout, "min-request-timeout", s.MinRequestTimeout, ""+