Merge pull request #94170 from awly/spdy-pings

spdy: add optional periodic Pings on the connection
This commit is contained in:
Kubernetes Prow Robot 2020-09-22 16:52:22 -07:00 committed by GitHub
commit 4332d3416e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 218 additions and 13 deletions

View File

@ -16,6 +16,7 @@ go_test(
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/httpstream:go_default_library",
"//vendor/github.com/docker/spdystream:go_default_library",
"//vendor/github.com/elazarl/goproxy:go_default_library",
],
)

View File

@ -34,38 +34,62 @@ type connection struct {
streams []httpstream.Stream
streamLock sync.Mutex
newStreamHandler httpstream.NewStreamHandler
ping func() (time.Duration, error)
}
// NewClientConnection creates a new SPDY client connection.
func NewClientConnection(conn net.Conn) (httpstream.Connection, error) {
return NewClientConnectionWithPings(conn, 0)
}
// NewClientConnectionWithPings creates a new SPDY client connection.
//
// If pingPeriod is non-zero, a background goroutine will send periodic Ping
// frames to the server. Use this to keep idle connections through certain load
// balancers alive longer.
func NewClientConnectionWithPings(conn net.Conn, pingPeriod time.Duration) (httpstream.Connection, error) {
spdyConn, err := spdystream.NewConnection(conn, false)
if err != nil {
defer conn.Close()
return nil, err
}
return newConnection(spdyConn, httpstream.NoOpNewStreamHandler), nil
return newConnection(spdyConn, httpstream.NoOpNewStreamHandler, pingPeriod, spdyConn.Ping), nil
}
// NewServerConnection creates a new SPDY server connection. newStreamHandler
// will be invoked when the server receives a newly created stream from the
// client.
func NewServerConnection(conn net.Conn, newStreamHandler httpstream.NewStreamHandler) (httpstream.Connection, error) {
return NewServerConnectionWithPings(conn, newStreamHandler, 0)
}
// NewServerConnectionWithPings creates a new SPDY server connection.
// newStreamHandler will be invoked when the server receives a newly created
// stream from the client.
//
// If pingPeriod is non-zero, a background goroutine will send periodic Ping
// frames to the server. Use this to keep idle connections through certain load
// balancers alive longer.
func NewServerConnectionWithPings(conn net.Conn, newStreamHandler httpstream.NewStreamHandler, pingPeriod time.Duration) (httpstream.Connection, error) {
spdyConn, err := spdystream.NewConnection(conn, true)
if err != nil {
defer conn.Close()
return nil, err
}
return newConnection(spdyConn, newStreamHandler), nil
return newConnection(spdyConn, newStreamHandler, pingPeriod, spdyConn.Ping), nil
}
// newConnection returns a new connection wrapping conn. newStreamHandler
// will be invoked when the server receives a newly created stream from the
// client.
func newConnection(conn *spdystream.Connection, newStreamHandler httpstream.NewStreamHandler) httpstream.Connection {
c := &connection{conn: conn, newStreamHandler: newStreamHandler}
func newConnection(conn *spdystream.Connection, newStreamHandler httpstream.NewStreamHandler, pingPeriod time.Duration, pingFn func() (time.Duration, error)) httpstream.Connection {
c := &connection{conn: conn, newStreamHandler: newStreamHandler, ping: pingFn}
go conn.Serve(c.newSpdyStream)
if pingPeriod > 0 && pingFn != nil {
go c.sendPings(pingPeriod)
}
return c
}
@ -143,3 +167,21 @@ func (c *connection) newSpdyStream(stream *spdystream.Stream) {
func (c *connection) SetIdleTimeout(timeout time.Duration) {
c.conn.SetIdleTimeout(timeout)
}
func (c *connection) sendPings(period time.Duration) {
t := time.NewTicker(period)
defer t.Stop()
for {
select {
case <-c.conn.CloseChan():
return
case <-t.C:
}
if _, err := c.ping(); err != nil {
klog.V(3).Infof("SPDY Ping failed: %v", err)
// Continue, in case this is a transient failure.
// c.conn.CloseChan above will tell us when the connection is
// actually closed.
}
}
}

View File

@ -17,13 +17,16 @@ limitations under the License.
package spdy
import (
"fmt"
"io"
"net"
"net/http"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/docker/spdystream"
"k8s.io/apimachinery/pkg/util/httpstream"
)
@ -178,3 +181,112 @@ func TestConnectionCloseIsImmediateThroughAProxy(t *testing.T) {
}
}
}
func TestConnectionPings(t *testing.T) {
const pingPeriod = 10 * time.Millisecond
timeout := time.After(10 * time.Second)
// Set up server connection.
listener, err := net.Listen("tcp4", "localhost:0")
if err != nil {
t.Fatal(err)
}
defer listener.Close()
srvErr := make(chan error, 1)
go func() {
defer close(srvErr)
srvConn, err := listener.Accept()
if err != nil {
srvErr <- fmt.Errorf("server: error accepting connection: %v", err)
return
}
defer srvConn.Close()
spdyConn, err := spdystream.NewConnection(srvConn, true)
if err != nil {
srvErr <- fmt.Errorf("server: error creating spdy connection: %v", err)
return
}
var pingsSent int64
srvSPDYConn := newConnection(
spdyConn,
func(stream httpstream.Stream, replySent <-chan struct{}) error {
// Echo all the incoming data.
go io.Copy(stream, stream)
return nil
},
pingPeriod,
func() (time.Duration, error) {
atomic.AddInt64(&pingsSent, 1)
return 0, nil
})
defer srvSPDYConn.Close()
// Wait for the connection to close, to prevent defers from running
// early.
select {
case <-timeout:
srvErr <- fmt.Errorf("server: timeout waiting for connection to close")
return
case <-srvSPDYConn.CloseChan():
}
// Count pings sent by the server.
gotPings := atomic.LoadInt64(&pingsSent)
if gotPings < 1 {
t.Errorf("server: failed to send any pings (check logs)")
}
}()
// Set up client connection.
clConn, err := net.Dial("tcp4", listener.Addr().String())
if err != nil {
t.Fatalf("client: error connecting to proxy: %v", err)
}
defer clConn.Close()
start := time.Now()
clSPDYConn, err := NewClientConnection(clConn)
if err != nil {
t.Fatalf("client: error creating spdy connection: %v", err)
}
defer clSPDYConn.Close()
clSPDYStream, err := clSPDYConn.CreateStream(http.Header{})
if err != nil {
t.Fatalf("client: error creating stream: %v", err)
}
defer clSPDYStream.Close()
// Send some data both ways, to make sure pings don't interfere with
// regular messages.
in := "foo"
if _, err := fmt.Fprintln(clSPDYStream, in); err != nil {
t.Fatalf("client: error writing data to stream: %v", err)
}
var out string
if _, err := fmt.Fscanln(clSPDYStream, &out); err != nil {
t.Fatalf("client: error reading data from stream: %v", err)
}
if in != out {
t.Errorf("client: received data doesn't match sent data: got %q, want %q", out, in)
}
// Wait for at least 2 pings to get sent each way before closing the
// connection.
elapsed := time.Since(start)
if elapsed < 3*pingPeriod {
time.Sleep(3*pingPeriod - elapsed)
}
clSPDYConn.Close()
select {
case err, ok := <-srvErr:
if ok && err != nil {
t.Error(err)
}
case <-timeout:
t.Errorf("timed out waiting for server to exit")
}
}

View File

@ -30,6 +30,7 @@ import (
"net/http/httputil"
"net/url"
"strings"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -70,6 +71,9 @@ type SpdyRoundTripper struct {
// requireSameHostRedirects restricts redirect following to only follow redirects to the same host
// as the original request.
requireSameHostRedirects bool
// pingPeriod is a period for sending Ping frames over established
// connections.
pingPeriod time.Duration
}
var _ utilnet.TLSClientConfigHolder = &SpdyRoundTripper{}
@ -79,18 +83,51 @@ var _ utilnet.Dialer = &SpdyRoundTripper{}
// NewRoundTripper creates a new SpdyRoundTripper that will use the specified
// tlsConfig.
func NewRoundTripper(tlsConfig *tls.Config, followRedirects, requireSameHostRedirects bool) *SpdyRoundTripper {
return NewRoundTripperWithProxy(tlsConfig, followRedirects, requireSameHostRedirects, utilnet.NewProxierWithNoProxyCIDR(http.ProxyFromEnvironment))
return NewRoundTripperWithConfig(RoundTripperConfig{
TLS: tlsConfig,
FollowRedirects: followRedirects,
RequireSameHostRedirects: requireSameHostRedirects,
})
}
// NewRoundTripperWithProxy creates a new SpdyRoundTripper that will use the
// specified tlsConfig and proxy func.
func NewRoundTripperWithProxy(tlsConfig *tls.Config, followRedirects, requireSameHostRedirects bool, proxier func(*http.Request) (*url.URL, error)) *SpdyRoundTripper {
return &SpdyRoundTripper{
tlsConfig: tlsConfig,
followRedirects: followRedirects,
requireSameHostRedirects: requireSameHostRedirects,
proxier: proxier,
return NewRoundTripperWithConfig(RoundTripperConfig{
TLS: tlsConfig,
FollowRedirects: followRedirects,
RequireSameHostRedirects: requireSameHostRedirects,
Proxier: proxier,
})
}
// NewRoundTripperWithProxy creates a new SpdyRoundTripper with the specified
// configuration.
func NewRoundTripperWithConfig(cfg RoundTripperConfig) *SpdyRoundTripper {
if cfg.Proxier == nil {
cfg.Proxier = utilnet.NewProxierWithNoProxyCIDR(http.ProxyFromEnvironment)
}
return &SpdyRoundTripper{
tlsConfig: cfg.TLS,
followRedirects: cfg.FollowRedirects,
requireSameHostRedirects: cfg.RequireSameHostRedirects,
proxier: cfg.Proxier,
pingPeriod: cfg.PingPeriod,
}
}
// RoundTripperConfig is a set of options for an SpdyRoundTripper.
type RoundTripperConfig struct {
// TLS configuration used by the round tripper.
TLS *tls.Config
// Proxier is a proxy function invoked on each request. Optional.
Proxier func(*http.Request) (*url.URL, error)
// PingPeriod is a period for sending SPDY Pings on the connection.
// Optional.
PingPeriod time.Duration
FollowRedirects bool
RequireSameHostRedirects bool
}
// TLSClientConfig implements pkg/util/net.TLSClientConfigHolder for proper TLS checking during
@ -316,7 +353,7 @@ func (s *SpdyRoundTripper) NewConnection(resp *http.Response) (httpstream.Connec
return nil, fmt.Errorf("unable to upgrade connection: %s", responseError)
}
return NewClientConnection(s.conn)
return NewClientConnectionWithPings(s.conn, s.pingPeriod)
}
// statusScheme is private scheme for the decoding here until someone fixes the TODO in NewConnection

View File

@ -24,6 +24,7 @@ import (
"net/http"
"strings"
"sync/atomic"
"time"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/runtime"
@ -34,6 +35,7 @@ const HeaderSpdy31 = "SPDY/3.1"
// responseUpgrader knows how to upgrade HTTP responses. It
// implements the httpstream.ResponseUpgrader interface.
type responseUpgrader struct {
pingPeriod time.Duration
}
// connWrapper is used to wrap a hijacked connection and its bufio.Reader. All
@ -64,7 +66,18 @@ func (w *connWrapper) Close() error {
// capable of upgrading HTTP responses using SPDY/3.1 via the
// spdystream package.
func NewResponseUpgrader() httpstream.ResponseUpgrader {
return responseUpgrader{}
return NewResponseUpgraderWithPings(0)
}
// NewResponseUpgraderWithPings returns a new httpstream.ResponseUpgrader that
// is capable of upgrading HTTP responses using SPDY/3.1 via the spdystream
// package.
//
// If pingPeriod is non-zero, for each incoming connection a background
// goroutine will send periodic Ping frames to the server. Use this to keep
// idle connections through certain load balancers alive longer.
func NewResponseUpgraderWithPings(pingPeriod time.Duration) httpstream.ResponseUpgrader {
return responseUpgrader{pingPeriod: pingPeriod}
}
// UpgradeResponse upgrades an HTTP response to one that supports multiplexed
@ -97,7 +110,7 @@ func (u responseUpgrader) UpgradeResponse(w http.ResponseWriter, req *http.Reque
}
connWithBuf := &connWrapper{Conn: conn, bufReader: bufrw.Reader}
spdyConn, err := NewServerConnection(connWithBuf, newStreamHandler)
spdyConn, err := NewServerConnectionWithPings(connWithBuf, newStreamHandler, u.pingPeriod)
if err != nil {
runtime.HandleError(fmt.Errorf("unable to upgrade: error creating SPDY server connection: %v", err))
return nil