diff --git a/staging/src/k8s.io/apimachinery/pkg/util/net/util.go b/staging/src/k8s.io/apimachinery/pkg/util/net/util.go index 1c2aba55f7b..1635e69a5c0 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/net/util.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/net/util.go @@ -20,6 +20,7 @@ import ( "errors" "net" "reflect" + "strings" "syscall" ) @@ -47,6 +48,11 @@ func IsConnectionReset(err error) bool { return false } +// Returns if the given err is "http2: client connection lost" error. +func IsHTTP2ConnectionLost(err error) bool { + return err != nil && strings.Contains(err.Error(), "http2: client connection lost") +} + // Returns if the given err is "connection refused" error func IsConnectionRefused(err error) bool { var errno syscall.Errno diff --git a/staging/src/k8s.io/apimachinery/pkg/util/net/util_test.go b/staging/src/k8s.io/apimachinery/pkg/util/net/util_test.go index 029f2f7116c..561fe4eebe7 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/net/util_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/net/util_test.go @@ -17,11 +17,19 @@ limitations under the License. package net import ( + "fmt" + "io" "net" + "net/http" + "net/http/httptest" "net/url" "os" + "sync/atomic" "syscall" "testing" + "time" + + "golang.org/x/net/http2" netutils "k8s.io/utils/net" ) @@ -96,3 +104,98 @@ func TestIsConnectionRefused(t *testing.T) { } } } + +type tcpLB struct { + t *testing.T + ln net.Listener + serverURL string + dials int32 +} + +func (lb *tcpLB) handleConnection(in net.Conn, stopCh chan struct{}) { + out, err := net.Dial("tcp", lb.serverURL) + if err != nil { + lb.t.Log(err) + return + } + go io.Copy(out, in) + go io.Copy(in, out) + <-stopCh + if err := out.Close(); err != nil { + lb.t.Fatalf("failed to close connection: %v", err) + } +} + +func (lb *tcpLB) serve(stopCh chan struct{}) { + conn, err := lb.ln.Accept() + if err != nil { + lb.t.Fatalf("failed to accept: %v", err) + } + atomic.AddInt32(&lb.dials, 1) + go lb.handleConnection(conn, stopCh) +} + +func newLB(t *testing.T, serverURL string) *tcpLB { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to bind: %v", err) + } + lb := tcpLB{ + serverURL: serverURL, + ln: ln, + t: t, + } + return &lb +} + +func TestIsConnectionReset(t *testing.T) { + ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "Hello, %s", r.Proto) + })) + ts.EnableHTTP2 = true + ts.StartTLS() + defer ts.Close() + + u, err := url.Parse(ts.URL) + if err != nil { + t.Fatalf("failed to parse URL from %q: %v", ts.URL, err) + } + lb := newLB(t, u.Host) + defer lb.ln.Close() + stopCh := make(chan struct{}) + go lb.serve(stopCh) + + c := ts.Client() + transport, ok := ts.Client().Transport.(*http.Transport) + if !ok { + t.Fatalf("failed to assert *http.Transport") + } + t2, err := http2.ConfigureTransports(transport) + if err != nil { + t.Fatalf("failed to configure *http.Transport: %+v", err) + } + t2.ReadIdleTimeout = time.Second + t2.PingTimeout = time.Second + // Create an HTTP2 connection to reuse later + resp, err := c.Get("https://" + lb.ln.Addr().String()) + if err != nil { + t.Fatalf("unexpected error: %+v", err) + } + defer resp.Body.Close() + data, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("unexpected error: %+v", err) + } + if string(data) != "Hello, HTTP/2.0" { + t.Fatalf("unexpected response: %s", data) + } + + // Deliberately let the LB stop proxying traffic for the current + // connection. This mimics a broken TCP connection that's not properly + // closed. + close(stopCh) + _, err = c.Get("https://" + lb.ln.Addr().String()) + if !IsHTTP2ConnectionLost(err) { + t.Fatalf("expected HTTP2ConnectionLost error, got %v", err) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/webhook/webhook.go b/staging/src/k8s.io/apiserver/pkg/util/webhook/webhook.go index 45143bf6efb..b03640ae8df 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/webhook/webhook.go +++ b/staging/src/k8s.io/apiserver/pkg/util/webhook/webhook.go @@ -62,7 +62,7 @@ type GenericWebhook struct { // Otherwise it returns false for an immediate fail. func DefaultShouldRetry(err error) bool { // these errors indicate a transient error that should be retried. - if utilnet.IsConnectionReset(err) || apierrors.IsInternalError(err) || apierrors.IsTimeout(err) || apierrors.IsTooManyRequests(err) { + if utilnet.IsConnectionReset(err) || utilnet.IsHTTP2ConnectionLost(err) || apierrors.IsInternalError(err) || apierrors.IsTimeout(err) || apierrors.IsTooManyRequests(err) { return true } // if the error sends the Retry-After header, we respect it as an explicit confirmation we should retry.