mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 06:27:05 +00:00
Merge pull request #36064 from gmarek/inflight
Automatic merge from submit-queue Split inflight requests into read-only and mutating groups cc @smarterclayton @lavalamp @caesarxuchao ```release-note API server have two separate limits for read-only and mutating inflight requests. ```
This commit is contained in:
commit
3a5fd6b6c1
@ -374,6 +374,7 @@ max-connection-bytes-per-sec
|
|||||||
max-log-age
|
max-log-age
|
||||||
max-log-backups
|
max-log-backups
|
||||||
max-log-size
|
max-log-size
|
||||||
|
max-mutating-requests-inflight
|
||||||
max-open-files
|
max-open-files
|
||||||
max-outgoing-burst
|
max-outgoing-burst
|
||||||
max-outgoing-qps
|
max-outgoing-qps
|
||||||
|
@ -141,8 +141,11 @@ type Config struct {
|
|||||||
OpenAPIConfig *common.Config
|
OpenAPIConfig *common.Config
|
||||||
|
|
||||||
// MaxRequestsInFlight is the maximum number of parallel non-long-running requests. Every further
|
// MaxRequestsInFlight is the maximum number of parallel non-long-running requests. Every further
|
||||||
// request has to wait.
|
// request has to wait. Applies only to non-mutating requests.
|
||||||
MaxRequestsInFlight int
|
MaxRequestsInFlight int
|
||||||
|
// MaxMutatingRequestsInFlight is the maximum number of parallel mutating requests. Every further
|
||||||
|
// request has to wait.
|
||||||
|
MaxMutatingRequestsInFlight int
|
||||||
|
|
||||||
// Predicate which is true for paths of long-running http requests
|
// Predicate which is true for paths of long-running http requests
|
||||||
LongRunningFunc genericfilters.LongRunningRequestCheck
|
LongRunningFunc genericfilters.LongRunningRequestCheck
|
||||||
@ -320,6 +323,7 @@ func (c *Config) ApplyOptions(options *options.ServerRunOptions) *Config {
|
|||||||
c.EnableSwaggerUI = options.EnableSwaggerUI
|
c.EnableSwaggerUI = options.EnableSwaggerUI
|
||||||
c.ExternalAddress = options.ExternalHost
|
c.ExternalAddress = options.ExternalHost
|
||||||
c.MaxRequestsInFlight = options.MaxRequestsInFlight
|
c.MaxRequestsInFlight = options.MaxRequestsInFlight
|
||||||
|
c.MaxMutatingRequestsInFlight = options.MaxMutatingRequestsInFlight
|
||||||
c.MinRequestTimeout = options.MinRequestTimeout
|
c.MinRequestTimeout = options.MinRequestTimeout
|
||||||
c.PublicAddress = options.AdvertiseAddress
|
c.PublicAddress = options.AdvertiseAddress
|
||||||
|
|
||||||
@ -488,10 +492,10 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) (secure, insec
|
|||||||
generic := func(handler http.Handler) http.Handler {
|
generic := func(handler http.Handler) http.Handler {
|
||||||
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
|
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
|
||||||
handler = genericfilters.WithPanicRecovery(handler, c.RequestContextMapper)
|
handler = genericfilters.WithPanicRecovery(handler, c.RequestContextMapper)
|
||||||
|
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)
|
||||||
|
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.RequestContextMapper, c.LongRunningFunc)
|
||||||
handler = apiserverfilters.WithRequestInfo(handler, NewRequestInfoResolver(c), c.RequestContextMapper)
|
handler = apiserverfilters.WithRequestInfo(handler, NewRequestInfoResolver(c), c.RequestContextMapper)
|
||||||
handler = api.WithRequestContext(handler, c.RequestContextMapper)
|
handler = api.WithRequestContext(handler, c.RequestContextMapper)
|
||||||
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)
|
|
||||||
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.LongRunningFunc)
|
|
||||||
return handler
|
return handler
|
||||||
}
|
}
|
||||||
audit := func(handler http.Handler) http.Handler {
|
audit := func(handler http.Handler) http.Handler {
|
||||||
|
@ -28,6 +28,7 @@ go_library(
|
|||||||
"//pkg/httplog:go_default_library",
|
"//pkg/httplog:go_default_library",
|
||||||
"//pkg/util:go_default_library",
|
"//pkg/util:go_default_library",
|
||||||
"//pkg/util/runtime:go_default_library",
|
"//pkg/util/runtime:go_default_library",
|
||||||
|
"//pkg/util/sets:go_default_library",
|
||||||
"//vendor:github.com/golang/glog",
|
"//vendor:github.com/golang/glog",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@ -41,5 +42,10 @@ go_test(
|
|||||||
],
|
],
|
||||||
library = "go_default_library",
|
library = "go_default_library",
|
||||||
tags = ["automanaged"],
|
tags = ["automanaged"],
|
||||||
deps = ["//pkg/api/errors:go_default_library"],
|
deps = [
|
||||||
|
"//pkg/api:go_default_library",
|
||||||
|
"//pkg/api/errors:go_default_library",
|
||||||
|
"//pkg/apiserver/filters:go_default_library",
|
||||||
|
"//pkg/apiserver/request:go_default_library",
|
||||||
|
],
|
||||||
)
|
)
|
||||||
|
@ -17,34 +17,86 @@ limitations under the License.
|
|||||||
package filters
|
package filters
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/errors"
|
"k8s.io/kubernetes/pkg/api/errors"
|
||||||
|
"k8s.io/kubernetes/pkg/apiserver/request"
|
||||||
"k8s.io/kubernetes/pkg/httplog"
|
"k8s.io/kubernetes/pkg/httplog"
|
||||||
|
"k8s.io/kubernetes/pkg/util/sets"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Constant for the retry-after interval on rate limiting.
|
// Constant for the retry-after interval on rate limiting.
|
||||||
// TODO: maybe make this dynamic? or user-adjustable?
|
// TODO: maybe make this dynamic? or user-adjustable?
|
||||||
const retryAfter = "1"
|
const retryAfter = "1"
|
||||||
|
|
||||||
|
var nonMutatingRequestVerbs = sets.NewString("get", "list", "watch")
|
||||||
|
|
||||||
|
func handleError(w http.ResponseWriter, r *http.Request, err error) {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
fmt.Fprintf(w, "Internal Server Error: %#v", r.RequestURI)
|
||||||
|
glog.Errorf(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
// WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel.
|
// WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel.
|
||||||
func WithMaxInFlightLimit(handler http.Handler, limit int, longRunningRequestCheck LongRunningRequestCheck) http.Handler {
|
func WithMaxInFlightLimit(
|
||||||
if limit == 0 {
|
handler http.Handler,
|
||||||
|
nonMutatingLimit int,
|
||||||
|
mutatingLimit int,
|
||||||
|
requestContextMapper api.RequestContextMapper,
|
||||||
|
longRunningRequestCheck LongRunningRequestCheck,
|
||||||
|
) http.Handler {
|
||||||
|
if nonMutatingLimit == 0 && mutatingLimit == 0 {
|
||||||
return handler
|
return handler
|
||||||
}
|
}
|
||||||
c := make(chan bool, limit)
|
var nonMutatingChan chan bool
|
||||||
|
var mutatingChan chan bool
|
||||||
|
if nonMutatingLimit != 0 {
|
||||||
|
nonMutatingChan = make(chan bool, nonMutatingLimit)
|
||||||
|
}
|
||||||
|
if mutatingLimit != 0 {
|
||||||
|
mutatingChan = make(chan bool, mutatingLimit)
|
||||||
|
}
|
||||||
|
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
// TODO: migrate to use requestInfo instead of having custom request parser.
|
||||||
if longRunningRequestCheck(r) {
|
if longRunningRequestCheck(r) {
|
||||||
// Skip tracking long running events.
|
// Skip tracking long running events.
|
||||||
handler.ServeHTTP(w, r)
|
handler.ServeHTTP(w, r)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
select {
|
|
||||||
case c <- true:
|
ctx, ok := requestContextMapper.Get(r)
|
||||||
defer func() { <-c }()
|
if !ok {
|
||||||
|
handleError(w, r, fmt.Errorf("no context found for request, handler chain must be wrong"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
requestInfo, ok := request.RequestInfoFrom(ctx)
|
||||||
|
if !ok {
|
||||||
|
handleError(w, r, fmt.Errorf("no RequestInfo found in context, handler chain must be wrong"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var c chan bool
|
||||||
|
if !nonMutatingRequestVerbs.Has(requestInfo.Verb) {
|
||||||
|
c = mutatingChan
|
||||||
|
} else {
|
||||||
|
c = nonMutatingChan
|
||||||
|
}
|
||||||
|
|
||||||
|
if c == nil {
|
||||||
handler.ServeHTTP(w, r)
|
handler.ServeHTTP(w, r)
|
||||||
default:
|
} else {
|
||||||
tooManyRequests(r, w)
|
select {
|
||||||
|
case c <- true:
|
||||||
|
defer func() { <-c }()
|
||||||
|
handler.ServeHTTP(w, r)
|
||||||
|
default:
|
||||||
|
tooManyRequests(r, w)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -25,9 +25,46 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/errors"
|
"k8s.io/kubernetes/pkg/api/errors"
|
||||||
|
apiserverfilters "k8s.io/kubernetes/pkg/apiserver/filters"
|
||||||
|
"k8s.io/kubernetes/pkg/apiserver/request"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func createMaxInflightServer(callsWg, blockWg *sync.WaitGroup, disableCallsWg *bool, disableCallsWgMutex *sync.Mutex, nonMutating, mutating int) *httptest.Server {
|
||||||
|
|
||||||
|
// notAccountedPathsRegexp specifies paths requests to which we don't account into
|
||||||
|
// requests in flight.
|
||||||
|
notAccountedPathsRegexp := regexp.MustCompile(".*\\/watch")
|
||||||
|
longRunningRequestCheck := BasicLongRunningRequestCheck(notAccountedPathsRegexp, map[string]string{"watch": "true"})
|
||||||
|
|
||||||
|
requestContextMapper := api.NewRequestContextMapper()
|
||||||
|
requestInfoFactory := &request.RequestInfoFactory{}
|
||||||
|
handler := WithMaxInFlightLimit(
|
||||||
|
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
// A short, accounted request that does not wait for block WaitGroup.
|
||||||
|
if strings.Contains(r.URL.Path, "dontwait") {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
disableCallsWgMutex.Lock()
|
||||||
|
waitForCalls := *disableCallsWg
|
||||||
|
disableCallsWgMutex.Unlock()
|
||||||
|
if waitForCalls {
|
||||||
|
callsWg.Done()
|
||||||
|
}
|
||||||
|
blockWg.Wait()
|
||||||
|
}),
|
||||||
|
nonMutating,
|
||||||
|
mutating,
|
||||||
|
requestContextMapper,
|
||||||
|
longRunningRequestCheck,
|
||||||
|
)
|
||||||
|
handler = apiserverfilters.WithRequestInfo(handler, requestInfoFactory, requestContextMapper)
|
||||||
|
handler = api.WithRequestContext(handler, requestContextMapper)
|
||||||
|
|
||||||
|
return httptest.NewServer(handler)
|
||||||
|
}
|
||||||
|
|
||||||
// Tests that MaxInFlightLimit works, i.e.
|
// Tests that MaxInFlightLimit works, i.e.
|
||||||
// - "long" requests such as proxy or watch, identified by regexp are not accounted despite
|
// - "long" requests such as proxy or watch, identified by regexp are not accounted despite
|
||||||
// hanging for the long time,
|
// hanging for the long time,
|
||||||
@ -36,69 +73,53 @@ import (
|
|||||||
// - subsequent "short" requests are rejected instantly with appropriate error,
|
// - subsequent "short" requests are rejected instantly with appropriate error,
|
||||||
// - subsequent "long" requests are handled normally,
|
// - subsequent "long" requests are handled normally,
|
||||||
// - we correctly recover after some "short" requests finish, i.e. we can process new ones.
|
// - we correctly recover after some "short" requests finish, i.e. we can process new ones.
|
||||||
func TestMaxInFlight(t *testing.T) {
|
func TestMaxInFlightNonMutating(t *testing.T) {
|
||||||
const AllowedInflightRequestsNo = 3
|
const AllowedNonMutatingInflightRequestsNo = 3
|
||||||
|
|
||||||
// notAccountedPathsRegexp specifies paths requests to which we don't account into
|
|
||||||
// requests in flight.
|
|
||||||
notAccountedPathsRegexp := regexp.MustCompile(".*\\/watch")
|
|
||||||
longRunningRequestCheck := BasicLongRunningRequestCheck(notAccountedPathsRegexp, map[string]string{"watch": "true"})
|
|
||||||
|
|
||||||
// Calls is used to wait until all server calls are received. We are sending
|
// Calls is used to wait until all server calls are received. We are sending
|
||||||
// AllowedInflightRequestsNo of 'long' not-accounted requests and the same number of
|
// AllowedNonMutatingInflightRequestsNo of 'long' not-accounted requests and the same number of
|
||||||
// 'short' accounted ones.
|
// 'short' accounted ones.
|
||||||
calls := &sync.WaitGroup{}
|
calls := &sync.WaitGroup{}
|
||||||
calls.Add(AllowedInflightRequestsNo * 2)
|
calls.Add(AllowedNonMutatingInflightRequestsNo * 2)
|
||||||
|
|
||||||
// Responses is used to wait until all responses are
|
// Responses is used to wait until all responses are
|
||||||
// received. This prevents some async requests getting EOF
|
// received. This prevents some async requests getting EOF
|
||||||
// errors from prematurely closing the server
|
// errors from prematurely closing the server
|
||||||
responses := sync.WaitGroup{}
|
responses := &sync.WaitGroup{}
|
||||||
responses.Add(AllowedInflightRequestsNo * 2)
|
responses.Add(AllowedNonMutatingInflightRequestsNo * 2)
|
||||||
|
|
||||||
// Block is used to keep requests in flight for as long as we need to. All requests will
|
// Block is used to keep requests in flight for as long as we need to. All requests will
|
||||||
// be unblocked at the same time.
|
// be unblocked at the same time.
|
||||||
block := sync.WaitGroup{}
|
block := &sync.WaitGroup{}
|
||||||
block.Add(1)
|
block.Add(1)
|
||||||
|
|
||||||
server := httptest.NewServer(
|
waitForCalls := true
|
||||||
WithMaxInFlightLimit(
|
waitForCallsMutex := sync.Mutex{}
|
||||||
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
// A short, accounted request that does not wait for block WaitGroup.
|
server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, AllowedNonMutatingInflightRequestsNo, 1)
|
||||||
if strings.Contains(r.URL.Path, "dontwait") {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if calls != nil {
|
|
||||||
calls.Done()
|
|
||||||
}
|
|
||||||
block.Wait()
|
|
||||||
}),
|
|
||||||
AllowedInflightRequestsNo,
|
|
||||||
longRunningRequestCheck,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
|
|
||||||
// These should hang, but not affect accounting. use a query param match
|
// These should hang, but not affect accounting. use a query param match
|
||||||
for i := 0; i < AllowedInflightRequestsNo; i++ {
|
for i := 0; i < AllowedNonMutatingInflightRequestsNo; i++ {
|
||||||
// These should hang waiting on block...
|
// These should hang waiting on block...
|
||||||
go func() {
|
go func() {
|
||||||
if err := expectHTTP(server.URL+"/foo/bar?watch=true", http.StatusOK); err != nil {
|
if err := expectHTTPGet(server.URL+"/foo/bar?watch=true", http.StatusOK); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
responses.Done()
|
responses.Done()
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that sever is not saturated by not-accounted calls
|
// Check that sever is not saturated by not-accounted calls
|
||||||
if err := expectHTTP(server.URL+"/dontwait", http.StatusOK); err != nil {
|
if err := expectHTTPGet(server.URL+"/dontwait", http.StatusOK); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// These should hang and be accounted, i.e. saturate the server
|
// These should hang and be accounted, i.e. saturate the server
|
||||||
for i := 0; i < AllowedInflightRequestsNo; i++ {
|
for i := 0; i < AllowedNonMutatingInflightRequestsNo; i++ {
|
||||||
// These should hang waiting on block...
|
// These should hang waiting on block...
|
||||||
go func() {
|
go func() {
|
||||||
if err := expectHTTP(server.URL, http.StatusOK); err != nil {
|
if err := expectHTTPGet(server.URL, http.StatusOK); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
responses.Done()
|
responses.Done()
|
||||||
@ -107,16 +128,23 @@ func TestMaxInFlight(t *testing.T) {
|
|||||||
// We wait for all calls to be received by the server
|
// We wait for all calls to be received by the server
|
||||||
calls.Wait()
|
calls.Wait()
|
||||||
// Disable calls notifications in the server
|
// Disable calls notifications in the server
|
||||||
calls = nil
|
waitForCallsMutex.Lock()
|
||||||
|
waitForCalls = false
|
||||||
|
waitForCallsMutex.Unlock()
|
||||||
|
|
||||||
// Do this multiple times to show that it rate limit rejected requests don't block.
|
// Do this multiple times to show that rate limit rejected requests don't block.
|
||||||
for i := 0; i < 2; i++ {
|
for i := 0; i < 2; i++ {
|
||||||
if err := expectHTTP(server.URL, errors.StatusTooManyRequests); err != nil {
|
if err := expectHTTPGet(server.URL, errors.StatusTooManyRequests); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Validate that non-accounted URLs still work. use a path regex match
|
// Validate that non-accounted URLs still work. use a path regex match
|
||||||
if err := expectHTTP(server.URL+"/dontwait/watch", http.StatusOK); err != nil {
|
if err := expectHTTPGet(server.URL+"/dontwait/watch", http.StatusOK); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We should allow a single mutating request.
|
||||||
|
if err := expectHTTPPost(server.URL+"/dontwait", http.StatusOK); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -126,12 +154,73 @@ func TestMaxInFlight(t *testing.T) {
|
|||||||
// Show that we recover from being blocked up.
|
// Show that we recover from being blocked up.
|
||||||
// Too avoid flakyness we need to wait until at least one of the requests really finishes.
|
// Too avoid flakyness we need to wait until at least one of the requests really finishes.
|
||||||
responses.Wait()
|
responses.Wait()
|
||||||
if err := expectHTTP(server.URL, http.StatusOK); err != nil {
|
if err := expectHTTPGet(server.URL, http.StatusOK); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func expectHTTP(url string, code int) error {
|
func TestMaxInFlightMutating(t *testing.T) {
|
||||||
|
const AllowedMutatingInflightRequestsNo = 3
|
||||||
|
|
||||||
|
calls := &sync.WaitGroup{}
|
||||||
|
calls.Add(AllowedMutatingInflightRequestsNo)
|
||||||
|
|
||||||
|
responses := &sync.WaitGroup{}
|
||||||
|
responses.Add(AllowedMutatingInflightRequestsNo)
|
||||||
|
|
||||||
|
// Block is used to keep requests in flight for as long as we need to. All requests will
|
||||||
|
// be unblocked at the same time.
|
||||||
|
block := &sync.WaitGroup{}
|
||||||
|
block.Add(1)
|
||||||
|
|
||||||
|
waitForCalls := true
|
||||||
|
waitForCallsMutex := sync.Mutex{}
|
||||||
|
|
||||||
|
server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
// These should hang and be accounted, i.e. saturate the server
|
||||||
|
for i := 0; i < AllowedMutatingInflightRequestsNo; i++ {
|
||||||
|
// These should hang waiting on block...
|
||||||
|
go func() {
|
||||||
|
if err := expectHTTPPost(server.URL+"/foo/bar", http.StatusOK); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
responses.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
// We wait for all calls to be received by the server
|
||||||
|
calls.Wait()
|
||||||
|
// Disable calls notifications in the server
|
||||||
|
// Disable calls notifications in the server
|
||||||
|
waitForCallsMutex.Lock()
|
||||||
|
waitForCalls = false
|
||||||
|
waitForCallsMutex.Unlock()
|
||||||
|
|
||||||
|
// Do this multiple times to show that rate limit rejected requests don't block.
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
if err := expectHTTPPost(server.URL+"/foo/bar/", errors.StatusTooManyRequests); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Validate that non-mutating URLs still work. use a path regex match
|
||||||
|
if err := expectHTTPGet(server.URL+"/dontwait", http.StatusOK); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Let all hanging requests finish
|
||||||
|
block.Done()
|
||||||
|
|
||||||
|
// Show that we recover from being blocked up.
|
||||||
|
// Too avoid flakyness we need to wait until at least one of the requests really finishes.
|
||||||
|
responses.Wait()
|
||||||
|
if err := expectHTTPPost(server.URL+"/foo/bar", http.StatusOK); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We use GET as a sample non-mutating request.
|
||||||
|
func expectHTTPGet(url string, code int) error {
|
||||||
r, err := http.Get(url)
|
r, err := http.Get(url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unexpected error: %v", err)
|
return fmt.Errorf("unexpected error: %v", err)
|
||||||
@ -141,3 +230,15 @@ func expectHTTP(url string, code int) error {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We use POST as a sample mutating request.
|
||||||
|
func expectHTTPPost(url string, code int) error {
|
||||||
|
r, err := http.Post(url, "text/html", strings.NewReader("foo bar"))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if r.StatusCode != code {
|
||||||
|
return fmt.Errorf("unexpected response: %v", r.StatusCode)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -44,31 +44,32 @@ type ServerRunOptions struct {
|
|||||||
AdmissionControlConfigFile string
|
AdmissionControlConfigFile string
|
||||||
AdvertiseAddress net.IP
|
AdvertiseAddress net.IP
|
||||||
|
|
||||||
CloudConfigFile string
|
CloudConfigFile string
|
||||||
CloudProvider string
|
CloudProvider string
|
||||||
CorsAllowedOriginList []string
|
CorsAllowedOriginList []string
|
||||||
DefaultStorageMediaType string
|
DefaultStorageMediaType string
|
||||||
DeleteCollectionWorkers int
|
DeleteCollectionWorkers int
|
||||||
AuditLogPath string
|
AuditLogPath string
|
||||||
AuditLogMaxAge int
|
AuditLogMaxAge int
|
||||||
AuditLogMaxBackups int
|
AuditLogMaxBackups int
|
||||||
AuditLogMaxSize int
|
AuditLogMaxSize int
|
||||||
EnableGarbageCollection bool
|
EnableGarbageCollection bool
|
||||||
EnableProfiling bool
|
EnableProfiling bool
|
||||||
EnableContentionProfiling bool
|
EnableContentionProfiling bool
|
||||||
EnableSwaggerUI bool
|
EnableSwaggerUI bool
|
||||||
EnableWatchCache bool
|
EnableWatchCache bool
|
||||||
ExternalHost string
|
ExternalHost string
|
||||||
KubernetesServiceNodePort int
|
KubernetesServiceNodePort int
|
||||||
LongRunningRequestRE string
|
LongRunningRequestRE string
|
||||||
MasterCount int
|
MasterCount int
|
||||||
MasterServiceNamespace string
|
MasterServiceNamespace string
|
||||||
MaxRequestsInFlight int
|
MaxRequestsInFlight int
|
||||||
MinRequestTimeout int
|
MaxMutatingRequestsInFlight int
|
||||||
RuntimeConfig config.ConfigurationMap
|
MinRequestTimeout int
|
||||||
ServiceClusterIPRange net.IPNet // TODO: make this a list
|
RuntimeConfig config.ConfigurationMap
|
||||||
ServiceNodePortRange utilnet.PortRange
|
ServiceClusterIPRange net.IPNet // TODO: make this a list
|
||||||
StorageVersions string
|
ServiceNodePortRange utilnet.PortRange
|
||||||
|
StorageVersions string
|
||||||
// The default values for StorageVersions. StorageVersions overrides
|
// The default values for StorageVersions. StorageVersions overrides
|
||||||
// these; you can change this if you want to change the defaults (e.g.,
|
// these; you can change this if you want to change the defaults (e.g.,
|
||||||
// for testing). This is not actually exposed as a flag.
|
// for testing). This is not actually exposed as a flag.
|
||||||
@ -80,22 +81,23 @@ type ServerRunOptions struct {
|
|||||||
|
|
||||||
func NewServerRunOptions() *ServerRunOptions {
|
func NewServerRunOptions() *ServerRunOptions {
|
||||||
return &ServerRunOptions{
|
return &ServerRunOptions{
|
||||||
AdmissionControl: "AlwaysAdmit",
|
AdmissionControl: "AlwaysAdmit",
|
||||||
DefaultStorageMediaType: "application/json",
|
DefaultStorageMediaType: "application/json",
|
||||||
DefaultStorageVersions: registered.AllPreferredGroupVersions(),
|
DefaultStorageVersions: registered.AllPreferredGroupVersions(),
|
||||||
DeleteCollectionWorkers: 1,
|
DeleteCollectionWorkers: 1,
|
||||||
EnableGarbageCollection: true,
|
EnableGarbageCollection: true,
|
||||||
EnableProfiling: true,
|
EnableProfiling: true,
|
||||||
EnableContentionProfiling: false,
|
EnableContentionProfiling: false,
|
||||||
EnableWatchCache: true,
|
EnableWatchCache: true,
|
||||||
LongRunningRequestRE: DefaultLongRunningRequestRE,
|
LongRunningRequestRE: DefaultLongRunningRequestRE,
|
||||||
MasterCount: 1,
|
MasterCount: 1,
|
||||||
MasterServiceNamespace: api.NamespaceDefault,
|
MasterServiceNamespace: api.NamespaceDefault,
|
||||||
MaxRequestsInFlight: 400,
|
MaxRequestsInFlight: 400,
|
||||||
MinRequestTimeout: 1800,
|
MaxMutatingRequestsInFlight: 200,
|
||||||
RuntimeConfig: make(config.ConfigurationMap),
|
MinRequestTimeout: 1800,
|
||||||
ServiceNodePortRange: DefaultServiceNodePortRange,
|
RuntimeConfig: make(config.ConfigurationMap),
|
||||||
StorageVersions: registered.AllPreferredGroupVersions(),
|
ServiceNodePortRange: DefaultServiceNodePortRange,
|
||||||
|
StorageVersions: registered.AllPreferredGroupVersions(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -253,7 +255,11 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
|
|||||||
"DEPRECATED: the namespace from which the kubernetes master services should be injected into pods.")
|
"DEPRECATED: the namespace from which the kubernetes master services should be injected into pods.")
|
||||||
|
|
||||||
fs.IntVar(&s.MaxRequestsInFlight, "max-requests-inflight", s.MaxRequestsInFlight, ""+
|
fs.IntVar(&s.MaxRequestsInFlight, "max-requests-inflight", s.MaxRequestsInFlight, ""+
|
||||||
"The maximum number of requests in flight at a given time. When the server exceeds this, "+
|
"The maximum number of non-mutating requests in flight at a given time. When the server exceeds this, "+
|
||||||
|
"it rejects requests. Zero for no limit.")
|
||||||
|
|
||||||
|
fs.IntVar(&s.MaxMutatingRequestsInFlight, "max-mutating-requests-inflight", s.MaxMutatingRequestsInFlight, ""+
|
||||||
|
"The maximum number of mutating requests in flight at a given time. When the server exceeds this, "+
|
||||||
"it rejects requests. Zero for no limit.")
|
"it rejects requests. Zero for no limit.")
|
||||||
|
|
||||||
fs.IntVar(&s.MinRequestTimeout, "min-request-timeout", s.MinRequestTimeout, ""+
|
fs.IntVar(&s.MinRequestTimeout, "min-request-timeout", s.MinRequestTimeout, ""+
|
||||||
|
Loading…
Reference in New Issue
Block a user