From 9269da53f3bf544595376f8ee4c06cc8d5455bf8 Mon Sep 17 00:00:00 2001 From: Andrew Lytvynov Date: Fri, 21 Aug 2020 14:41:31 -0700 Subject: [PATCH] spdy: add optional periodic Pings on the connection When an SPDY connection goes over an intermediate box (proxy or load balancer, e.g. AWS ELB), it can get interrupted due to idleness (default in ELB is 60s). For example, this happens with `kubectl exec` sessions are left open without any activity for a while. TCP-level keep-alives are not sufficient for all intermediate boxes, they may pay attention to application-layer traffic only. SPDY pings make the connection appear active, letting it survive a period of idleness. Note: this commit adds support for pings in `k8s.io/apimachinery/pkg/util/httpstream/spdy`, but doesn't enable it anywhere in the calling code. There is no behavior change for existing callers. --- .../pkg/util/httpstream/spdy/BUILD | 1 + .../pkg/util/httpstream/spdy/connection.go | 50 +++++++- .../util/httpstream/spdy/connection_test.go | 112 ++++++++++++++++++ .../pkg/util/httpstream/spdy/roundtripper.go | 51 ++++++-- .../pkg/util/httpstream/spdy/upgrade.go | 17 ++- 5 files changed, 218 insertions(+), 13 deletions(-) diff --git a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/BUILD b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/BUILD index 4b12052def1..fab9cc90f75 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/BUILD +++ b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/BUILD @@ -16,6 +16,7 @@ go_test( embed = [":go_default_library"], deps = [ "//staging/src/k8s.io/apimachinery/pkg/util/httpstream:go_default_library", + "//vendor/github.com/docker/spdystream:go_default_library", "//vendor/github.com/elazarl/goproxy:go_default_library", ], ) diff --git a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go index 7a68812500f..336b4908b4c 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go @@ -34,38 +34,62 @@ type connection struct { streams []httpstream.Stream streamLock sync.Mutex newStreamHandler httpstream.NewStreamHandler + ping func() (time.Duration, error) } // NewClientConnection creates a new SPDY client connection. func NewClientConnection(conn net.Conn) (httpstream.Connection, error) { + return NewClientConnectionWithPings(conn, 0) +} + +// NewClientConnectionWithPings creates a new SPDY client connection. +// +// If pingPeriod is non-zero, a background goroutine will send periodic Ping +// frames to the server. Use this to keep idle connections through certain load +// balancers alive longer. +func NewClientConnectionWithPings(conn net.Conn, pingPeriod time.Duration) (httpstream.Connection, error) { spdyConn, err := spdystream.NewConnection(conn, false) if err != nil { defer conn.Close() return nil, err } - return newConnection(spdyConn, httpstream.NoOpNewStreamHandler), nil + return newConnection(spdyConn, httpstream.NoOpNewStreamHandler, pingPeriod, spdyConn.Ping), nil } // NewServerConnection creates a new SPDY server connection. newStreamHandler // will be invoked when the server receives a newly created stream from the // client. func NewServerConnection(conn net.Conn, newStreamHandler httpstream.NewStreamHandler) (httpstream.Connection, error) { + return NewServerConnectionWithPings(conn, newStreamHandler, 0) +} + +// NewServerConnectionWithPings creates a new SPDY server connection. +// newStreamHandler will be invoked when the server receives a newly created +// stream from the client. +// +// If pingPeriod is non-zero, a background goroutine will send periodic Ping +// frames to the server. Use this to keep idle connections through certain load +// balancers alive longer. +func NewServerConnectionWithPings(conn net.Conn, newStreamHandler httpstream.NewStreamHandler, pingPeriod time.Duration) (httpstream.Connection, error) { spdyConn, err := spdystream.NewConnection(conn, true) if err != nil { defer conn.Close() return nil, err } - return newConnection(spdyConn, newStreamHandler), nil + return newConnection(spdyConn, newStreamHandler, pingPeriod, spdyConn.Ping), nil } // newConnection returns a new connection wrapping conn. newStreamHandler // will be invoked when the server receives a newly created stream from the // client. -func newConnection(conn *spdystream.Connection, newStreamHandler httpstream.NewStreamHandler) httpstream.Connection { - c := &connection{conn: conn, newStreamHandler: newStreamHandler} +func newConnection(conn *spdystream.Connection, newStreamHandler httpstream.NewStreamHandler, pingPeriod time.Duration, pingFn func() (time.Duration, error)) httpstream.Connection { + c := &connection{conn: conn, newStreamHandler: newStreamHandler, ping: pingFn} go conn.Serve(c.newSpdyStream) + if pingPeriod > 0 && pingFn != nil { + go c.sendPings(pingPeriod) + } return c } @@ -143,3 +167,21 @@ func (c *connection) newSpdyStream(stream *spdystream.Stream) { func (c *connection) SetIdleTimeout(timeout time.Duration) { c.conn.SetIdleTimeout(timeout) } + +func (c *connection) sendPings(period time.Duration) { + t := time.NewTicker(period) + defer t.Stop() + for { + select { + case <-c.conn.CloseChan(): + return + case <-t.C: + } + if _, err := c.ping(); err != nil { + klog.V(3).Infof("SPDY Ping failed: %v", err) + // Continue, in case this is a transient failure. + // c.conn.CloseChan above will tell us when the connection is + // actually closed. + } + } +} diff --git a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection_test.go b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection_test.go index e00b29c461e..fa228d71fb5 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection_test.go @@ -17,13 +17,16 @@ limitations under the License. package spdy import ( + "fmt" "io" "net" "net/http" "sync" + "sync/atomic" "testing" "time" + "github.com/docker/spdystream" "k8s.io/apimachinery/pkg/util/httpstream" ) @@ -178,3 +181,112 @@ func TestConnectionCloseIsImmediateThroughAProxy(t *testing.T) { } } } + +func TestConnectionPings(t *testing.T) { + const pingPeriod = 10 * time.Millisecond + timeout := time.After(10 * time.Second) + + // Set up server connection. + listener, err := net.Listen("tcp4", "localhost:0") + if err != nil { + t.Fatal(err) + } + defer listener.Close() + + srvErr := make(chan error, 1) + go func() { + defer close(srvErr) + + srvConn, err := listener.Accept() + if err != nil { + srvErr <- fmt.Errorf("server: error accepting connection: %v", err) + return + } + defer srvConn.Close() + + spdyConn, err := spdystream.NewConnection(srvConn, true) + if err != nil { + srvErr <- fmt.Errorf("server: error creating spdy connection: %v", err) + return + } + + var pingsSent int64 + srvSPDYConn := newConnection( + spdyConn, + func(stream httpstream.Stream, replySent <-chan struct{}) error { + // Echo all the incoming data. + go io.Copy(stream, stream) + return nil + }, + pingPeriod, + func() (time.Duration, error) { + atomic.AddInt64(&pingsSent, 1) + return 0, nil + }) + defer srvSPDYConn.Close() + + // Wait for the connection to close, to prevent defers from running + // early. + select { + case <-timeout: + srvErr <- fmt.Errorf("server: timeout waiting for connection to close") + return + case <-srvSPDYConn.CloseChan(): + } + + // Count pings sent by the server. + gotPings := atomic.LoadInt64(&pingsSent) + if gotPings < 1 { + t.Errorf("server: failed to send any pings (check logs)") + } + }() + + // Set up client connection. + clConn, err := net.Dial("tcp4", listener.Addr().String()) + if err != nil { + t.Fatalf("client: error connecting to proxy: %v", err) + } + defer clConn.Close() + start := time.Now() + clSPDYConn, err := NewClientConnection(clConn) + if err != nil { + t.Fatalf("client: error creating spdy connection: %v", err) + } + defer clSPDYConn.Close() + clSPDYStream, err := clSPDYConn.CreateStream(http.Header{}) + if err != nil { + t.Fatalf("client: error creating stream: %v", err) + } + defer clSPDYStream.Close() + + // Send some data both ways, to make sure pings don't interfere with + // regular messages. + in := "foo" + if _, err := fmt.Fprintln(clSPDYStream, in); err != nil { + t.Fatalf("client: error writing data to stream: %v", err) + } + var out string + if _, err := fmt.Fscanln(clSPDYStream, &out); err != nil { + t.Fatalf("client: error reading data from stream: %v", err) + } + if in != out { + t.Errorf("client: received data doesn't match sent data: got %q, want %q", out, in) + } + + // Wait for at least 2 pings to get sent each way before closing the + // connection. + elapsed := time.Since(start) + if elapsed < 3*pingPeriod { + time.Sleep(3*pingPeriod - elapsed) + } + clSPDYConn.Close() + + select { + case err, ok := <-srvErr: + if ok && err != nil { + t.Error(err) + } + case <-timeout: + t.Errorf("timed out waiting for server to exit") + } +} diff --git a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go index 6309fbc26bb..4cb1cfadcb0 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go @@ -30,6 +30,7 @@ import ( "net/http/httputil" "net/url" "strings" + "time" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -70,6 +71,9 @@ type SpdyRoundTripper struct { // requireSameHostRedirects restricts redirect following to only follow redirects to the same host // as the original request. requireSameHostRedirects bool + // pingPeriod is a period for sending Ping frames over established + // connections. + pingPeriod time.Duration } var _ utilnet.TLSClientConfigHolder = &SpdyRoundTripper{} @@ -79,18 +83,51 @@ var _ utilnet.Dialer = &SpdyRoundTripper{} // NewRoundTripper creates a new SpdyRoundTripper that will use the specified // tlsConfig. func NewRoundTripper(tlsConfig *tls.Config, followRedirects, requireSameHostRedirects bool) *SpdyRoundTripper { - return NewRoundTripperWithProxy(tlsConfig, followRedirects, requireSameHostRedirects, utilnet.NewProxierWithNoProxyCIDR(http.ProxyFromEnvironment)) + return NewRoundTripperWithConfig(RoundTripperConfig{ + TLS: tlsConfig, + FollowRedirects: followRedirects, + RequireSameHostRedirects: requireSameHostRedirects, + }) } // NewRoundTripperWithProxy creates a new SpdyRoundTripper that will use the // specified tlsConfig and proxy func. func NewRoundTripperWithProxy(tlsConfig *tls.Config, followRedirects, requireSameHostRedirects bool, proxier func(*http.Request) (*url.URL, error)) *SpdyRoundTripper { - return &SpdyRoundTripper{ - tlsConfig: tlsConfig, - followRedirects: followRedirects, - requireSameHostRedirects: requireSameHostRedirects, - proxier: proxier, + return NewRoundTripperWithConfig(RoundTripperConfig{ + TLS: tlsConfig, + FollowRedirects: followRedirects, + RequireSameHostRedirects: requireSameHostRedirects, + Proxier: proxier, + }) +} + +// NewRoundTripperWithProxy creates a new SpdyRoundTripper with the specified +// configuration. +func NewRoundTripperWithConfig(cfg RoundTripperConfig) *SpdyRoundTripper { + if cfg.Proxier == nil { + cfg.Proxier = utilnet.NewProxierWithNoProxyCIDR(http.ProxyFromEnvironment) } + return &SpdyRoundTripper{ + tlsConfig: cfg.TLS, + followRedirects: cfg.FollowRedirects, + requireSameHostRedirects: cfg.RequireSameHostRedirects, + proxier: cfg.Proxier, + pingPeriod: cfg.PingPeriod, + } +} + +// RoundTripperConfig is a set of options for an SpdyRoundTripper. +type RoundTripperConfig struct { + // TLS configuration used by the round tripper. + TLS *tls.Config + // Proxier is a proxy function invoked on each request. Optional. + Proxier func(*http.Request) (*url.URL, error) + // PingPeriod is a period for sending SPDY Pings on the connection. + // Optional. + PingPeriod time.Duration + + FollowRedirects bool + RequireSameHostRedirects bool } // TLSClientConfig implements pkg/util/net.TLSClientConfigHolder for proper TLS checking during @@ -316,7 +353,7 @@ func (s *SpdyRoundTripper) NewConnection(resp *http.Response) (httpstream.Connec return nil, fmt.Errorf("unable to upgrade connection: %s", responseError) } - return NewClientConnection(s.conn) + return NewClientConnectionWithPings(s.conn, s.pingPeriod) } // statusScheme is private scheme for the decoding here until someone fixes the TODO in NewConnection diff --git a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go index 045d214d2b7..f17eb09e960 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go @@ -24,6 +24,7 @@ import ( "net/http" "strings" "sync/atomic" + "time" "k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/apimachinery/pkg/util/runtime" @@ -34,6 +35,7 @@ const HeaderSpdy31 = "SPDY/3.1" // responseUpgrader knows how to upgrade HTTP responses. It // implements the httpstream.ResponseUpgrader interface. type responseUpgrader struct { + pingPeriod time.Duration } // connWrapper is used to wrap a hijacked connection and its bufio.Reader. All @@ -64,7 +66,18 @@ func (w *connWrapper) Close() error { // capable of upgrading HTTP responses using SPDY/3.1 via the // spdystream package. func NewResponseUpgrader() httpstream.ResponseUpgrader { - return responseUpgrader{} + return NewResponseUpgraderWithPings(0) +} + +// NewResponseUpgraderWithPings returns a new httpstream.ResponseUpgrader that +// is capable of upgrading HTTP responses using SPDY/3.1 via the spdystream +// package. +// +// If pingPeriod is non-zero, for each incoming connection a background +// goroutine will send periodic Ping frames to the server. Use this to keep +// idle connections through certain load balancers alive longer. +func NewResponseUpgraderWithPings(pingPeriod time.Duration) httpstream.ResponseUpgrader { + return responseUpgrader{pingPeriod: pingPeriod} } // UpgradeResponse upgrades an HTTP response to one that supports multiplexed @@ -97,7 +110,7 @@ func (u responseUpgrader) UpgradeResponse(w http.ResponseWriter, req *http.Reque } connWithBuf := &connWrapper{Conn: conn, bufReader: bufrw.Reader} - spdyConn, err := NewServerConnection(connWithBuf, newStreamHandler) + spdyConn, err := NewServerConnectionWithPings(connWithBuf, newStreamHandler, u.pingPeriod) if err != nil { runtime.HandleError(fmt.Errorf("unable to upgrade: error creating SPDY server connection: %v", err)) return nil