Detect long-running requests from parsed request info

This commit is contained in:
Jordan Liggitt 2016-12-05 11:04:19 -05:00
parent 55f13b5729
commit 4359054616
No known key found for this signature in database
GPG Key ID: 24E7ADF9A3B42012
13 changed files with 59 additions and 161 deletions

View File

@ -35,11 +35,13 @@ go_library(
"//pkg/generated/openapi:go_default_library",
"//pkg/genericapiserver:go_default_library",
"//pkg/genericapiserver/authorizer:go_default_library",
"//pkg/genericapiserver/filters:go_default_library",
"//pkg/master:go_default_library",
"//pkg/registry/cachesize:go_default_library",
"//pkg/runtime/schema:go_default_library",
"//pkg/util/errors:go_default_library",
"//pkg/util/net:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/util/wait:go_default_library",
"//pkg/version:go_default_library",
"//plugin/pkg/admission/admit:go_default_library",
@ -67,11 +69,3 @@ go_library(
"//vendor:github.com/spf13/pflag",
],
)
go_test(
name = "go_default_test",
srcs = ["server_test.go"],
library = "go_default_library",
tags = ["automanaged"],
deps = ["//cmd/kube-apiserver/app/options:go_default_library"],
)

View File

@ -50,11 +50,13 @@ import (
generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
"k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/genericapiserver/authorizer"
"k8s.io/kubernetes/pkg/genericapiserver/filters"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/registry/cachesize"
"k8s.io/kubernetes/pkg/runtime/schema"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/version"
)
@ -277,6 +279,10 @@ func Run(s *options.ServerRunOptions) error {
genericConfig.EnableOpenAPISupport = true
genericConfig.EnableMetrics = true
genericConfig.OpenAPIConfig.SecurityDefinitions = securityDefinitions
genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
)
config := &master.Config{
GenericConfig: genericConfig,

View File

@ -1,65 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app
import (
"regexp"
"testing"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
)
func TestLongRunningRequestRegexp(t *testing.T) {
regexp := regexp.MustCompile(options.NewServerRunOptions().GenericServerRunOptions.LongRunningRequestRE)
dontMatch := []string{
"/api/v1/watch-namespace/",
"/api/v1/namespace-proxy/",
"/api/v1/namespace-watch",
"/api/v1/namespace-proxy",
"/api/v1/namespace-portforward/pods",
"/api/v1/portforward/pods",
". anything",
"/ that",
}
doMatch := []string{
"/api/v1/pods/watch",
"/api/v1/watch/stuff",
"/api/v1/default/service/proxy",
"/api/v1/pods/proxy/path/to/thing",
"/api/v1/namespaces/myns/pods/mypod/log",
"/api/v1/namespaces/myns/pods/mypod/logs",
"/api/v1/namespaces/myns/pods/mypod/portforward",
"/api/v1/namespaces/myns/pods/mypod/exec",
"/api/v1/namespaces/myns/pods/mypod/attach",
"/api/v1/namespaces/myns/pods/mypod/log/",
"/api/v1/namespaces/myns/pods/mypod/logs/",
"/api/v1/namespaces/myns/pods/mypod/portforward/",
"/api/v1/namespaces/myns/pods/mypod/exec/",
"/api/v1/namespaces/myns/pods/mypod/attach/",
"/api/v1/watch/namespaces/myns/pods",
}
for _, path := range dontMatch {
if regexp.MatchString(path) {
t.Errorf("path should not have match regexp but did: %s", path)
}
}
for _, path := range doMatch {
if !regexp.MatchString(path) {
t.Errorf("path should have match regexp did not: %s", path)
}
}
}

View File

@ -42,6 +42,7 @@ go_library(
"//pkg/generated/openapi:go_default_library",
"//pkg/genericapiserver:go_default_library",
"//pkg/genericapiserver/authorizer:go_default_library",
"//pkg/genericapiserver/filters:go_default_library",
"//pkg/registry/cachesize:go_default_library",
"//pkg/registry/core/configmap/etcd:go_default_library",
"//pkg/registry/core/event/etcd:go_default_library",
@ -57,6 +58,7 @@ go_library(
"//pkg/routes:go_default_library",
"//pkg/runtime/schema:go_default_library",
"//pkg/util/errors:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/util/wait:go_default_library",
"//pkg/version:go_default_library",
"//plugin/pkg/admission/admit:go_default_library",

View File

@ -38,12 +38,14 @@ import (
"k8s.io/kubernetes/pkg/generated/openapi"
"k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/genericapiserver/authorizer"
"k8s.io/kubernetes/pkg/genericapiserver/filters"
"k8s.io/kubernetes/pkg/registry/cachesize"
"k8s.io/kubernetes/pkg/registry/generic"
genericregistry "k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/routes"
"k8s.io/kubernetes/pkg/runtime/schema"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/version"
)
@ -166,6 +168,10 @@ func Run(s *options.ServerRunOptions) error {
genericConfig.OpenAPIConfig.Definitions = openapi.OpenAPIDefinitions
genericConfig.EnableOpenAPISupport = true
genericConfig.OpenAPIConfig.SecurityDefinitions = securityDefinitions
genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
)
// TODO: Move this to generic api server (Need to move the command line flag).
if s.GenericServerRunOptions.EnableWatchCache {

View File

@ -25,7 +25,6 @@ import (
"net"
"net/http"
"os"
"regexp"
goruntime "runtime"
"sort"
"strconv"
@ -187,8 +186,6 @@ type SecureServingInfo struct {
// NewConfig returns a Config struct with the default values
func NewConfig() *Config {
longRunningRE := regexp.MustCompile(options.DefaultLongRunningRequestRE)
config := &Config{
Serializer: api.Codecs,
ReadWritePort: 6443,
@ -214,7 +211,10 @@ func NewConfig() *Config {
},
GetOperationIDAndTags: apiserveropenapi.GetOperationIDAndTags,
},
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(longRunningRE, map[string]string{"watch": "true"}),
// Default to treating watch as a long-running operation
// Generic API servers have no inherent long-running subresources
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
}
// this keeps the defaults in sync
@ -471,7 +471,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) (secure, insec
generic := func(handler http.Handler) http.Handler {
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
handler = genericfilters.WithPanicRecovery(handler, c.RequestContextMapper)
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc)
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.RequestContextMapper, c.LongRunningFunc)
handler = apiserverfilters.WithRequestInfo(handler, NewRequestInfoResolver(c), c.RequestContextMapper)
handler = api.WithRequestContext(handler, c.RequestContextMapper)

View File

@ -47,5 +47,6 @@ go_test(
"//pkg/api/errors:go_default_library",
"//pkg/apiserver/filters:go_default_library",
"//pkg/apiserver/request:go_default_library",
"//pkg/util/sets:go_default_library",
],
)

View File

@ -18,29 +18,23 @@ package filters
import (
"net/http"
"regexp"
"strings"
"k8s.io/kubernetes/pkg/apiserver/request"
"k8s.io/kubernetes/pkg/util/sets"
)
// LongRunningRequestCheck is a predicate which is true for long-running http requests.
type LongRunningRequestCheck func(r *http.Request) bool
type LongRunningRequestCheck func(r *http.Request, requestInfo *request.RequestInfo) bool
// BasicLongRunningRequestCheck pathRegex operates against the url path, the queryParams match is case insensitive.
// Any one match flags the request.
// TODO tighten this check to eliminate the abuse potential by malicious clients that start setting queryParameters
// to bypass the rate limitter. This could be done using a full parse and special casing the bits we need.
func BasicLongRunningRequestCheck(pathRegex *regexp.Regexp, queryParams map[string]string) LongRunningRequestCheck {
return func(r *http.Request) bool {
if pathRegex.MatchString(r.URL.Path) {
// BasicLongRunningRequestCheck returns true if the given request has one of the specified verbs or one of the specified subresources
func BasicLongRunningRequestCheck(longRunningVerbs, longRunningSubresources sets.String) LongRunningRequestCheck {
return func(r *http.Request, requestInfo *request.RequestInfo) bool {
if longRunningVerbs.Has(requestInfo.Verb) {
return true
}
for key, expectedValue := range queryParams {
if strings.ToLower(expectedValue) == strings.ToLower(r.URL.Query().Get(key)) {
return true
}
if requestInfo.IsResourceRequest && longRunningSubresources.Has(requestInfo.Subresource) {
return true
}
return false
}
}

View File

@ -62,13 +62,6 @@ func WithMaxInFlightLimit(
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// TODO: migrate to use requestInfo instead of having custom request parser.
if longRunningRequestCheck(r) {
// Skip tracking long running events.
handler.ServeHTTP(w, r)
return
}
ctx, ok := requestContextMapper.Get(r)
if !ok {
handleError(w, r, fmt.Errorf("no context found for request, handler chain must be wrong"))
@ -80,6 +73,12 @@ func WithMaxInFlightLimit(
return
}
// Skip tracking long running events.
if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) {
handler.ServeHTTP(w, r)
return
}
var c chan bool
if !nonMutatingRequestVerbs.Has(requestInfo.Verb) {
c = mutatingChan

View File

@ -20,7 +20,6 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"regexp"
"strings"
"sync"
"testing"
@ -29,17 +28,15 @@ import (
"k8s.io/kubernetes/pkg/api/errors"
apiserverfilters "k8s.io/kubernetes/pkg/apiserver/filters"
"k8s.io/kubernetes/pkg/apiserver/request"
"k8s.io/kubernetes/pkg/util/sets"
)
func createMaxInflightServer(callsWg, blockWg *sync.WaitGroup, disableCallsWg *bool, disableCallsWgMutex *sync.Mutex, nonMutating, mutating int) *httptest.Server {
// notAccountedPathsRegexp specifies paths requests to which we don't account into
// requests in flight.
notAccountedPathsRegexp := regexp.MustCompile(".*\\/watch")
longRunningRequestCheck := BasicLongRunningRequestCheck(notAccountedPathsRegexp, map[string]string{"watch": "true"})
longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
requestContextMapper := api.NewRequestContextMapper()
requestInfoFactory := &request.RequestInfoFactory{}
requestInfoFactory := &request.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
handler := WithMaxInFlightLimit(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// A short, accounted request that does not wait for block WaitGroup.
@ -103,7 +100,7 @@ func TestMaxInFlightNonMutating(t *testing.T) {
for i := 0; i < AllowedNonMutatingInflightRequestsNo; i++ {
// These should hang waiting on block...
go func() {
if err := expectHTTPGet(server.URL+"/foo/bar?watch=true", http.StatusOK); err != nil {
if err := expectHTTPGet(server.URL+"/api/v1/namespaces/default/wait?watch=true", http.StatusOK); err != nil {
t.Error(err)
}
responses.Done()
@ -139,7 +136,7 @@ func TestMaxInFlightNonMutating(t *testing.T) {
}
}
// Validate that non-accounted URLs still work. use a path regex match
if err := expectHTTPGet(server.URL+"/dontwait/watch", http.StatusOK); err != nil {
if err := expectHTTPGet(server.URL+"/api/v1/watch/namespaces/default/dontwait", http.StatusOK); err != nil {
t.Error(err)
}

View File

@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/apiserver/request"
)
const globalTimeout = time.Minute
@ -34,13 +35,23 @@ const globalTimeout = time.Minute
var errConnKilled = fmt.Errorf("kill connection/stream")
// WithTimeoutForNonLongRunningRequests times out non-long-running requests after the time given by globalTimeout.
func WithTimeoutForNonLongRunningRequests(handler http.Handler, longRunning LongRunningRequestCheck) http.Handler {
func WithTimeoutForNonLongRunningRequests(handler http.Handler, requestContextMapper api.RequestContextMapper, longRunning LongRunningRequestCheck) http.Handler {
if longRunning == nil {
return handler
}
timeoutFunc := func(req *http.Request) (<-chan time.Time, string) {
// TODO unify this with apiserver.MaxInFlightLimit
if longRunning(req) {
ctx, ok := requestContextMapper.Get(req)
if !ok {
return time.After(globalTimeout), ""
}
requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
return time.After(globalTimeout), ""
}
if longRunning(req, requestInfo) {
return nil, ""
}
return time.After(globalTimeout), ""

View File

@ -31,11 +31,6 @@ import (
"github.com/spf13/pflag"
)
const (
// TODO: This can be tightened up. It still matches objects named watch or proxy.
DefaultLongRunningRequestRE = "(/|^)((watch|proxy)(/|$)|(logs?|portforward|exec|attach)/?$)"
)
var DefaultServiceNodePortRange = utilnet.PortRange{Base: 30000, Size: 2768}
// ServerRunOptions contains the options while running a generic api server.
@ -60,7 +55,6 @@ type ServerRunOptions struct {
EnableWatchCache bool
ExternalHost string
KubernetesServiceNodePort int
LongRunningRequestRE string
MasterCount int
MasterServiceNamespace string
MaxRequestsInFlight int
@ -88,7 +82,6 @@ func NewServerRunOptions() *ServerRunOptions {
EnableProfiling: true,
EnableContentionProfiling: false,
EnableWatchCache: true,
LongRunningRequestRE: DefaultLongRunningRequestRE,
MasterCount: 1,
MasterServiceNamespace: api.NamespaceDefault,
MaxRequestsInFlight: 400,
@ -243,9 +236,11 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
"of type NodePort, using this as the value of the port. If zero, the Kubernetes master "+
"service will be of type ClusterIP.")
fs.StringVar(&s.LongRunningRequestRE, "long-running-request-regexp", s.LongRunningRequestRE, ""+
// TODO: remove post-1.6
fs.String("long-running-request-regexp", "", ""+
"A regular expression matching long running requests which should "+
"be excluded from maximum inflight request handling.")
fs.MarkDeprecated("long-running-request-regexp", "regular expression matching of long-running requests is no longer supported")
fs.IntVar(&s.MasterCount, "apiserver-count", s.MasterCount,
"The number of apiservers running in the cluster.")

View File

@ -22,7 +22,6 @@ import (
"io/ioutil"
"net"
"net/http"
"regexp"
"testing"
"time"
@ -37,47 +36,6 @@ import (
"k8s.io/kubernetes/pkg/runtime/schema"
)
func TestLongRunningRequestRegexp(t *testing.T) {
regexp := regexp.MustCompile(options.NewServerRunOptions().GenericServerRunOptions.LongRunningRequestRE)
dontMatch := []string{
"/api/v1/watch-namespace/",
"/api/v1/namespace-proxy/",
"/api/v1/namespace-watch",
"/api/v1/namespace-proxy",
"/api/v1/namespace-portforward/pods",
"/api/v1/portforward/pods",
". anything",
"/ that",
}
doMatch := []string{
"/api/v1/pods/watch",
"/api/v1/watch/stuff",
"/api/v1/default/service/proxy",
"/api/v1/pods/proxy/path/to/thing",
"/api/v1/namespaces/myns/pods/mypod/log",
"/api/v1/namespaces/myns/pods/mypod/logs",
"/api/v1/namespaces/myns/pods/mypod/portforward",
"/api/v1/namespaces/myns/pods/mypod/exec",
"/api/v1/namespaces/myns/pods/mypod/attach",
"/api/v1/namespaces/myns/pods/mypod/log/",
"/api/v1/namespaces/myns/pods/mypod/logs/",
"/api/v1/namespaces/myns/pods/mypod/portforward/",
"/api/v1/namespaces/myns/pods/mypod/exec/",
"/api/v1/namespaces/myns/pods/mypod/attach/",
"/api/v1/watch/namespaces/myns/pods",
}
for _, path := range dontMatch {
if regexp.MatchString(path) {
t.Errorf("path should not have match regexp but did: %s", path)
}
}
for _, path := range doMatch {
if !regexp.MatchString(path) {
t.Errorf("path should have match regexp did not: %s", path)
}
}
}
var securePort = 6443 + 2
var insecurePort = 8080 + 2
var serverIP = fmt.Sprintf("http://localhost:%v", insecurePort)