CloseIdleConnections for wrapped Transport

It iterates over the wrapped transports until it finds one
that implements the CloseIdleConnections method and executes it.

add test for closeidle http1 connections

add test for http1.1 reconnect with inflight request

add test to reuse connection request

add test for request connect after timeout

add test for client-go request concurrency

Kubernetes-commit: b9d865a8185b62d83e9ff81b0e3499a26ac6960d
This commit is contained in:
Antonio Ojea
2021-09-10 15:27:23 +02:00
committed by Kubernetes Publisher
parent 2f764f976b
commit f34af1a39f
2 changed files with 369 additions and 1 deletions

View File

@@ -44,10 +44,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/intstr"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/scheme"
restclientwatch "k8s.io/client-go/rest/watch"
@@ -2273,15 +2275,17 @@ func TestStream(t *testing.T) {
func testRESTClientWithConfig(t testing.TB, srv *httptest.Server, contentConfig ClientContentConfig) *RESTClient {
base, _ := url.Parse("http://localhost")
var c *http.Client
if srv != nil {
var err error
base, err = url.Parse(srv.URL)
if err != nil {
t.Fatalf("failed to parse test URL: %v", err)
}
c = srv.Client()
}
versionedAPIPath := defaultResourcePathWithPrefix("", "", "", "")
client, err := NewRESTClient(base, versionedAPIPath, contentConfig, nil, nil)
client, err := NewRESTClient(base, versionedAPIPath, contentConfig, nil, c)
if err != nil {
t.Fatalf("failed to create a client: %v", err)
}
@@ -2944,3 +2948,183 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont
})
}
}
func TestReuseRequest(t *testing.T) {
var tests = []struct {
name string
enableHTTP2 bool
}{
{"HTTP1", false},
{"HTTP2", true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(r.RemoteAddr))
}))
ts.EnableHTTP2 = tt.enableHTTP2
ts.StartTLS()
defer ts.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := testRESTClient(t, ts)
req1, err := c.Verb("GET").
Prefix("foo").
DoRaw(ctx)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
req2, err := c.Verb("GET").
Prefix("foo").
DoRaw(ctx)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if string(req1) != string(req2) {
t.Fatalf("Expected %v to be equal to %v", string(req1), string(req2))
}
})
}
}
func TestHTTP1DoNotReuseRequestAfterTimeout(t *testing.T) {
var tests = []struct {
name string
enableHTTP2 bool
}{
{"HTTP1", false},
{"HTTP2", true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
done := make(chan struct{})
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t.Logf("TEST Connected from %v on %v\n", r.RemoteAddr, r.URL.Path)
if r.URL.Path == "/hang" {
t.Logf("TEST hanging %v\n", r.RemoteAddr)
<-done
}
w.Write([]byte(r.RemoteAddr))
}))
ts.EnableHTTP2 = tt.enableHTTP2
ts.StartTLS()
defer ts.Close()
// close hanging connection before shutting down the http server
defer close(done)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
transport, ok := ts.Client().Transport.(*http.Transport)
if !ok {
t.Fatalf("failed to assert *http.Transport")
}
config := &Config{
Host: ts.URL,
Transport: utilnet.SetTransportDefaults(transport),
Timeout: 100 * time.Millisecond,
// These fields are required to create a REST client.
ContentConfig: ContentConfig{
GroupVersion: &schema.GroupVersion{},
NegotiatedSerializer: &serializer.CodecFactory{},
},
}
if !tt.enableHTTP2 {
config.TLSClientConfig.NextProtos = []string{"http/1.1"}
}
c, err := RESTClientFor(config)
if err != nil {
t.Fatalf("failed to create REST client: %v", err)
}
req1, err := c.Verb("GET").
Prefix("foo").
DoRaw(ctx)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
_, err = c.Verb("GET").
Prefix("/hang").
DoRaw(ctx)
if err == nil {
t.Fatalf("Expected error")
}
req2, err := c.Verb("GET").
Prefix("foo").
DoRaw(ctx)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// http1 doesn't reuse the connection after it times
if tt.enableHTTP2 != (string(req1) == string(req2)) {
if tt.enableHTTP2 {
t.Fatalf("Expected %v to be the same as %v", string(req1), string(req2))
} else {
t.Fatalf("Expected %v to be different to %v", string(req1), string(req2))
}
}
})
}
}
func TestTransportConcurrency(t *testing.T) {
const numReqs = 10
var tests = []struct {
name string
enableHTTP2 bool
}{
{"HTTP1", false},
{"HTTP2", true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t.Logf("Connected from %v %v", r.RemoteAddr, r.URL)
fmt.Fprintf(w, "%v", r.FormValue("echo"))
}))
ts.EnableHTTP2 = tt.enableHTTP2
ts.StartTLS()
defer ts.Close()
var wg sync.WaitGroup
wg.Add(numReqs)
c := testRESTClient(t, ts)
reqs := make(chan string)
defer close(reqs)
for i := 0; i < 4; i++ {
go func() {
for req := range reqs {
res, err := c.Get().Param("echo", req).DoRaw(context.Background())
if err != nil {
t.Errorf("error on req %s: %v", req, err)
wg.Done()
continue
}
if string(res) != req {
t.Errorf("body of req %s = %q; want %q", req, res, req)
}
wg.Done()
}
}()
}
for i := 0; i < numReqs; i++ {
reqs <- fmt.Sprintf("request-%d", i)
}
wg.Wait()
})
}
}