mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-29 21:29:24 +00:00
Add a limit to the number of in-flight requests that a server processes.
This commit is contained in:
@@ -43,6 +43,10 @@ var specialVerbs = map[string]bool{
|
||||
"watch": true,
|
||||
}
|
||||
|
||||
// Constant for the retry-after interval on rate limiting.
|
||||
// TODO: maybe make this dynamic? or user-adjustable?
|
||||
const RetryAfter = "1"
|
||||
|
||||
// IsReadOnlyReq() is true for any (or at least many) request which has no observable
|
||||
// side effects on state of apiserver (though there may be internal side effects like
|
||||
// caching and logging).
|
||||
@@ -66,6 +70,27 @@ func ReadOnly(handler http.Handler) http.Handler {
|
||||
})
|
||||
}
|
||||
|
||||
// MaxInFlight limits the number of in-flight requests to buffer size of the passed in channel.
|
||||
func MaxInFlightLimit(c chan bool, longRunningRequestRE *regexp.Regexp, handler http.Handler) http.Handler {
|
||||
if c == nil {
|
||||
return handler
|
||||
}
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if longRunningRequestRE.MatchString(r.URL.Path) {
|
||||
// Skip tracking long running events.
|
||||
handler.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
select {
|
||||
case c <- true:
|
||||
defer func() { <-c }()
|
||||
handler.ServeHTTP(w, r)
|
||||
default:
|
||||
tooManyRequests(w)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// RateLimit uses rl to rate limit accepting requests to 'handler'.
|
||||
func RateLimit(rl util.RateLimiter, handler http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
@@ -73,13 +98,17 @@ func RateLimit(rl util.RateLimiter, handler http.Handler) http.Handler {
|
||||
handler.ServeHTTP(w, req)
|
||||
return
|
||||
}
|
||||
// Return a 429 status indicating "Too Many Requests"
|
||||
w.Header().Set("Retry-After", "1")
|
||||
w.WriteHeader(errors.StatusTooManyRequests)
|
||||
fmt.Fprintf(w, "Rate limit is 10 QPS or a burst of 200")
|
||||
tooManyRequests(w)
|
||||
})
|
||||
}
|
||||
|
||||
func tooManyRequests(w http.ResponseWriter) {
|
||||
// Return a 429 status indicating "Too Many Requests"
|
||||
w.Header().Set("Retry-After", RetryAfter)
|
||||
w.WriteHeader(errors.StatusTooManyRequests)
|
||||
fmt.Fprintf(w, "Too many requests, please try again later.")
|
||||
}
|
||||
|
||||
// RecoverPanics wraps an http Handler to recover and log panics.
|
||||
func RecoverPanics(handler http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
|
||||
@@ -20,10 +20,14 @@ import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
)
|
||||
@@ -33,6 +37,68 @@ type fakeRL bool
|
||||
func (fakeRL) Stop() {}
|
||||
func (f fakeRL) CanAccept() bool { return bool(f) }
|
||||
|
||||
func expectHTTP(url string, code int, t *testing.T) {
|
||||
r, err := http.Get(url)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if r.StatusCode != code {
|
||||
t.Errorf("unexpected response: %v", r.StatusCode)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMaxInFlight(t *testing.T) {
|
||||
const Iterations = 3
|
||||
block := sync.WaitGroup{}
|
||||
block.Add(1)
|
||||
sem := make(chan bool, Iterations)
|
||||
|
||||
re := regexp.MustCompile("[.*\\/watch][^\\/proxy.*]")
|
||||
|
||||
server := httptest.NewServer(MaxInFlightLimit(sem, re, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.Contains(r.URL.Path, "dontwait") {
|
||||
return
|
||||
}
|
||||
block.Wait()
|
||||
})))
|
||||
defer server.Close()
|
||||
|
||||
// These should hang, but not affect accounting.
|
||||
for i := 0; i < Iterations; i++ {
|
||||
// These should hang waiting on block...
|
||||
go func() {
|
||||
expectHTTP(server.URL+"/foo/bar/watch", http.StatusOK, t)
|
||||
}()
|
||||
}
|
||||
for i := 0; i < Iterations; i++ {
|
||||
// These should hang waiting on block...
|
||||
go func() {
|
||||
expectHTTP(server.URL+"/proxy/foo/bar", http.StatusOK, t)
|
||||
}()
|
||||
}
|
||||
expectHTTP(server.URL+"/dontwait", http.StatusOK, t)
|
||||
|
||||
for i := 0; i < Iterations; i++ {
|
||||
// These should hang waiting on block...
|
||||
go func() {
|
||||
expectHTTP(server.URL, http.StatusOK, t)
|
||||
}()
|
||||
}
|
||||
// There's really no more elegant way to do this. I could use a WaitGroup, but even then
|
||||
// it'd still be racy.
|
||||
time.Sleep(1 * time.Second)
|
||||
expectHTTP(server.URL+"/dontwait/watch", http.StatusOK, t)
|
||||
|
||||
// Do this multiple times to show that it rate limit rejected requests don't block.
|
||||
for i := 0; i < 2; i++ {
|
||||
expectHTTP(server.URL, errors.StatusTooManyRequests, t)
|
||||
}
|
||||
block.Done()
|
||||
|
||||
// Show that we recover from being blocked up.
|
||||
expectHTTP(server.URL, http.StatusOK, t)
|
||||
}
|
||||
|
||||
func TestRateLimit(t *testing.T) {
|
||||
for _, allow := range []bool{true, false} {
|
||||
rl := fakeRL(allow)
|
||||
|
||||
Reference in New Issue
Block a user