diff --git a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/BUILD b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/BUILD index 4b12052def1..fab9cc90f75 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/BUILD +++ b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/BUILD @@ -16,6 +16,7 @@ go_test( embed = [":go_default_library"], deps = [ "//staging/src/k8s.io/apimachinery/pkg/util/httpstream:go_default_library", + "//vendor/github.com/docker/spdystream:go_default_library", "//vendor/github.com/elazarl/goproxy:go_default_library", ], ) diff --git a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go index 7a68812500f..336b4908b4c 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go @@ -34,38 +34,62 @@ type connection struct { streams []httpstream.Stream streamLock sync.Mutex newStreamHandler httpstream.NewStreamHandler + ping func() (time.Duration, error) } // NewClientConnection creates a new SPDY client connection. func NewClientConnection(conn net.Conn) (httpstream.Connection, error) { + return NewClientConnectionWithPings(conn, 0) +} + +// NewClientConnectionWithPings creates a new SPDY client connection. +// +// If pingPeriod is non-zero, a background goroutine will send periodic Ping +// frames to the server. Use this to keep idle connections through certain load +// balancers alive longer. +func NewClientConnectionWithPings(conn net.Conn, pingPeriod time.Duration) (httpstream.Connection, error) { spdyConn, err := spdystream.NewConnection(conn, false) if err != nil { defer conn.Close() return nil, err } - return newConnection(spdyConn, httpstream.NoOpNewStreamHandler), nil + return newConnection(spdyConn, httpstream.NoOpNewStreamHandler, pingPeriod, spdyConn.Ping), nil } // NewServerConnection creates a new SPDY server connection. newStreamHandler // will be invoked when the server receives a newly created stream from the // client. func NewServerConnection(conn net.Conn, newStreamHandler httpstream.NewStreamHandler) (httpstream.Connection, error) { + return NewServerConnectionWithPings(conn, newStreamHandler, 0) +} + +// NewServerConnectionWithPings creates a new SPDY server connection. +// newStreamHandler will be invoked when the server receives a newly created +// stream from the client. +// +// If pingPeriod is non-zero, a background goroutine will send periodic Ping +// frames to the server. Use this to keep idle connections through certain load +// balancers alive longer. +func NewServerConnectionWithPings(conn net.Conn, newStreamHandler httpstream.NewStreamHandler, pingPeriod time.Duration) (httpstream.Connection, error) { spdyConn, err := spdystream.NewConnection(conn, true) if err != nil { defer conn.Close() return nil, err } - return newConnection(spdyConn, newStreamHandler), nil + return newConnection(spdyConn, newStreamHandler, pingPeriod, spdyConn.Ping), nil } // newConnection returns a new connection wrapping conn. newStreamHandler // will be invoked when the server receives a newly created stream from the // client. -func newConnection(conn *spdystream.Connection, newStreamHandler httpstream.NewStreamHandler) httpstream.Connection { - c := &connection{conn: conn, newStreamHandler: newStreamHandler} +func newConnection(conn *spdystream.Connection, newStreamHandler httpstream.NewStreamHandler, pingPeriod time.Duration, pingFn func() (time.Duration, error)) httpstream.Connection { + c := &connection{conn: conn, newStreamHandler: newStreamHandler, ping: pingFn} go conn.Serve(c.newSpdyStream) + if pingPeriod > 0 && pingFn != nil { + go c.sendPings(pingPeriod) + } return c } @@ -143,3 +167,21 @@ func (c *connection) newSpdyStream(stream *spdystream.Stream) { func (c *connection) SetIdleTimeout(timeout time.Duration) { c.conn.SetIdleTimeout(timeout) } + +func (c *connection) sendPings(period time.Duration) { + t := time.NewTicker(period) + defer t.Stop() + for { + select { + case <-c.conn.CloseChan(): + return + case <-t.C: + } + if _, err := c.ping(); err != nil { + klog.V(3).Infof("SPDY Ping failed: %v", err) + // Continue, in case this is a transient failure. + // c.conn.CloseChan above will tell us when the connection is + // actually closed. + } + } +} diff --git a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection_test.go b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection_test.go index e00b29c461e..fa228d71fb5 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection_test.go @@ -17,13 +17,16 @@ limitations under the License. package spdy import ( + "fmt" "io" "net" "net/http" "sync" + "sync/atomic" "testing" "time" + "github.com/docker/spdystream" "k8s.io/apimachinery/pkg/util/httpstream" ) @@ -178,3 +181,112 @@ func TestConnectionCloseIsImmediateThroughAProxy(t *testing.T) { } } } + +func TestConnectionPings(t *testing.T) { + const pingPeriod = 10 * time.Millisecond + timeout := time.After(10 * time.Second) + + // Set up server connection. + listener, err := net.Listen("tcp4", "localhost:0") + if err != nil { + t.Fatal(err) + } + defer listener.Close() + + srvErr := make(chan error, 1) + go func() { + defer close(srvErr) + + srvConn, err := listener.Accept() + if err != nil { + srvErr <- fmt.Errorf("server: error accepting connection: %v", err) + return + } + defer srvConn.Close() + + spdyConn, err := spdystream.NewConnection(srvConn, true) + if err != nil { + srvErr <- fmt.Errorf("server: error creating spdy connection: %v", err) + return + } + + var pingsSent int64 + srvSPDYConn := newConnection( + spdyConn, + func(stream httpstream.Stream, replySent <-chan struct{}) error { + // Echo all the incoming data. + go io.Copy(stream, stream) + return nil + }, + pingPeriod, + func() (time.Duration, error) { + atomic.AddInt64(&pingsSent, 1) + return 0, nil + }) + defer srvSPDYConn.Close() + + // Wait for the connection to close, to prevent defers from running + // early. + select { + case <-timeout: + srvErr <- fmt.Errorf("server: timeout waiting for connection to close") + return + case <-srvSPDYConn.CloseChan(): + } + + // Count pings sent by the server. + gotPings := atomic.LoadInt64(&pingsSent) + if gotPings < 1 { + t.Errorf("server: failed to send any pings (check logs)") + } + }() + + // Set up client connection. + clConn, err := net.Dial("tcp4", listener.Addr().String()) + if err != nil { + t.Fatalf("client: error connecting to proxy: %v", err) + } + defer clConn.Close() + start := time.Now() + clSPDYConn, err := NewClientConnection(clConn) + if err != nil { + t.Fatalf("client: error creating spdy connection: %v", err) + } + defer clSPDYConn.Close() + clSPDYStream, err := clSPDYConn.CreateStream(http.Header{}) + if err != nil { + t.Fatalf("client: error creating stream: %v", err) + } + defer clSPDYStream.Close() + + // Send some data both ways, to make sure pings don't interfere with + // regular messages. + in := "foo" + if _, err := fmt.Fprintln(clSPDYStream, in); err != nil { + t.Fatalf("client: error writing data to stream: %v", err) + } + var out string + if _, err := fmt.Fscanln(clSPDYStream, &out); err != nil { + t.Fatalf("client: error reading data from stream: %v", err) + } + if in != out { + t.Errorf("client: received data doesn't match sent data: got %q, want %q", out, in) + } + + // Wait for at least 2 pings to get sent each way before closing the + // connection. + elapsed := time.Since(start) + if elapsed < 3*pingPeriod { + time.Sleep(3*pingPeriod - elapsed) + } + clSPDYConn.Close() + + select { + case err, ok := <-srvErr: + if ok && err != nil { + t.Error(err) + } + case <-timeout: + t.Errorf("timed out waiting for server to exit") + } +} diff --git a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go index 6309fbc26bb..4cb1cfadcb0 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go @@ -30,6 +30,7 @@ import ( "net/http/httputil" "net/url" "strings" + "time" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -70,6 +71,9 @@ type SpdyRoundTripper struct { // requireSameHostRedirects restricts redirect following to only follow redirects to the same host // as the original request. requireSameHostRedirects bool + // pingPeriod is a period for sending Ping frames over established + // connections. + pingPeriod time.Duration } var _ utilnet.TLSClientConfigHolder = &SpdyRoundTripper{} @@ -79,18 +83,51 @@ var _ utilnet.Dialer = &SpdyRoundTripper{} // NewRoundTripper creates a new SpdyRoundTripper that will use the specified // tlsConfig. func NewRoundTripper(tlsConfig *tls.Config, followRedirects, requireSameHostRedirects bool) *SpdyRoundTripper { - return NewRoundTripperWithProxy(tlsConfig, followRedirects, requireSameHostRedirects, utilnet.NewProxierWithNoProxyCIDR(http.ProxyFromEnvironment)) + return NewRoundTripperWithConfig(RoundTripperConfig{ + TLS: tlsConfig, + FollowRedirects: followRedirects, + RequireSameHostRedirects: requireSameHostRedirects, + }) } // NewRoundTripperWithProxy creates a new SpdyRoundTripper that will use the // specified tlsConfig and proxy func. func NewRoundTripperWithProxy(tlsConfig *tls.Config, followRedirects, requireSameHostRedirects bool, proxier func(*http.Request) (*url.URL, error)) *SpdyRoundTripper { - return &SpdyRoundTripper{ - tlsConfig: tlsConfig, - followRedirects: followRedirects, - requireSameHostRedirects: requireSameHostRedirects, - proxier: proxier, + return NewRoundTripperWithConfig(RoundTripperConfig{ + TLS: tlsConfig, + FollowRedirects: followRedirects, + RequireSameHostRedirects: requireSameHostRedirects, + Proxier: proxier, + }) +} + +// NewRoundTripperWithProxy creates a new SpdyRoundTripper with the specified +// configuration. +func NewRoundTripperWithConfig(cfg RoundTripperConfig) *SpdyRoundTripper { + if cfg.Proxier == nil { + cfg.Proxier = utilnet.NewProxierWithNoProxyCIDR(http.ProxyFromEnvironment) } + return &SpdyRoundTripper{ + tlsConfig: cfg.TLS, + followRedirects: cfg.FollowRedirects, + requireSameHostRedirects: cfg.RequireSameHostRedirects, + proxier: cfg.Proxier, + pingPeriod: cfg.PingPeriod, + } +} + +// RoundTripperConfig is a set of options for an SpdyRoundTripper. +type RoundTripperConfig struct { + // TLS configuration used by the round tripper. + TLS *tls.Config + // Proxier is a proxy function invoked on each request. Optional. + Proxier func(*http.Request) (*url.URL, error) + // PingPeriod is a period for sending SPDY Pings on the connection. + // Optional. + PingPeriod time.Duration + + FollowRedirects bool + RequireSameHostRedirects bool } // TLSClientConfig implements pkg/util/net.TLSClientConfigHolder for proper TLS checking during @@ -316,7 +353,7 @@ func (s *SpdyRoundTripper) NewConnection(resp *http.Response) (httpstream.Connec return nil, fmt.Errorf("unable to upgrade connection: %s", responseError) } - return NewClientConnection(s.conn) + return NewClientConnectionWithPings(s.conn, s.pingPeriod) } // statusScheme is private scheme for the decoding here until someone fixes the TODO in NewConnection diff --git a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go index 045d214d2b7..f17eb09e960 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go @@ -24,6 +24,7 @@ import ( "net/http" "strings" "sync/atomic" + "time" "k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/apimachinery/pkg/util/runtime" @@ -34,6 +35,7 @@ const HeaderSpdy31 = "SPDY/3.1" // responseUpgrader knows how to upgrade HTTP responses. It // implements the httpstream.ResponseUpgrader interface. type responseUpgrader struct { + pingPeriod time.Duration } // connWrapper is used to wrap a hijacked connection and its bufio.Reader. All @@ -64,7 +66,18 @@ func (w *connWrapper) Close() error { // capable of upgrading HTTP responses using SPDY/3.1 via the // spdystream package. func NewResponseUpgrader() httpstream.ResponseUpgrader { - return responseUpgrader{} + return NewResponseUpgraderWithPings(0) +} + +// NewResponseUpgraderWithPings returns a new httpstream.ResponseUpgrader that +// is capable of upgrading HTTP responses using SPDY/3.1 via the spdystream +// package. +// +// If pingPeriod is non-zero, for each incoming connection a background +// goroutine will send periodic Ping frames to the server. Use this to keep +// idle connections through certain load balancers alive longer. +func NewResponseUpgraderWithPings(pingPeriod time.Duration) httpstream.ResponseUpgrader { + return responseUpgrader{pingPeriod: pingPeriod} } // UpgradeResponse upgrades an HTTP response to one that supports multiplexed @@ -97,7 +110,7 @@ func (u responseUpgrader) UpgradeResponse(w http.ResponseWriter, req *http.Reque } connWithBuf := &connWrapper{Conn: conn, bufReader: bufrw.Reader} - spdyConn, err := NewServerConnection(connWithBuf, newStreamHandler) + spdyConn, err := NewServerConnectionWithPings(connWithBuf, newStreamHandler, u.pingPeriod) if err != nil { runtime.HandleError(fmt.Errorf("unable to upgrade: error creating SPDY server connection: %v", err)) return nil