diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 8fd5a423..62a19683 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -468,7 +468,7 @@ }, { "ImportPath": "k8s.io/apimachinery", - "Rev": "859536f6dc9b" + "Rev": "2456ebdaba22" }, { "ImportPath": "k8s.io/gengo", diff --git a/go.mod b/go.mod index 2f8f61cc..35a9aee7 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e k8s.io/api v0.0.0-20201114085527-4a626d306b98 - k8s.io/apimachinery v0.0.0-20201114085355-859536f6dc9b + k8s.io/apimachinery v0.0.0-20201118005411-2456ebdaba22 k8s.io/klog/v2 v2.4.0 k8s.io/utils v0.0.0-20201110183641-67b214c5f920 sigs.k8s.io/yaml v1.2.0 @@ -35,5 +35,5 @@ require ( replace ( k8s.io/api => k8s.io/api v0.0.0-20201114085527-4a626d306b98 - k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20201114085355-859536f6dc9b + k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20201118005411-2456ebdaba22 ) diff --git a/go.sum b/go.sum index 71411b12..ebceea59 100644 --- a/go.sum +++ b/go.sum @@ -434,7 +434,7 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= k8s.io/api v0.0.0-20201114085527-4a626d306b98/go.mod h1:Vaqh9qFKpET0Mx+jNQHyAcNFyvwkGvuIKOt2htB36BQ= -k8s.io/apimachinery v0.0.0-20201114085355-859536f6dc9b/go.mod h1:WlLqWAHZGg07AeltaI0MV5uk1Omp8xaN0JGLY6gkRpU= +k8s.io/apimachinery v0.0.0-20201118005411-2456ebdaba22/go.mod h1:WlLqWAHZGg07AeltaI0MV5uk1Omp8xaN0JGLY6gkRpU= k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.4.0 h1:7+X0fUguPyrKEC4WjH8iGDg3laWgMo5tMnRTIGTTxGQ= diff --git a/rest/connection_test.go b/rest/connection_test.go new file mode 100644 index 00000000..e58aff19 --- /dev/null +++ b/rest/connection_test.go @@ -0,0 +1,164 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rest + +import ( + "context" + "fmt" + "io" + "net" + "net/http" + "net/http/httptest" + "net/url" + "os" + "strconv" + "sync/atomic" + "testing" + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + utilnet "k8s.io/apimachinery/pkg/util/net" +) + +type tcpLB struct { + t *testing.T + ln net.Listener + serverURL string + dials int32 +} + +func (lb *tcpLB) handleConnection(in net.Conn, stopCh chan struct{}) { + out, err := net.Dial("tcp", lb.serverURL) + if err != nil { + lb.t.Log(err) + return + } + go io.Copy(out, in) + go io.Copy(in, out) + <-stopCh + if err := out.Close(); err != nil { + lb.t.Fatalf("failed to close connection: %v", err) + } +} + +func (lb *tcpLB) serve(stopCh chan struct{}) { + conn, err := lb.ln.Accept() + if err != nil { + lb.t.Fatalf("failed to accept: %v", err) + } + atomic.AddInt32(&lb.dials, 1) + go lb.handleConnection(conn, stopCh) +} + +func newLB(t *testing.T, serverURL string) *tcpLB { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to bind: %v", err) + } + lb := tcpLB{ + serverURL: serverURL, + ln: ln, + t: t, + } + return &lb +} + +func setEnv(key, value string) func() { + originalValue := os.Getenv(key) + os.Setenv(key, value) + return func() { + os.Setenv(key, originalValue) + } +} + +const ( + readIdleTimeout int = 1 + pingTimeout int = 1 +) + +func TestReconnectBrokenTCP(t *testing.T) { + defer setEnv("HTTP2_READ_IDLE_TIMEOUT_SECONDS", strconv.Itoa(readIdleTimeout))() + defer setEnv("HTTP2_PING_TIMEOUT_SECONDS", strconv.Itoa(pingTimeout))() + defer setEnv("DISABLE_HTTP2", "")() + ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "Hello, %s", r.Proto) + })) + ts.EnableHTTP2 = true + ts.StartTLS() + defer ts.Close() + + u, err := url.Parse(ts.URL) + if err != nil { + t.Fatalf("failed to parse URL from %q: %v", ts.URL, err) + } + lb := newLB(t, u.Host) + defer lb.ln.Close() + stopCh := make(chan struct{}) + go lb.serve(stopCh) + transport, ok := ts.Client().Transport.(*http.Transport) + if !ok { + t.Fatalf("failed to assert *http.Transport") + } + config := &Config{ + Host: "https://" + lb.ln.Addr().String(), + Transport: utilnet.SetTransportDefaults(transport), + Timeout: 1 * time.Second, + // These fields are required to create a REST client. + ContentConfig: ContentConfig{ + GroupVersion: &schema.GroupVersion{}, + NegotiatedSerializer: &serializer.CodecFactory{}, + }, + } + client, err := RESTClientFor(config) + if err != nil { + t.Fatalf("failed to create REST client: %v", err) + } + data, err := client.Get().AbsPath("/").DoRaw(context.TODO()) + if err != nil { + t.Fatalf("unexpected err: %s: %v", data, err) + } + if string(data) != "Hello, HTTP/2.0" { + t.Fatalf("unexpected response: %s", data) + } + + // Deliberately let the LB stop proxying traffic for the current + // connection. This mimics a broken TCP connection that's not properly + // closed. + close(stopCh) + + stopCh = make(chan struct{}) + go lb.serve(stopCh) + // Sleep enough time for the HTTP/2 health check to detect and close + // the broken TCP connection. + time.Sleep(time.Duration(1+readIdleTimeout+pingTimeout) * time.Second) + // If the HTTP/2 health check were disabled, the broken connection + // would still be in the connection pool, the following request would + // then reuse the broken connection instead of creating a new one, and + // thus would fail. + data, err = client.Get().AbsPath("/").DoRaw(context.TODO()) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if string(data) != "Hello, HTTP/2.0" { + t.Fatalf("unexpected response: %s", data) + } + dials := atomic.LoadInt32(&lb.dials) + if dials != 2 { + t.Fatalf("expected %d dials, got %d", 2, dials) + } +}