diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 05e33692dfd..db5368bf02f 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -197,6 +197,12 @@ type Config struct { // Predicate which is true for paths of long-running http requests LongRunningFunc apirequest.LongRunningRequestCheck + // GoawayChance is the probability of sending a GOAWAY to HTTP/2 clients. When a client receives + // GOAWAY, its in-flight requests are not affected and new requests use + // a new TCP connection, triggering re-balancing to another server behind the load balancer. + // Defaults to 0, which means never send GOAWAY. Max is 0.02 to prevent breaking the apiserver. + GoawayChance float64 + // MergedResourceConfig indicates which groupVersion enabled and its resources enabled/disabled. // This is composed of genericapiserver defaultAPIResourceConfig and those parsed from flags. // If not specify any in flags, then genericapiserver will only enable defaultAPIResourceConfig. 
@@ -671,6 +677,9 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout) handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup) handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver) + if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 { + handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance) + } handler = genericfilters.WithPanicRecovery(handler) return handler } diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD b/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD index d8fdedd70e0..69c80d3639c 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD @@ -11,6 +11,7 @@ go_test( srcs = [ "content_type_test.go", "cors_test.go", + "goaway_test.go", "maxinflight_test.go", "timeout_test.go", ], @@ -25,6 +26,7 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/filters:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", + "//vendor/golang.org/x/net/http2:go_default_library", ], ) @@ -34,6 +36,7 @@ go_library( "content_type.go", "cors.go", "doc.go", + "goaway.go", "longrunning.go", "maxinflight.go", "priority-and-fairness.go", diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/goaway.go b/staging/src/k8s.io/apiserver/pkg/server/filters/goaway.go new file mode 100644 index 00000000000..8b1b5e8ea4d --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/goaway.go @@ -0,0 +1,88 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filters + +import ( + "math/rand" + "net/http" + "sync" +) + +// GoawayDecider decides whether the server should send a GOAWAY for a given request. +type GoawayDecider interface { + Goaway(r *http.Request) bool +} + +var ( + // randPool hands out *rand.Rand instances so that concurrent requests can draw random numbers safely, + // which performs better than sharing a single rand.Rand guarded by a lock. + randPool = &sync.Pool{ + New: func() interface{} { + return rand.New(rand.NewSource(rand.Int63())) + }, + } +) + +// WithProbabilisticGoaway returns an http.Handler that sends GOAWAY probabilistically, +// according to the given chance, for HTTP/2 requests. After a client receives GOAWAY, +// its in-flight long-running requests are not affected, and new requests +// use a new TCP connection, re-balancing to another server behind the load balancer. +func WithProbabilisticGoaway(inner http.Handler, chance float64) http.Handler { + return &goaway{ + handler: inner, + decider: &probabilisticGoawayDecider{ + chance: chance, + next: func() float64 { + rnd := randPool.Get().(*rand.Rand) + ret := rnd.Float64() + randPool.Put(rnd) + return ret + }, + }, + } +} + +// goaway sends a GOAWAY to the client, as directed by decider, for HTTP/2 requests. +type goaway struct { + handler http.Handler + decider GoawayDecider +} + +// ServeHTTP implements http.Handler; it may mark the response for GOAWAY and then always calls the wrapped handler. +func (h *goaway) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Proto == "HTTP/2.0" && h.decider.Goaway(r) { + // Setting "Connection: close" is meant to make the server send a GOAWAY and tear down the TCP connection when idle. 
+ w.Header().Set("Connection", "close") + } + + h.handler.ServeHTTP(w, r) +} + +// probabilisticGoawayDecider decides to send GOAWAY when a value drawn from next is below chance. +type probabilisticGoawayDecider struct { + chance float64 + next func() float64 +} + +// Goaway implements GoawayDecider; it reports true when next() returns a value less than chance. +func (p *probabilisticGoawayDecider) Goaway(r *http.Request) bool { + if p.next() < p.chance { + return true + } + + return false +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/goaway_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/goaway_test.go new file mode 100644 index 00000000000..ff35f30b9c6 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/goaway_test.go @@ -0,0 +1,309 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package filters + +import ( + "crypto/tls" + "io" + "math/rand" + "net" + "net/http" + "net/http/httptest" + "testing" + "time" + + "golang.org/x/net/http2" +) + +func TestProbabilisticGoawayDecider(t *testing.T) { + cases := []struct { + name string + chance float64 + nextFn func(chance float64) func() float64 + expectGOAWAY bool + }{ + { + name: "always not GOAWAY", + chance: 0, + nextFn: func(chance float64) func() float64 { + return rand.Float64 + }, + expectGOAWAY: false, + }, + { + name: "always GOAWAY", + chance: 1, + nextFn: func(chance float64) func() float64 { + return rand.Float64 + }, + expectGOAWAY: true, + }, + { + name: "hit GOAWAY", + chance: rand.Float64() + 0.01, + nextFn: func(chance float64) func() float64 { + return func() float64 { + return chance - 0.001 + } + }, + expectGOAWAY: true, + }, + { + name: "does not hit GOAWAY", + chance: rand.Float64() + 0.01, + nextFn: func(chance float64) func() float64 { + return func() float64 { + return chance + 0.001 + } + }, + expectGOAWAY: false, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + d := probabilisticGoawayDecider{chance: tc.chance, next: tc.nextFn(tc.chance)} + result := d.Goaway(nil) + if result != tc.expectGOAWAY { + t.Errorf("expect GOAWAY: %v, got: %v", tc.expectGOAWAY, result) + } + }) + } +} + +// TestClientReceivedGOAWAY tests that in-flight watch requests are not affected and that new requests use a +// new connection after the client receives GOAWAY, and that a watch response carrying GOAWAY does not break the client's +// read of the watch body. 
+func TestClientReceivedGOAWAY(t *testing.T) { + const ( + urlNormal = "/normal" + urlWatch = "/watch" + urlGoaway = "/goaway" + urlWatchWithGoaway = "/watch-with-goaway" + ) + + const ( + // number of bytes the watch handler sends before it returns; + // used to check whether a watch request was broken by GOAWAY + watchExpectSendBytes = 5 + ) + + watchHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + timer := time.NewTicker(time.Second) + + w.Header().Set("Transfer-Encoding", "chunked") + w.WriteHeader(200) + + flusher, _ := w.(http.Flusher) + flusher.Flush() + + count := 0 + for { + select { + case <-timer.C: + n, err := w.Write([]byte("w")) + if err != nil { + return + } + flusher.Flush() + count += n + if count == watchExpectSendBytes { + return + } + } + } + }) + + mux := http.NewServeMux() + mux.Handle(urlNormal, WithProbabilisticGoaway(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("hello")) + return + }), 0)) + mux.Handle(urlWatch, WithProbabilisticGoaway(watchHandler, 0)) + mux.Handle(urlGoaway, WithProbabilisticGoaway(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("hello")) + return + }), 1)) + mux.Handle(urlWatchWithGoaway, WithProbabilisticGoaway(watchHandler, 1)) + + s := httptest.NewUnstartedServer(mux) + + http2Options := &http2.Server{} + + if err := http2.ConfigureServer(s.Config, http2Options); err != nil { + t.Fatalf("failed to configure test server to be HTTP2 server, err: %v", err) + } + + s.TLS = s.Config.TLSConfig + s.StartTLS() + defer s.Close() + + tlsConfig := &tls.Config{ + InsecureSkipVerify: true, + NextProtos: []string{http2.NextProtoTLS}, + } + + cases := []struct { + name string + reqs []string + // expectConnections always equals the number of GOAWAY requests (urlGoaway or urlWatchWithGoaway) + 1 + expectConnections int + }{ + { + name: "all normal requests use only one connection", + reqs: []string{urlNormal, 
urlNormal, urlNormal}, + expectConnections: 1, + }, + { + name: "got GOAWAY after set-up watch", + reqs: []string{urlNormal, urlWatch, urlGoaway, urlNormal, urlNormal}, + expectConnections: 2, + }, + { + name: "got GOAWAY after set-up watch, and set-up a new watch", + reqs: []string{urlNormal, urlWatch, urlGoaway, urlWatch, urlNormal, urlNormal}, + expectConnections: 2, + }, + { + name: "got 2 GOAWAY after set-up watch", + reqs: []string{urlNormal, urlWatch, urlGoaway, urlGoaway, urlNormal, urlNormal}, + expectConnections: 3, + }, + { + name: "combine with watch-with-goaway", + reqs: []string{urlNormal, urlWatchWithGoaway, urlNormal, urlWatch, urlGoaway, urlNormal, urlNormal}, + expectConnections: 3, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + // localAddr records the local address of every TCP connection dialed; its length is the connection count + localAddr := make([]string, 0) + + // init HTTP/2 client; DialTLS is overridden to record each new connection. NOTE(review): t.Fatalf here is only safe if DialTLS runs on the test goroutine — TODO confirm + client := http.Client{ + Transport: &http2.Transport{ + TLSClientConfig: tlsConfig, + DialTLS: func(network, addr string, cfg *tls.Config) (conn net.Conn, err error) { + conn, err = tls.Dial(network, addr, cfg) + if err != nil { + t.Fatalf("unexpect connection err: %v", err) + } + localAddr = append(localAddr, conn.LocalAddr().String()) + return + }, + }, + } + + watchChs := make([]chan int, 0) + for _, url := range tc.reqs { + req, err := http.NewRequest(http.MethodGet, s.URL+url, nil) + if err != nil { + t.Fatalf("unexpect new request error: %v", err) + } + resp, err := client.Do(req) + if err != nil { + t.Fatalf("failed request test server, err: %v", err) + } + + // count the watch bytes received; the stream is not expected to be broken. NOTE(review): resp.Body is never closed in this loop — confirm that is intentional + if url == urlWatch || url == urlWatchWithGoaway { + ch := make(chan int) + watchChs = append(watchChs, ch) + go func() { + count := 0 + for { + buffer := make([]byte, 1) + n, err := resp.Body.Read(buffer) + if err != nil { + // urlWatch is expected to end with io.EOF, + // urlWatchWithGoaway is expected to end with http2.GoAwayError + if err != io.EOF { + if _, ok := 
err.(http2.GoAwayError); !ok { + t.Errorf("watch received not EOF err: %v", err) + } + } + ch <- count + return + } + count += n + } + }() + } + } + + // check TCP connection count + if tc.expectConnections != len(localAddr) { + t.Fatalf("expect TCP connection: %d, actual: %d", tc.expectConnections, len(localAddr)) + } + + // check that no watch request was broken by a GOAWAY response; one 10s timer bounds the wait for all watches + watchTimeout := time.NewTimer(time.Second * 10) + for _, watchCh := range watchChs { + select { + case n := <-watchCh: + if n != watchExpectSendBytes { + t.Fatalf("in-flight watch was broken by GOAWAY response, expect go bytes: %d, actual got: %d", watchExpectSendBytes, n) + } + case <-watchTimeout.C: + t.Error("watch receive timeout") + } + } + }) + } +} + +func TestHTTP1Requests(t *testing.T) { + s := httptest.NewUnstartedServer(WithProbabilisticGoaway(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("hello")) + return + }), 1)) + + http2Options := &http2.Server{} + + if err := http2.ConfigureServer(s.Config, http2Options); err != nil { + t.Fatalf("failed to configure test server to be HTTP2 server, err: %v", err) + } + + s.TLS = s.Config.TLSConfig + s.StartTLS() + defer s.Close() + + tlsConfig := &tls.Config{ + InsecureSkipVerify: true, + NextProtos: []string{"http/1.1"}, + } + + client := http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsConfig, + }, + } + + resp, err := client.Get(s.URL) + if err != nil { + t.Fatalf("failed to request the server, err: %v", err) + } + + if v := resp.Header.Get("Connection"); v != "" { + t.Errorf("expect response HTTP header Connection to be empty, but got: %s", v) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go index 9645ece1d38..a1190f171ad 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go +++ 
b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go @@ -38,6 +38,7 @@ type ServerRunOptions struct { MaxRequestsInFlight int MaxMutatingRequestsInFlight int RequestTimeout time.Duration + GoawayChance float64 LivezGracePeriod time.Duration MinRequestTimeout int ShutdownDelayDuration time.Duration @@ -76,6 +77,7 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error { c.MaxMutatingRequestsInFlight = s.MaxMutatingRequestsInFlight c.LivezGracePeriod = s.LivezGracePeriod c.RequestTimeout = s.RequestTimeout + c.GoawayChance = s.GoawayChance c.MinRequestTimeout = s.MinRequestTimeout c.ShutdownDelayDuration = s.ShutdownDelayDuration c.JSONPatchMaxCopyBytes = s.JSONPatchMaxCopyBytes @@ -125,6 +127,10 @@ func (s *ServerRunOptions) Validate() []error { errors = append(errors, fmt.Errorf("--request-timeout can not be negative value")) } + if s.GoawayChance < 0 || s.GoawayChance > 0.02 { + errors = append(errors, fmt.Errorf("--goaway-chance can not be less than 0 or greater than 0.02")) + } + if s.MinRequestTimeout < 0 { errors = append(errors, fmt.Errorf("--min-request-timeout can not be negative value")) } @@ -182,6 +188,12 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) { "it out. This is the default request timeout for requests but may be overridden by flags such as "+ "--min-request-timeout for specific types of requests.") + fs.Float64Var(&s.GoawayChance, "goaway-chance", s.GoawayChance, ""+ + "To prevent HTTP/2 clients from getting stuck on a single apiserver, randomly close a connection (GOAWAY). "+ + "The client's other in-flight requests won't be affected, and the client will reconnect, likely landing on a different apiserver after going through the load balancer again. "+ + "This argument sets the fraction of requests that will be sent a GOAWAY. Clusters with single apiservers, or which don't use a load balancer, should NOT enable this. 
"+ + "Min is 0 (off), Max is .02 (1/50 requests); .001 (1/1000) is a recommended starting point.") + fs.DurationVar(&s.LivezGracePeriod, "livez-grace-period", s.LivezGracePeriod, ""+ "This option represents the maximum amount of time it should take for apiserver to complete its startup sequence "+ "and become live. From apiserver's start time to when this amount of time has elapsed, /livez will assume "+