diff --git a/staging/src/k8s.io/apimachinery/pkg/util/net/http.go b/staging/src/k8s.io/apimachinery/pkg/util/net/http.go index 945886c4380..ba63d02df69 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/net/http.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/net/http.go @@ -33,6 +33,7 @@ import ( "regexp" "strconv" "strings" + "time" "unicode" "unicode/utf8" @@ -132,13 +133,61 @@ func SetTransportDefaults(t *http.Transport) *http.Transport { if s := os.Getenv("DISABLE_HTTP2"); len(s) > 0 { klog.Infof("HTTP2 has been explicitly disabled") } else if allowsHTTP2(t) { - if err := http2.ConfigureTransport(t); err != nil { + if err := configureHTTP2Transport(t); err != nil { klog.Warningf("Transport failed http2 configuration: %v", err) } } return t } +func readIdleTimeoutSeconds() int { + ret := 30 + // User can set the readIdleTimeout to 0 to disable the HTTP/2 + // connection health check. + if s := os.Getenv("HTTP2_READ_IDLE_TIMEOUT_SECONDS"); len(s) > 0 { + i, err := strconv.Atoi(s) + if err != nil { + klog.Warningf("Illegal HTTP2_READ_IDLE_TIMEOUT_SECONDS(%q): %v."+ + " Default value %d is used", s, err, ret) + return ret + } + ret = i + } + return ret +} + +func pingTimeoutSeconds() int { + ret := 15 + if s := os.Getenv("HTTP2_PING_TIMEOUT_SECONDS"); len(s) > 0 { + i, err := strconv.Atoi(s) + if err != nil { + klog.Warningf("Illegal HTTP2_PING_TIMEOUT_SECONDS(%q): %v."+ + " Default value %d is used", s, err, ret) + return ret + } + ret = i + } + return ret +} + +func configureHTTP2Transport(t *http.Transport) error { + t2, err := http2.ConfigureTransports(t) + if err != nil { + return err + } + // The following enables the HTTP/2 connection health check added in + // https://github.com/golang/net/pull/55. The health check detects and + // closes broken transport layer connections. Without the health check, + // a broken connection can linger too long, e.g., a broken TCP + // connection will be closed by the Linux kernel after 13 to 30 minutes + // by default, which caused + // https://github.com/kubernetes/client-go/issues/374 and + // https://github.com/kubernetes/kubernetes/issues/87615. + t2.ReadIdleTimeout = time.Duration(readIdleTimeoutSeconds()) * time.Second + t2.PingTimeout = time.Duration(pingTimeoutSeconds()) * time.Second + return nil +} + func allowsHTTP2(t *http.Transport) bool { if t.TLSClientConfig == nil || len(t.TLSClientConfig.NextProtos) == 0 { // the transport expressed no NextProto preference, allow diff --git a/staging/src/k8s.io/apimachinery/pkg/util/net/http_test.go b/staging/src/k8s.io/apimachinery/pkg/util/net/http_test.go index a13cd96a425..a43161b88b7 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/net/http_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/net/http_test.go @@ -1071,3 +1071,39 @@ func TestIsProbableEOF(t *testing.T) { }) } } + +func setEnv(key, value string) func() { + originalValue := os.Getenv(key) + os.Setenv(key, value) + return func() { + os.Setenv(key, originalValue) + } +} + +func TestReadIdleTimeoutSeconds(t *testing.T) { + reset := setEnv("HTTP2_READ_IDLE_TIMEOUT_SECONDS", "60") + if e, a := 60, readIdleTimeoutSeconds(); e != a { + t.Errorf("expected %d, got %d", e, a) + } + reset() + + reset = setEnv("HTTP2_READ_IDLE_TIMEOUT_SECONDS", "illegal value") + if e, a := 30, readIdleTimeoutSeconds(); e != a { + t.Errorf("expected %d, got %d", e, a) + } + reset() +} + +func TestPingTimeoutSeconds(t *testing.T) { + reset := setEnv("HTTP2_PING_TIMEOUT_SECONDS", "60") + if e, a := 60, pingTimeoutSeconds(); e != a { + t.Errorf("expected %d, got %d", e, a) + } + reset() + + reset = setEnv("HTTP2_PING_TIMEOUT_SECONDS", "illegal value") + if e, a := 15, pingTimeoutSeconds(); e != a { + t.Errorf("expected %d, got %d", e, a) + } + reset() +} diff --git a/staging/src/k8s.io/client-go/rest/BUILD b/staging/src/k8s.io/client-go/rest/BUILD index e2607d93b13..d3fa2648907 100644 --- a/staging/src/k8s.io/client-go/rest/BUILD +++ b/staging/src/k8s.io/client-go/rest/BUILD @@ -11,6 +11,7 @@ go_test( srcs = [ "client_test.go", "config_test.go", + "connection_test.go", "exec_test.go", "plugin_test.go", "request_test.go", @@ -27,12 +28,14 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/streaming:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/httpstream:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/pkg/apis/clientauthentication:go_default_library", diff --git a/staging/src/k8s.io/client-go/rest/connection_test.go b/staging/src/k8s.io/client-go/rest/connection_test.go new file mode 100644 index 00000000000..e58aff194a4 --- /dev/null +++ b/staging/src/k8s.io/client-go/rest/connection_test.go @@ -0,0 +1,164 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rest + +import ( + "context" + "fmt" + "io" + "net" + "net/http" + "net/http/httptest" + "net/url" + "os" + "strconv" + "sync/atomic" + "testing" + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + utilnet "k8s.io/apimachinery/pkg/util/net" +) + +type tcpLB struct { + t *testing.T + ln net.Listener + serverURL string + dials int32 +} + +func (lb *tcpLB) handleConnection(in net.Conn, stopCh chan struct{}) { + out, err := net.Dial("tcp", lb.serverURL) + if err != nil { + lb.t.Log(err) + return + } + go io.Copy(out, in) + go io.Copy(in, out) + <-stopCh + if err := out.Close(); err != nil { + lb.t.Fatalf("failed to close connection: %v", err) + } +} + +func (lb *tcpLB) serve(stopCh chan struct{}) { + conn, err := lb.ln.Accept() + if err != nil { + lb.t.Fatalf("failed to accept: %v", err) + } + atomic.AddInt32(&lb.dials, 1) + go lb.handleConnection(conn, stopCh) +} + +func newLB(t *testing.T, serverURL string) *tcpLB { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to bind: %v", err) + } + lb := tcpLB{ + serverURL: serverURL, + ln: ln, + t: t, + } + return &lb +} + +func setEnv(key, value string) func() { + originalValue := os.Getenv(key) + os.Setenv(key, value) + return func() { + os.Setenv(key, originalValue) + } +} + +const ( + readIdleTimeout int = 1 + pingTimeout int = 1 +) + +func TestReconnectBrokenTCP(t *testing.T) { + defer setEnv("HTTP2_READ_IDLE_TIMEOUT_SECONDS", strconv.Itoa(readIdleTimeout))() + defer setEnv("HTTP2_PING_TIMEOUT_SECONDS", strconv.Itoa(pingTimeout))() + defer setEnv("DISABLE_HTTP2", "")() + ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "Hello, %s", r.Proto) + })) + ts.EnableHTTP2 = true + ts.StartTLS() + defer ts.Close() + + u, err := url.Parse(ts.URL) + if err != nil { + t.Fatalf("failed to parse URL from %q: %v", ts.URL, err) + } + lb := newLB(t, u.Host) + defer lb.ln.Close() + stopCh := make(chan struct{}) + go lb.serve(stopCh) + transport, ok := ts.Client().Transport.(*http.Transport) + if !ok { + t.Fatalf("failed to assert *http.Transport") + } + config := &Config{ + Host: "https://" + lb.ln.Addr().String(), + Transport: utilnet.SetTransportDefaults(transport), + Timeout: 1 * time.Second, + // These fields are required to create a REST client. + ContentConfig: ContentConfig{ + GroupVersion: &schema.GroupVersion{}, + NegotiatedSerializer: &serializer.CodecFactory{}, + }, + } + client, err := RESTClientFor(config) + if err != nil { + t.Fatalf("failed to create REST client: %v", err) + } + data, err := client.Get().AbsPath("/").DoRaw(context.TODO()) + if err != nil { + t.Fatalf("unexpected err: %s: %v", data, err) + } + if string(data) != "Hello, HTTP/2.0" { + t.Fatalf("unexpected response: %s", data) + } + + // Deliberately let the LB stop proxying traffic for the current + // connection. This mimics a broken TCP connection that's not properly + // closed. + close(stopCh) + + stopCh = make(chan struct{}) + go lb.serve(stopCh) + // Sleep enough time for the HTTP/2 health check to detect and close + // the broken TCP connection. + time.Sleep(time.Duration(1+readIdleTimeout+pingTimeout) * time.Second) + // If the HTTP/2 health check were disabled, the broken connection + // would still be in the connection pool, the following request would + // then reuse the broken connection instead of creating a new one, and + // thus would fail. + data, err = client.Get().AbsPath("/").DoRaw(context.TODO()) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if string(data) != "Hello, HTTP/2.0" { + t.Fatalf("unexpected response: %s", data) + } + dials := atomic.LoadInt32(&lb.dials) + if dials != 2 { + t.Fatalf("expected %d dials, got %d", 2, dials) + } +}