mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 09:22:44 +00:00
Merge pull request #6207 from brendandburns/server
Add a limit to the number of in-flight requests that a server processes.
This commit is contained in:
commit
4a2000c4aa
@ -24,6 +24,7 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@ -77,6 +78,8 @@ type APIServer struct {
|
|||||||
KubeletConfig client.KubeletConfig
|
KubeletConfig client.KubeletConfig
|
||||||
ClusterName string
|
ClusterName string
|
||||||
EnableProfiling bool
|
EnableProfiling bool
|
||||||
|
MaxRequestsInFlight int
|
||||||
|
LongRunningRequestRE string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAPIServer creates a new APIServer object with default parameters
|
// NewAPIServer creates a new APIServer object with default parameters
|
||||||
@ -157,6 +160,8 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
|
|||||||
fs.StringVar(&s.ClusterName, "cluster_name", s.ClusterName, "The instance prefix for the cluster")
|
fs.StringVar(&s.ClusterName, "cluster_name", s.ClusterName, "The instance prefix for the cluster")
|
||||||
fs.BoolVar(&s.EnableProfiling, "profiling", false, "Enable profiling via web interface host:port/debug/pprof/")
|
fs.BoolVar(&s.EnableProfiling, "profiling", false, "Enable profiling via web interface host:port/debug/pprof/")
|
||||||
fs.StringVar(&s.ExternalHost, "external_hostname", "", "The hostname to use when generating externalized URLs for this master (e.g. Swagger API Docs.)")
|
fs.StringVar(&s.ExternalHost, "external_hostname", "", "The hostname to use when generating externalized URLs for this master (e.g. Swagger API Docs.)")
|
||||||
|
fs.IntVar(&s.MaxRequestsInFlight, "max_requests_inflight", 20, "The maximum number of requests in flight at a given time. When the server exceeds this, it rejects requests. Zero for no limit.")
|
||||||
|
fs.StringVar(&s.LongRunningRequestRE, "long_running_request_regexp", "[.*\\/watch$][^\\/proxy.*]", "A regular expression matching long running requests which should be excluded from maximum inflight request handling.")
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Longer term we should read this from some config store, rather than a flag.
|
// TODO: Longer term we should read this from some config store, rather than a flag.
|
||||||
@ -300,12 +305,19 @@ func (s *APIServer) Run(_ []string) error {
|
|||||||
|
|
||||||
// See the flag commentary to understand our assumptions when opening the read-only and read-write ports.
|
// See the flag commentary to understand our assumptions when opening the read-only and read-write ports.
|
||||||
|
|
||||||
|
var sem chan bool
|
||||||
|
if s.MaxRequestsInFlight > 0 {
|
||||||
|
sem = make(chan bool, s.MaxRequestsInFlight)
|
||||||
|
}
|
||||||
|
|
||||||
|
longRunningRE := regexp.MustCompile(s.LongRunningRequestRE)
|
||||||
|
|
||||||
if roLocation != "" {
|
if roLocation != "" {
|
||||||
// Default settings allow 1 read-only request per second, allow up to 20 in a burst before enforcing.
|
// Default settings allow 1 read-only request per second, allow up to 20 in a burst before enforcing.
|
||||||
rl := util.NewTokenBucketRateLimiter(s.APIRate, s.APIBurst)
|
rl := util.NewTokenBucketRateLimiter(s.APIRate, s.APIBurst)
|
||||||
readOnlyServer := &http.Server{
|
readOnlyServer := &http.Server{
|
||||||
Addr: roLocation,
|
Addr: roLocation,
|
||||||
Handler: apiserver.RecoverPanics(apiserver.ReadOnly(apiserver.RateLimit(rl, m.InsecureHandler))),
|
Handler: apiserver.MaxInFlightLimit(sem, longRunningRE, apiserver.RecoverPanics(apiserver.ReadOnly(apiserver.RateLimit(rl, m.InsecureHandler)))),
|
||||||
ReadTimeout: 5 * time.Minute,
|
ReadTimeout: 5 * time.Minute,
|
||||||
WriteTimeout: 5 * time.Minute,
|
WriteTimeout: 5 * time.Minute,
|
||||||
MaxHeaderBytes: 1 << 20,
|
MaxHeaderBytes: 1 << 20,
|
||||||
@ -325,7 +337,7 @@ func (s *APIServer) Run(_ []string) error {
|
|||||||
if secureLocation != "" {
|
if secureLocation != "" {
|
||||||
secureServer := &http.Server{
|
secureServer := &http.Server{
|
||||||
Addr: secureLocation,
|
Addr: secureLocation,
|
||||||
Handler: apiserver.RecoverPanics(m.Handler),
|
Handler: apiserver.MaxInFlightLimit(sem, longRunningRE, apiserver.RecoverPanics(m.Handler)),
|
||||||
ReadTimeout: 5 * time.Minute,
|
ReadTimeout: 5 * time.Minute,
|
||||||
WriteTimeout: 5 * time.Minute,
|
WriteTimeout: 5 * time.Minute,
|
||||||
MaxHeaderBytes: 1 << 20,
|
MaxHeaderBytes: 1 << 20,
|
||||||
|
@ -43,6 +43,10 @@ var specialVerbs = map[string]bool{
|
|||||||
"watch": true,
|
"watch": true,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Constant for the retry-after interval on rate limiting.
|
||||||
|
// TODO: maybe make this dynamic? or user-adjustable?
|
||||||
|
const RetryAfter = "1"
|
||||||
|
|
||||||
// IsReadOnlyReq() is true for any (or at least many) request which has no observable
|
// IsReadOnlyReq() is true for any (or at least many) request which has no observable
|
||||||
// side effects on state of apiserver (though there may be internal side effects like
|
// side effects on state of apiserver (though there may be internal side effects like
|
||||||
// caching and logging).
|
// caching and logging).
|
||||||
@ -66,6 +70,27 @@ func ReadOnly(handler http.Handler) http.Handler {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MaxInFlight limits the number of in-flight requests to buffer size of the passed in channel.
|
||||||
|
func MaxInFlightLimit(c chan bool, longRunningRequestRE *regexp.Regexp, handler http.Handler) http.Handler {
|
||||||
|
if c == nil {
|
||||||
|
return handler
|
||||||
|
}
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if longRunningRequestRE.MatchString(r.URL.Path) {
|
||||||
|
// Skip tracking long running events.
|
||||||
|
handler.ServeHTTP(w, r)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case c <- true:
|
||||||
|
defer func() { <-c }()
|
||||||
|
handler.ServeHTTP(w, r)
|
||||||
|
default:
|
||||||
|
tooManyRequests(w)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// RateLimit uses rl to rate limit accepting requests to 'handler'.
|
// RateLimit uses rl to rate limit accepting requests to 'handler'.
|
||||||
func RateLimit(rl util.RateLimiter, handler http.Handler) http.Handler {
|
func RateLimit(rl util.RateLimiter, handler http.Handler) http.Handler {
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
@ -73,13 +98,17 @@ func RateLimit(rl util.RateLimiter, handler http.Handler) http.Handler {
|
|||||||
handler.ServeHTTP(w, req)
|
handler.ServeHTTP(w, req)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Return a 429 status indicating "Too Many Requests"
|
tooManyRequests(w)
|
||||||
w.Header().Set("Retry-After", "1")
|
|
||||||
w.WriteHeader(errors.StatusTooManyRequests)
|
|
||||||
fmt.Fprintf(w, "Rate limit is 10 QPS or a burst of 200")
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func tooManyRequests(w http.ResponseWriter) {
|
||||||
|
// Return a 429 status indicating "Too Many Requests"
|
||||||
|
w.Header().Set("Retry-After", RetryAfter)
|
||||||
|
w.WriteHeader(errors.StatusTooManyRequests)
|
||||||
|
fmt.Fprintf(w, "Too many requests, please try again later.")
|
||||||
|
}
|
||||||
|
|
||||||
// RecoverPanics wraps an http Handler to recover and log panics.
|
// RecoverPanics wraps an http Handler to recover and log panics.
|
||||||
func RecoverPanics(handler http.Handler) http.Handler {
|
func RecoverPanics(handler http.Handler) http.Handler {
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
@ -20,10 +20,14 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
)
|
)
|
||||||
@ -33,6 +37,68 @@ type fakeRL bool
|
|||||||
func (fakeRL) Stop() {}
|
func (fakeRL) Stop() {}
|
||||||
func (f fakeRL) CanAccept() bool { return bool(f) }
|
func (f fakeRL) CanAccept() bool { return bool(f) }
|
||||||
|
|
||||||
|
func expectHTTP(url string, code int, t *testing.T) {
|
||||||
|
r, err := http.Get(url)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if r.StatusCode != code {
|
||||||
|
t.Errorf("unexpected response: %v", r.StatusCode)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMaxInFlight(t *testing.T) {
|
||||||
|
const Iterations = 3
|
||||||
|
block := sync.WaitGroup{}
|
||||||
|
block.Add(1)
|
||||||
|
sem := make(chan bool, Iterations)
|
||||||
|
|
||||||
|
re := regexp.MustCompile("[.*\\/watch][^\\/proxy.*]")
|
||||||
|
|
||||||
|
server := httptest.NewServer(MaxInFlightLimit(sem, re, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if strings.Contains(r.URL.Path, "dontwait") {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
block.Wait()
|
||||||
|
})))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
// These should hang, but not affect accounting.
|
||||||
|
for i := 0; i < Iterations; i++ {
|
||||||
|
// These should hang waiting on block...
|
||||||
|
go func() {
|
||||||
|
expectHTTP(server.URL+"/foo/bar/watch", http.StatusOK, t)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
for i := 0; i < Iterations; i++ {
|
||||||
|
// These should hang waiting on block...
|
||||||
|
go func() {
|
||||||
|
expectHTTP(server.URL+"/proxy/foo/bar", http.StatusOK, t)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
expectHTTP(server.URL+"/dontwait", http.StatusOK, t)
|
||||||
|
|
||||||
|
for i := 0; i < Iterations; i++ {
|
||||||
|
// These should hang waiting on block...
|
||||||
|
go func() {
|
||||||
|
expectHTTP(server.URL, http.StatusOK, t)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
// There's really no more elegant way to do this. I could use a WaitGroup, but even then
|
||||||
|
// it'd still be racy.
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
expectHTTP(server.URL+"/dontwait/watch", http.StatusOK, t)
|
||||||
|
|
||||||
|
// Do this multiple times to show that it rate limit rejected requests don't block.
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
expectHTTP(server.URL, errors.StatusTooManyRequests, t)
|
||||||
|
}
|
||||||
|
block.Done()
|
||||||
|
|
||||||
|
// Show that we recover from being blocked up.
|
||||||
|
expectHTTP(server.URL, http.StatusOK, t)
|
||||||
|
}
|
||||||
|
|
||||||
func TestRateLimit(t *testing.T) {
|
func TestRateLimit(t *testing.T) {
|
||||||
for _, allow := range []bool{true, false} {
|
for _, allow := range []bool{true, false} {
|
||||||
rl := fakeRL(allow)
|
rl := fakeRL(allow)
|
||||||
|
Loading…
Reference in New Issue
Block a user