Merge pull request #117251 from linxiulei/wh

Make connection lost retryable in webhook
This commit is contained in:
Kubernetes Prow Robot 2023-05-22 09:34:58 -07:00 committed by GitHub
commit d9df6b0331
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 110 additions and 1 deletions

View File

@ -20,6 +20,7 @@ import (
"errors"
"net"
"reflect"
"strings"
"syscall"
)
@ -47,6 +48,11 @@ func IsConnectionReset(err error) bool {
return false
}
// Returns if the given err is "http2: client connection lost" error.
func IsHTTP2ConnectionLost(err error) bool {
return err != nil && strings.Contains(err.Error(), "http2: client connection lost")
}
// Returns if the given err is "connection refused" error
func IsConnectionRefused(err error) bool {
var errno syscall.Errno

View File

@ -17,11 +17,19 @@ limitations under the License.
package net
import (
"fmt"
"io"
"net"
"net/http"
"net/http/httptest"
"net/url"
"os"
"sync/atomic"
"syscall"
"testing"
"time"
"golang.org/x/net/http2"
netutils "k8s.io/utils/net"
)
@ -96,3 +104,98 @@ func TestIsConnectionRefused(t *testing.T) {
}
}
}
type tcpLB struct {
t *testing.T
ln net.Listener
serverURL string
dials int32
}
func (lb *tcpLB) handleConnection(in net.Conn, stopCh chan struct{}) {
out, err := net.Dial("tcp", lb.serverURL)
if err != nil {
lb.t.Log(err)
return
}
go io.Copy(out, in)
go io.Copy(in, out)
<-stopCh
if err := out.Close(); err != nil {
lb.t.Fatalf("failed to close connection: %v", err)
}
}
func (lb *tcpLB) serve(stopCh chan struct{}) {
conn, err := lb.ln.Accept()
if err != nil {
lb.t.Fatalf("failed to accept: %v", err)
}
atomic.AddInt32(&lb.dials, 1)
go lb.handleConnection(conn, stopCh)
}
func newLB(t *testing.T, serverURL string) *tcpLB {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("failed to bind: %v", err)
}
lb := tcpLB{
serverURL: serverURL,
ln: ln,
t: t,
}
return &lb
}
func TestIsConnectionReset(t *testing.T) {
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello, %s", r.Proto)
}))
ts.EnableHTTP2 = true
ts.StartTLS()
defer ts.Close()
u, err := url.Parse(ts.URL)
if err != nil {
t.Fatalf("failed to parse URL from %q: %v", ts.URL, err)
}
lb := newLB(t, u.Host)
defer lb.ln.Close()
stopCh := make(chan struct{})
go lb.serve(stopCh)
c := ts.Client()
transport, ok := ts.Client().Transport.(*http.Transport)
if !ok {
t.Fatalf("failed to assert *http.Transport")
}
t2, err := http2.ConfigureTransports(transport)
if err != nil {
t.Fatalf("failed to configure *http.Transport: %+v", err)
}
t2.ReadIdleTimeout = time.Second
t2.PingTimeout = time.Second
// Create an HTTP2 connection to reuse later
resp, err := c.Get("https://" + lb.ln.Addr().String())
if err != nil {
t.Fatalf("unexpected error: %+v", err)
}
defer resp.Body.Close()
data, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("unexpected error: %+v", err)
}
if string(data) != "Hello, HTTP/2.0" {
t.Fatalf("unexpected response: %s", data)
}
// Deliberately let the LB stop proxying traffic for the current
// connection. This mimics a broken TCP connection that's not properly
// closed.
close(stopCh)
_, err = c.Get("https://" + lb.ln.Addr().String())
if !IsHTTP2ConnectionLost(err) {
t.Fatalf("expected HTTP2ConnectionLost error, got %v", err)
}
}

View File

@ -62,7 +62,7 @@ type GenericWebhook struct {
// Otherwise it returns false for an immediate fail.
func DefaultShouldRetry(err error) bool {
// these errors indicate a transient error that should be retried.
if utilnet.IsConnectionReset(err) || apierrors.IsInternalError(err) || apierrors.IsTimeout(err) || apierrors.IsTooManyRequests(err) {
if utilnet.IsConnectionReset(err) || utilnet.IsHTTP2ConnectionLost(err) || apierrors.IsInternalError(err) || apierrors.IsTimeout(err) || apierrors.IsTooManyRequests(err) {
return true
}
// if the error sends the Retry-After header, we respect it as an explicit confirmation we should retry.