diff --git a/go.mod b/go.mod index 54991882..d1db9669 100644 --- a/go.mod +++ b/go.mod @@ -30,8 +30,8 @@ require ( golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac google.golang.org/protobuf v1.27.1 - k8s.io/api v0.0.0-20211111072600-2d1f8bde0d9a - k8s.io/apimachinery v0.0.0-20211111072039-19377c9f105d + k8s.io/api v0.0.0-20211115232129-3b96cd16e7e6 + k8s.io/apimachinery v0.0.0-20211116191949-10158cf6d3ff k8s.io/klog/v2 v2.30.0 k8s.io/kube-openapi v0.0.0-20211105084753-ee342a809c29 k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b @@ -40,6 +40,6 @@ require ( ) replace ( - k8s.io/api => k8s.io/api v0.0.0-20211111072600-2d1f8bde0d9a - k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20211111072039-19377c9f105d + k8s.io/api => k8s.io/api v0.0.0-20211115232129-3b96cd16e7e6 + k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20211116191949-10158cf6d3ff ) diff --git a/go.sum b/go.sum index eef4db33..e83eaf0f 100644 --- a/go.sum +++ b/go.sum @@ -600,10 +600,10 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= -k8s.io/api v0.0.0-20211111072600-2d1f8bde0d9a h1:MbPJiiG2XCMeh1IySIFlmF9Eu2JksfHO1MdOp6oYPYs= -k8s.io/api v0.0.0-20211111072600-2d1f8bde0d9a/go.mod h1:E/YSMnKP4hG73T35MZ3pw2P2Hy9cFo7KBeUkEPevRjk= -k8s.io/apimachinery v0.0.0-20211111072039-19377c9f105d h1:rhNn3FjXMzfNpZLAQSrkW0YQoBEKl+d69o5F2Qw6ONs= -k8s.io/apimachinery v0.0.0-20211111072039-19377c9f105d/go.mod h1:/fTTuFZJpMy6M4dc6F6QbWWj88D/Yd/ZdqJMvTIcbkE= +k8s.io/api v0.0.0-20211115232129-3b96cd16e7e6 h1:/Rn6GeqmvNXzLe9ucZcuEsmdfh1n//ewpE6URUfcFaw= +k8s.io/api v0.0.0-20211115232129-3b96cd16e7e6/go.mod h1:E/YSMnKP4hG73T35MZ3pw2P2Hy9cFo7KBeUkEPevRjk= +k8s.io/apimachinery v0.0.0-20211116191949-10158cf6d3ff h1:48ZHXvUs3bTxiEDlE97wNDgMMCRLmVdqV+/W2d7Hvpg= +k8s.io/apimachinery v0.0.0-20211116191949-10158cf6d3ff/go.mod h1:/fTTuFZJpMy6M4dc6F6QbWWj88D/Yd/ZdqJMvTIcbkE= k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= diff --git a/rest/connection_test.go b/rest/connection_test.go index 70fd2aa1..89d1b351 100644 --- a/rest/connection_test.go +++ b/rest/connection_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apimachinery/pkg/util/wait" ) type tcpLB struct { @@ -164,6 +165,189 @@ func TestReconnectBrokenTCP(t *testing.T) { } } +// 1. connect to https server with http1.1 using a TCP proxy +// 2. the connection has keepalive enabled so it will be reused +// 3. break the TCP connection stopping the proxy +// 4. close the idle connection to force creating a new connection +// 5. count that there are 2 connection to the server (we didn't reuse the original connection) +func TestReconnectBrokenTCP_HTTP1(t *testing.T) { + ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "Hello, %s", r.Proto) + })) + ts.EnableHTTP2 = false + ts.StartTLS() + defer ts.Close() + + u, err := url.Parse(ts.URL) + if err != nil { + t.Fatalf("failed to parse URL from %q: %v", ts.URL, err) + } + lb := newLB(t, u.Host) + defer lb.ln.Close() + stopCh := make(chan struct{}) + go lb.serve(stopCh) + transport, ok := ts.Client().Transport.(*http.Transport) + if !ok { + t.Fatal("failed to assert *http.Transport") + } + config := &Config{ + Host: "https://" + lb.ln.Addr().String(), + Transport: utilnet.SetTransportDefaults(transport), + // large timeout, otherwise the broken connection will be cleaned by it + Timeout: wait.ForeverTestTimeout, + // These fields are required to create a REST client. + ContentConfig: ContentConfig{ + GroupVersion: &schema.GroupVersion{}, + NegotiatedSerializer: &serializer.CodecFactory{}, + }, + } + config.TLSClientConfig.NextProtos = []string{"http/1.1"} + client, err := RESTClientFor(config) + if err != nil { + t.Fatalf("failed to create REST client: %v", err) + } + + data, err := client.Get().AbsPath("/").DoRaw(context.TODO()) + if err != nil { + t.Fatalf("unexpected err: %s: %v", data, err) + } + if string(data) != "Hello, HTTP/1.1" { + t.Fatalf("unexpected response: %s", data) + } + + // Deliberately let the LB stop proxying traffic for the current + // connection. This mimics a broken TCP connection that's not properly + // closed. + close(stopCh) + + stopCh = make(chan struct{}) + go lb.serve(stopCh) + // Close the idle connections + utilnet.CloseIdleConnectionsFor(client.Client.Transport) + + // If the client didn't close the idle connections, the broken connection + // would still be in the connection pool, the following request would + // then reuse the broken connection instead of creating a new one, and + // thus would fail. + data, err = client.Get().AbsPath("/").DoRaw(context.TODO()) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if string(data) != "Hello, HTTP/1.1" { + t.Fatalf("unexpected response: %s", data) + } + dials := atomic.LoadInt32(&lb.dials) + if dials != 2 { + t.Fatalf("expected %d dials, got %d", 2, dials) + } +} + +// 1. connect to https server with http1.1 using a TCP proxy making the connection to timeout +// 2. the connection has keepalive enabled so it will be reused +// 3. close the in-flight connection to force creating a new connection +// 4. count that there are 2 connection on the LB but only one succeeds +func TestReconnectBrokenTCPInFlight_HTTP1(t *testing.T) { + done := make(chan struct{}) + defer close(done) + received := make(chan struct{}) + + ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/hang" { + conn, _, _ := w.(http.Hijacker).Hijack() + close(received) + <-done + conn.Close() + } + fmt.Fprintf(w, "Hello, %s", r.Proto) + })) + ts.EnableHTTP2 = false + ts.StartTLS() + defer ts.Close() + + u, err := url.Parse(ts.URL) + if err != nil { + t.Fatalf("failed to parse URL from %q: %v", ts.URL, err) + } + + lb := newLB(t, u.Host) + defer lb.ln.Close() + stopCh := make(chan struct{}) + go lb.serve(stopCh) + + transport, ok := ts.Client().Transport.(*http.Transport) + if !ok { + t.Fatal("failed to assert *http.Transport") + } + config := &Config{ + Host: "https://" + lb.ln.Addr().String(), + Transport: utilnet.SetTransportDefaults(transport), + // Use something extraordinary large to not hit the timeout + Timeout: wait.ForeverTestTimeout, + // These fields are required to create a REST client. + ContentConfig: ContentConfig{ + GroupVersion: &schema.GroupVersion{}, + NegotiatedSerializer: &serializer.CodecFactory{}, + }, + } + config.TLSClientConfig.NextProtos = []string{"http/1.1"} + + client, err := RESTClientFor(config) + if err != nil { + t.Fatalf("failed to create REST client: %v", err) + } + + // The request will connect, hang and eventually time out + // but we can use a context to close once the test is done + // we are only interested in have an inflight connection + ctx, cancel := context.WithCancel(context.Background()) + reqErrCh := make(chan error, 1) + defer close(reqErrCh) + go func() { + _, err = client.Get().AbsPath("/hang").DoRaw(ctx) + reqErrCh <- err + }() + + // wait until it connect to the server + select { + case <-received: + case <-time.After(wait.ForeverTestTimeout): + t.Fatal("Test timed out waiting for first request to fail") + } + + // Deliberately let the LB stop proxying traffic for the current + // connection. This mimics a broken TCP connection that's not properly + // closed. + close(stopCh) + + stopCh = make(chan struct{}) + go lb.serve(stopCh) + + // New request will fail if tries to reuse the connection + data, err := client.Get().AbsPath("/").DoRaw(context.Background()) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if string(data) != "Hello, HTTP/1.1" { + t.Fatalf("unexpected response: %s", data) + } + dials := atomic.LoadInt32(&lb.dials) + if dials != 2 { + t.Fatalf("expected %d dials, got %d", 2, dials) + } + + // cancel the in-flight connection + cancel() + select { + case <-reqErrCh: + if err == nil { + t.Fatal("Connection succeeded but was expected to timeout") + } + case <-time.After(10 * time.Second): + t.Fatal("Test timed out waiting for the request to fail") + } + +} + func TestRestClientTimeout(t *testing.T) { ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { time.Sleep(2 * time.Second) diff --git a/rest/request_test.go b/rest/request_test.go index ab2df1b5..7b1d83ad 100644 --- a/rest/request_test.go +++ b/rest/request_test.go @@ -44,10 +44,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer/streaming" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/apimachinery/pkg/util/intstr" + utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes/scheme" restclientwatch "k8s.io/client-go/rest/watch" @@ -2273,15 +2275,17 @@ func TestStream(t *testing.T) { func testRESTClientWithConfig(t testing.TB, srv *httptest.Server, contentConfig ClientContentConfig) *RESTClient { base, _ := url.Parse("http://localhost") + var c *http.Client if srv != nil { var err error base, err = url.Parse(srv.URL) if err != nil { t.Fatalf("failed to parse test URL: %v", err) } + c = srv.Client() } versionedAPIPath := defaultResourcePathWithPrefix("", "", "", "") - client, err := NewRESTClient(base, versionedAPIPath, contentConfig, nil, nil) + client, err := NewRESTClient(base, versionedAPIPath, contentConfig, nil, c) if err != nil { t.Fatalf("failed to create a client: %v", err) } @@ -2944,3 +2948,183 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont }) } } + +func TestReuseRequest(t *testing.T) { + var tests = []struct { + name string + enableHTTP2 bool + }{ + {"HTTP1", false}, + {"HTTP2", true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(r.RemoteAddr)) + })) + ts.EnableHTTP2 = tt.enableHTTP2 + ts.StartTLS() + defer ts.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := testRESTClient(t, ts) + + req1, err := c.Verb("GET"). + Prefix("foo"). + DoRaw(ctx) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + req2, err := c.Verb("GET"). + Prefix("foo"). + DoRaw(ctx) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if string(req1) != string(req2) { + t.Fatalf("Expected %v to be equal to %v", string(req1), string(req2)) + } + + }) + } +} + +func TestHTTP1DoNotReuseRequestAfterTimeout(t *testing.T) { + var tests = []struct { + name string + enableHTTP2 bool + }{ + {"HTTP1", false}, + {"HTTP2", true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + done := make(chan struct{}) + ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Logf("TEST Connected from %v on %v\n", r.RemoteAddr, r.URL.Path) + if r.URL.Path == "/hang" { + t.Logf("TEST hanging %v\n", r.RemoteAddr) + <-done + } + w.Write([]byte(r.RemoteAddr)) + })) + ts.EnableHTTP2 = tt.enableHTTP2 + ts.StartTLS() + defer ts.Close() + // close hanging connection before shutting down the http server + defer close(done) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + transport, ok := ts.Client().Transport.(*http.Transport) + if !ok { + t.Fatalf("failed to assert *http.Transport") + } + + config := &Config{ + Host: ts.URL, + Transport: utilnet.SetTransportDefaults(transport), + Timeout: 100 * time.Millisecond, + // These fields are required to create a REST client. + ContentConfig: ContentConfig{ + GroupVersion: &schema.GroupVersion{}, + NegotiatedSerializer: &serializer.CodecFactory{}, + }, + } + if !tt.enableHTTP2 { + config.TLSClientConfig.NextProtos = []string{"http/1.1"} + } + c, err := RESTClientFor(config) + if err != nil { + t.Fatalf("failed to create REST client: %v", err) + } + req1, err := c.Verb("GET"). + Prefix("foo"). + DoRaw(ctx) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = c.Verb("GET"). + Prefix("/hang"). + DoRaw(ctx) + if err == nil { + t.Fatalf("Expected error") + } + + req2, err := c.Verb("GET"). + Prefix("foo"). + DoRaw(ctx) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // http1 doesn't reuse the connection after it times + if tt.enableHTTP2 != (string(req1) == string(req2)) { + if tt.enableHTTP2 { + t.Fatalf("Expected %v to be the same as %v", string(req1), string(req2)) + } else { + t.Fatalf("Expected %v to be different to %v", string(req1), string(req2)) + } + } + }) + } +} + +func TestTransportConcurrency(t *testing.T) { + const numReqs = 10 + var tests = []struct { + name string + enableHTTP2 bool + }{ + {"HTTP1", false}, + {"HTTP2", true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Logf("Connected from %v %v", r.RemoteAddr, r.URL) + fmt.Fprintf(w, "%v", r.FormValue("echo")) + })) + ts.EnableHTTP2 = tt.enableHTTP2 + ts.StartTLS() + defer ts.Close() + var wg sync.WaitGroup + + wg.Add(numReqs) + c := testRESTClient(t, ts) + reqs := make(chan string) + defer close(reqs) + + for i := 0; i < 4; i++ { + go func() { + for req := range reqs { + res, err := c.Get().Param("echo", req).DoRaw(context.Background()) + if err != nil { + t.Errorf("error on req %s: %v", req, err) + wg.Done() + continue + } + + if string(res) != req { + t.Errorf("body of req %s = %q; want %q", req, res, req) + } + + wg.Done() + } + }() + } + for i := 0; i < numReqs; i++ { + reqs <- fmt.Sprintf("request-%d", i) + } + wg.Wait() + }) + } +}