add a new generic filter goaway

This commit is contained in:
chenjun.cj 2020-02-28 05:27:25 +08:00
parent b6b494b448
commit 81f46b64a3
5 changed files with 421 additions and 0 deletions

View File

@ -197,6 +197,12 @@ type Config struct {
// Predicate which is true for paths of long-running http requests
LongRunningFunc apirequest.LongRunningRequestCheck
// GoawayChance is the probability that send a GOAWAY to HTTP/2 clients. When client received
// GOAWAY, the in-flight requests will not be affected and new requests will use
// a new TCP connection to triggering re-balancing to another server behind the load balance.
// Default to 0, means never send GOAWAY. Max is 0.02 to prevent break the apiserver.
GoawayChance float64
// MergedResourceConfig indicates which groupVersion enabled and its resources enabled/disabled.
// This is composed of genericapiserver defaultAPIResourceConfig and those parsed from flags.
// If not specify any in flags, then genericapiserver will only enable defaultAPIResourceConfig.
@ -671,6 +677,9 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
}
handler = genericfilters.WithPanicRecovery(handler)
return handler
}

View File

@ -11,6 +11,7 @@ go_test(
srcs = [
"content_type_test.go",
"cors_test.go",
"goaway_test.go",
"maxinflight_test.go",
"timeout_test.go",
],
@ -25,6 +26,7 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/filters:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/golang.org/x/net/http2:go_default_library",
],
)
@ -34,6 +36,7 @@ go_library(
"content_type.go",
"cors.go",
"doc.go",
"goaway.go",
"longrunning.go",
"maxinflight.go",
"priority-and-fairness.go",

View File

@ -0,0 +1,88 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"math/rand"
"net/http"
"sync"
)
// GoawayDecider decides if server should send a GOAWAY
type GoawayDecider interface {
Goaway(r *http.Request) bool
}
var (
// randPool used to get a rand.Rand and generate a random number thread-safely,
// which improve the performance of using rand.Rand with a locker
randPool = &sync.Pool{
New: func() interface{} {
return rand.New(rand.NewSource(rand.Int63()))
},
}
)
// WithProbabilisticGoaway returns an http.Handler that send GOAWAY probabilistically
// according to the given chance for HTTP2 requests. After client receive GOAWAY,
// the in-flight long-running requests will not be influenced, and the new requests
// will use a new TCP connection to re-balancing to another server behind the load balance.
func WithProbabilisticGoaway(inner http.Handler, chance float64) http.Handler {
return &goaway{
handler: inner,
decider: &probabilisticGoawayDecider{
chance: chance,
next: func() float64 {
rnd := randPool.Get().(*rand.Rand)
ret := rnd.Float64()
randPool.Put(rnd)
return ret
},
},
}
}
// goaway send a GOAWAY to client according to decider for HTTP2 requests
type goaway struct {
handler http.Handler
decider GoawayDecider
}
// ServeHTTP implement HTTP handler
func (h *goaway) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Proto == "HTTP/2.0" && h.decider.Goaway(r) {
// Send a GOAWAY and tear down the TCP connection when idle.
w.Header().Set("Connection", "close")
}
h.handler.ServeHTTP(w, r)
}
// probabilisticGoawayDecider send GOAWAY probabilistically according to chance
type probabilisticGoawayDecider struct {
chance float64
next func() float64
}
// Goaway implement GoawayDecider
func (p *probabilisticGoawayDecider) Goaway(r *http.Request) bool {
if p.next() < p.chance {
return true
}
return false
}

View File

@ -0,0 +1,309 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"crypto/tls"
"io"
"math/rand"
"net"
"net/http"
"net/http/httptest"
"testing"
"time"
"golang.org/x/net/http2"
)
func TestProbabilisticGoawayDecider(t *testing.T) {
cases := []struct {
name string
chance float64
nextFn func(chance float64) func() float64
expectGOAWAY bool
}{
{
name: "always not GOAWAY",
chance: 0,
nextFn: func(chance float64) func() float64 {
return rand.Float64
},
expectGOAWAY: false,
},
{
name: "always GOAWAY",
chance: 1,
nextFn: func(chance float64) func() float64 {
return rand.Float64
},
expectGOAWAY: true,
},
{
name: "hit GOAWAY",
chance: rand.Float64() + 0.01,
nextFn: func(chance float64) func() float64 {
return func() float64 {
return chance - 0.001
}
},
expectGOAWAY: true,
},
{
name: "does not hit GOAWAY",
chance: rand.Float64() + 0.01,
nextFn: func(chance float64) func() float64 {
return func() float64 {
return chance + 0.001
}
},
expectGOAWAY: false,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
d := probabilisticGoawayDecider{chance: tc.chance, next: tc.nextFn(tc.chance)}
result := d.Goaway(nil)
if result != tc.expectGOAWAY {
t.Errorf("expect GOAWAY: %v, got: %v", tc.expectGOAWAY, result)
}
})
}
}
// TestClientReceivedGOAWAY tests the in-flight watch requests will not be affected and new requests use a
// connection after client received GOAWAY, and server response watch request with GOAWAY will not break client
// watching body read.
func TestClientReceivedGOAWAY(t *testing.T) {
const (
urlNormal = "/normal"
urlWatch = "/watch"
urlGoaway = "/goaway"
urlWatchWithGoaway = "/watch-with-goaway"
)
const (
// indicate the bytes watch request will be sent
// used to check if watch request was broke by GOAWAY
watchExpectSendBytes = 5
)
watchHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
timer := time.NewTicker(time.Second)
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(200)
flusher, _ := w.(http.Flusher)
flusher.Flush()
count := 0
for {
select {
case <-timer.C:
n, err := w.Write([]byte("w"))
if err != nil {
return
}
flusher.Flush()
count += n
if count == watchExpectSendBytes {
return
}
}
}
})
mux := http.NewServeMux()
mux.Handle(urlNormal, WithProbabilisticGoaway(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("hello"))
return
}), 0))
mux.Handle(urlWatch, WithProbabilisticGoaway(watchHandler, 0))
mux.Handle(urlGoaway, WithProbabilisticGoaway(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("hello"))
return
}), 1))
mux.Handle(urlWatchWithGoaway, WithProbabilisticGoaway(watchHandler, 1))
s := httptest.NewUnstartedServer(mux)
http2Options := &http2.Server{}
if err := http2.ConfigureServer(s.Config, http2Options); err != nil {
t.Fatalf("failed to configure test server to be HTTP2 server, err: %v", err)
}
s.TLS = s.Config.TLSConfig
s.StartTLS()
defer s.Close()
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
NextProtos: []string{http2.NextProtoTLS},
}
cases := []struct {
name string
reqs []string
// expectConnections always equals to GOAWAY requests(urlGoaway or urlWatchWithGoaway) + 1
expectConnections int
}{
{
name: "all normal requests use only one connection",
reqs: []string{urlNormal, urlNormal, urlNormal},
expectConnections: 1,
},
{
name: "got GOAWAY after set-up watch",
reqs: []string{urlNormal, urlWatch, urlGoaway, urlNormal, urlNormal},
expectConnections: 2,
},
{
name: "got GOAWAY after set-up watch, and set-up a new watch",
reqs: []string{urlNormal, urlWatch, urlGoaway, urlWatch, urlNormal, urlNormal},
expectConnections: 2,
},
{
name: "got 2 GOAWAY after set-up watch",
reqs: []string{urlNormal, urlWatch, urlGoaway, urlGoaway, urlNormal, urlNormal},
expectConnections: 3,
},
{
name: "combine with watch-with-goaway",
reqs: []string{urlNormal, urlWatchWithGoaway, urlNormal, urlWatch, urlGoaway, urlNormal, urlNormal},
expectConnections: 3,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
// localAddr indicates how many TCP connection set up
localAddr := make([]string, 0)
// init HTTP2 client
client := http.Client{
Transport: &http2.Transport{
TLSClientConfig: tlsConfig,
DialTLS: func(network, addr string, cfg *tls.Config) (conn net.Conn, err error) {
conn, err = tls.Dial(network, addr, cfg)
if err != nil {
t.Fatalf("unexpect connection err: %v", err)
}
localAddr = append(localAddr, conn.LocalAddr().String())
return
},
},
}
watchChs := make([]chan int, 0)
for _, url := range tc.reqs {
req, err := http.NewRequest(http.MethodGet, s.URL+url, nil)
if err != nil {
t.Fatalf("unexpect new request error: %v", err)
}
resp, err := client.Do(req)
if err != nil {
t.Fatalf("failed request test server, err: %v", err)
}
// encounter watch bytes received, does not expect to be broken
if url == urlWatch || url == urlWatchWithGoaway {
ch := make(chan int)
watchChs = append(watchChs, ch)
go func() {
count := 0
for {
buffer := make([]byte, 1)
n, err := resp.Body.Read(buffer)
if err != nil {
// urlWatch will receive io.EOF,
// urlWatchWithGoaway will receive http2.GoAwayError
if err != io.EOF {
if _, ok := err.(http2.GoAwayError); !ok {
t.Errorf("watch received not EOF err: %v", err)
}
}
ch <- count
return
}
count += n
}
}()
}
}
// check TCP connection count
if tc.expectConnections != len(localAddr) {
t.Fatalf("expect TCP connection: %d, actual: %d", tc.expectConnections, len(localAddr))
}
// check if watch request is broken by GOAWAY response
watchTimeout := time.NewTimer(time.Second * 10)
for _, watchCh := range watchChs {
select {
case n := <-watchCh:
if n != watchExpectSendBytes {
t.Fatalf("in-flight watch was broken by GOAWAY response, expect go bytes: %d, actual got: %d", watchExpectSendBytes, n)
}
case <-watchTimeout.C:
t.Error("watch receive timeout")
}
}
})
}
}
func TestHTTP1Requests(t *testing.T) {
s := httptest.NewUnstartedServer(WithProbabilisticGoaway(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("hello"))
return
}), 1))
http2Options := &http2.Server{}
if err := http2.ConfigureServer(s.Config, http2Options); err != nil {
t.Fatalf("failed to configure test server to be HTTP2 server, err: %v", err)
}
s.TLS = s.Config.TLSConfig
s.StartTLS()
defer s.Close()
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
NextProtos: []string{"http/1.1"},
}
client := http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
}
resp, err := client.Get(s.URL)
if err != nil {
t.Fatalf("failed to request the server, err: %v", err)
}
if v := resp.Header.Get("Connection"); v != "" {
t.Errorf("expect response HTTP header Connection to be empty, but got: %s", v)
}
}

View File

@ -38,6 +38,7 @@ type ServerRunOptions struct {
MaxRequestsInFlight int
MaxMutatingRequestsInFlight int
RequestTimeout time.Duration
GoawayChance float64
LivezGracePeriod time.Duration
MinRequestTimeout int
ShutdownDelayDuration time.Duration
@ -76,6 +77,7 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error {
c.MaxMutatingRequestsInFlight = s.MaxMutatingRequestsInFlight
c.LivezGracePeriod = s.LivezGracePeriod
c.RequestTimeout = s.RequestTimeout
c.GoawayChance = s.GoawayChance
c.MinRequestTimeout = s.MinRequestTimeout
c.ShutdownDelayDuration = s.ShutdownDelayDuration
c.JSONPatchMaxCopyBytes = s.JSONPatchMaxCopyBytes
@ -125,6 +127,10 @@ func (s *ServerRunOptions) Validate() []error {
errors = append(errors, fmt.Errorf("--request-timeout can not be negative value"))
}
if s.GoawayChance < 0 || s.GoawayChance > 0.02 {
errors = append(errors, fmt.Errorf("--goaway-chance can not be less than 0 or greater than 0.02"))
}
if s.MinRequestTimeout < 0 {
errors = append(errors, fmt.Errorf("--min-request-timeout can not be negative value"))
}
@ -182,6 +188,12 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
"it out. This is the default request timeout for requests but may be overridden by flags such as "+
"--min-request-timeout for specific types of requests.")
fs.Float64Var(&s.GoawayChance, "goaway-chance", s.GoawayChance, ""+
"To prevent HTTP/2 clients from getting stuck on a single apiserver, randomly close a connection (GOAWAY). "+
"The client's other in-flight requests won't be affected, and the client will reconnect, likely landing on a different apiserver after going through the load balancer again. "+
"This argument sets the fraction of requests that will be sent a GOAWAY. Clusters with single apiservers, or which don't use a load balancer, should NOT enable this. "+
"Min is 0 (off), Max is .02 (1/50 requests); .001 (1/1000) is a recommended starting point.")
fs.DurationVar(&s.LivezGracePeriod, "livez-grace-period", s.LivezGracePeriod, ""+
"This option represents the maximum amount of time it should take for apiserver to complete its startup sequence "+
"and become live. From apiserver's start time to when this amount of time has elapsed, /livez will assume "+