From 0bd9a1a29a63c61a79ad1e25ae1fbd7df09056e6 Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Mon, 5 Sep 2022 20:59:51 +0200
Subject: [PATCH 1/2] e2e storage: close all pending streams when proxy listener gets closed

After updating gRPC in node-driver-registrar from v1.40.0 to v1.47.0,
the behavior of gRPC changed in such a way that it no longer detected
the single-sided closing of the stream as a loss of connection. This
caused gRPC in the e2e.test to get stuck, possibly in a Read or Write
for the HTTP stream, because those have neither a context nor a timeout.

Changing the connection handling so that all active connections are
tracked in the listener and closed together with the listener fixes
this problem.
---
 test/e2e/storage/drivers/proxy/portproxy.go | 147 +++++++++++++-------
 1 file changed, 95 insertions(+), 52 deletions(-)

diff --git a/test/e2e/storage/drivers/proxy/portproxy.go b/test/e2e/storage/drivers/proxy/portproxy.go
index 396cb8e3279..20455bd551a 100644
--- a/test/e2e/storage/drivers/proxy/portproxy.go
+++ b/test/e2e/storage/drivers/proxy/portproxy.go
@@ -24,7 +24,6 @@ import (
 	"net"
 	"net/http"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
@@ -45,6 +44,15 @@ import (
 // and the remote kernel which then aren't needed.
 const maxConcurrentConnections = 10
 
+// This delay determines how quickly we notice when someone has
+// connected inside the cluster. With socat, we cannot make this too small
+// because otherwise we get many rejected connections. With the mock
+// driver as proxy that doesn't happen as long as we don't
+// ask for too many concurrent connections because the mock driver
+// keeps the listening port open at all times and the Linux
+// kernel automatically accepts our connection requests.
+const connectionPollInterval = 100 * time.Millisecond
+
 // Listen creates a listener which returns new connections whenever someone connects
 // to a socat or mock driver proxy instance running inside the given pod.
 //
@@ -85,53 +93,50 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res
 	prefix := fmt.Sprintf("port forwarding for %s", addr)
 	ctx, cancel := context.WithCancel(ctx)
 	l := &listener{
-		connections: make(chan *connection),
-		ctx:         ctx,
-		cancel:      cancel,
-		addr:        addr,
+		ctx:    ctx,
+		cancel: cancel,
+		addr:   addr,
 	}
 
-	var connectionsCreated, connectionsClosed int32
+	var connectionsCreated int
 
 	runForwarding := func() {
 		klog.V(2).Infof("%s: starting connection polling", prefix)
 		defer klog.V(2).Infof("%s: connection polling ended", prefix)
 
-		// This delay determines how quickly we notice when someone has
-		// connected inside the cluster. With socat, we cannot make this too small
-		// because otherwise we get many rejected connections. With the mock
-		// driver as proxy that doesn't happen as long as we don't
-		// ask for too many concurrent connections because the mock driver
-		// keeps the listening port open at all times and the Linux
-		// kernel automatically accepts our connection requests.
-		tryConnect := time.NewTicker(100 * time.Millisecond)
+		tryConnect := time.NewTicker(connectionPollInterval)
 		defer tryConnect.Stop()
 		for {
 			select {
 			case <-ctx.Done():
 				return
 			case <-tryConnect.C:
-				currentClosed := atomic.LoadInt32(&connectionsClosed)
-				openConnections := connectionsCreated - currentClosed
-				if openConnections >= maxConcurrentConnections {
-					break
-				}
+				func() {
+					l.mutex.Lock()
+					defer l.mutex.Unlock()
 
-				klog.V(5).Infof("%s: trying to create a new connection #%d, %d open", prefix, connectionsCreated, openConnections)
-				stream, err := dial(ctx, fmt.Sprintf("%s #%d", prefix, connectionsCreated), dialer, addr.Port)
-				if err != nil {
-					klog.Errorf("%s: no connection: %v", prefix, err)
-					break
-				}
-				// Make the connection available to Accept below.
-				klog.V(5).Infof("%s: created a new connection #%d", prefix, connectionsCreated)
-				l.connections <- &connection{
-					stream:  stream,
-					addr:    addr,
-					counter: connectionsCreated,
-					closed:  &connectionsClosed,
-				}
-				connectionsCreated++
+					for i, c := range l.connections {
+						if c == nil {
+							klog.V(5).Infof("%s: trying to create a new connection #%d", prefix, connectionsCreated)
+							stream, err := dial(ctx, fmt.Sprintf("%s #%d", prefix, connectionsCreated), dialer, addr.Port)
+							if err != nil {
+								klog.Errorf("%s: no connection: %v", prefix, err)
+								return
+							}
+							// Make the connection available to Accept below.
+							klog.V(5).Infof("%s: created a new connection #%d", prefix, connectionsCreated)
+							c := &connection{
+								l:       l,
+								stream:  stream,
+								addr:    addr,
+								counter: connectionsCreated,
+							}
+							l.connections[i] = c
+							connectionsCreated++
+							return
+						}
+					}
+				}()
 			}
 		}
 	}
@@ -247,10 +252,12 @@ func (s *stream) Close() {
 }
 
 type listener struct {
-	addr        Addr
-	connections chan *connection
-	ctx         context.Context
-	cancel      func()
+	addr   Addr
+	ctx    context.Context
+	cancel func()
+
+	mutex       sync.Mutex
+	connections [maxConcurrentConnections]*connection
 }
 
 var _ net.Listener = &listener{}
@@ -258,25 +265,53 @@ var _ net.Listener = &listener{}
 func (l *listener) Close() error {
 	klog.V(5).Infof("forward listener for %s: closing", l.addr)
 	l.cancel()
+
+	l.mutex.Lock()
+	defer l.mutex.Unlock()
+	for _, c := range l.connections {
+		if c != nil {
+			c.stream.Close()
+		}
+	}
+
 	return nil
 }
 
 func (l *listener) Accept() (net.Conn, error) {
-	select {
-	case <-l.ctx.Done():
-		return nil, errors.New("listening was stopped")
-	case c := <-l.connections:
-		klog.V(5).Infof("forward listener for %s: got a new connection #%d", l.addr, c.counter)
-		return c, nil
+	tryAccept := time.NewTicker(connectionPollInterval)
+	defer tryAccept.Stop()
+	for {
+		select {
+		case <-l.ctx.Done():
+			return nil, errors.New("listening was stopped")
+		case <-tryAccept.C:
+			conn := func() net.Conn {
+				l.mutex.Lock()
+				defer l.mutex.Unlock()
+
+				for _, c := range l.connections {
+					if c != nil && !c.accepted {
+						klog.V(5).Infof("forward listener for %s: got a new connection #%d", l.addr, c.counter)
+						c.accepted = true
+						return c
+					}
+				}
+				return nil
+			}()
+			if conn != nil {
+				return conn, nil
+			}
+		}
 	}
 }
 
 type connection struct {
-	stream  *stream
-	addr    Addr
-	counter int32
-	closed  *int32
-	mutex   sync.Mutex
+	l       *listener
+	stream  *stream
+	addr    Addr
+	counter int
+	mutex   sync.Mutex
+	accepted, closed bool
 }
 
 var _ net.Conn = &connection{}
@@ -320,13 +355,21 @@ func (c *connection) Write(b []byte) (int, error) {
 func (c *connection) Close() error {
 	c.mutex.Lock()
 	defer c.mutex.Unlock()
-	if c.closed != nil {
+	if !c.closed {
 		// Do the logging and book-keeping only once. The function itself
 		// may be called more than once.
 		klog.V(5).Infof("forward connection #%d for %s: closing our side", c.counter, c.addr)
-		atomic.AddInt32(c.closed, 1)
-		c.closed = nil
+
+		c.l.mutex.Lock()
+		defer c.l.mutex.Unlock()
+		for i, c2 := range c.l.connections {
+			if c2 == c {
+				c.l.connections[i] = nil
+				break
+			}
+		}
 	}
 	c.stream.Close()
+
 	return nil
 }

From 8349c77e8288f050f700864cdcf58bf7f7c31e52 Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Wed, 7 Sep 2022 17:05:22 +0200
Subject: [PATCH 2/2] e2e storage: better explanation for maxConcurrentConnections

This is the same behavior as before; it just wasn't documented.
---
 test/e2e/storage/drivers/proxy/portproxy.go | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/test/e2e/storage/drivers/proxy/portproxy.go b/test/e2e/storage/drivers/proxy/portproxy.go
index 20455bd551a..43073dffbda 100644
--- a/test/e2e/storage/drivers/proxy/portproxy.go
+++ b/test/e2e/storage/drivers/proxy/portproxy.go
@@ -42,6 +42,17 @@ import (
 // need more than one per sidecar and kubelet. Keeping this reasonably
 // small ensures that we don't establish connections through the apiserver
 // and the remote kernel which then aren't needed.
+//
+// The proxy code below establishes this many connections in advance,
+// without waiting for a client on the remote side. On the local side
+// a gRPC server will accept the same number of connections and then wait
+// for data from a future client.
+//
+// This approach has the advantage that a client on the remote side can
+// immediately start communicating, without the delay caused by establishing
+// the connection. That delay is large enough that clients like the
+// node-driver-registrar with a very small timeout for gRPC did indeed
+// time out unnecessarily.
 const maxConcurrentConnections = 10
 
 // This delay determines how quickly we notice when someone has
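
The first patch is easier to follow outside the diff context. Below is a minimal, self-contained sketch of the same pattern, assuming plain net.Conn connections and an arbitrary dial callback instead of the SPDY streams and apiserver port forwarding used by portproxy.go; all names (slotListener, newSlotListener, maxSlots, pollInterval) are illustrative, not taken from the Kubernetes tree. The key points are the fixed table of pre-dialed connection slots guarded by a mutex, an Accept that polls for slots that have not been handed out yet, and a Close that closes every pending connection so that readers blocked on them wake up.

package proxysketch

import (
	"context"
	"errors"
	"net"
	"sync"
	"time"
)

const (
	maxSlots     = 10
	pollInterval = 100 * time.Millisecond
)

// slotListener hands out pre-established connections via Accept and,
// crucially, closes all of them when the listener itself gets closed.
type slotListener struct {
	addr   net.Addr
	ctx    context.Context
	cancel context.CancelFunc

	mutex    sync.Mutex
	conns    [maxSlots]net.Conn
	accepted [maxSlots]bool
}

// newSlotListener keeps up to maxSlots connections dialed in advance so
// that a remote client can start talking without waiting for a dial.
func newSlotListener(ctx context.Context, addr net.Addr, dial func(context.Context) (net.Conn, error)) *slotListener {
	ctx, cancel := context.WithCancel(ctx)
	l := &slotListener{addr: addr, ctx: ctx, cancel: cancel}
	go func() {
		ticker := time.NewTicker(pollInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				l.mutex.Lock()
				for i, c := range l.conns {
					if c == nil {
						// Fill at most one empty slot per tick.
						if conn, err := dial(ctx); err == nil {
							l.conns[i] = conn
							l.accepted[i] = false
						}
						break
					}
				}
				l.mutex.Unlock()
			}
		}
	}()
	return l
}

// Accept polls until a connection that has not been handed out yet
// appears or the listener gets closed.
func (l *slotListener) Accept() (net.Conn, error) {
	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()
	for {
		select {
		case <-l.ctx.Done():
			return nil, errors.New("listener was closed")
		case <-ticker.C:
			l.mutex.Lock()
			for i, c := range l.conns {
				if c != nil && !l.accepted[i] {
					l.accepted[i] = true
					l.mutex.Unlock()
					return c, nil
				}
			}
			l.mutex.Unlock()
		}
	}
}

// Close stops the dial loop and closes every pending connection so that
// readers blocked on them return instead of hanging without a deadline.
func (l *slotListener) Close() error {
	l.cancel()
	l.mutex.Lock()
	defer l.mutex.Unlock()
	for i, c := range l.conns {
		if c != nil {
			c.Close()
			l.conns[i] = nil
		}
	}
	return nil
}

func (l *slotListener) Addr() net.Addr { return l.addr }

Unlike the real code, a connection closed by its user does not free its slot in this sketch; portproxy.go wraps each connection so that its Close also removes it from the listener's table, which is what the book-keeping in the connection.Close hunk above does.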
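
On the consuming side, the listener is handed to a gRPC server, which is where the hang described in the first commit message showed up: grpc.Server.Serve starts a reader for every accepted connection, and with pre-established connections those readers sit in Read until the peer sends data or the connection gets closed. Continuing the sketch above, roughly how such a listener gets consumed (this assumes the google.golang.org/grpc package; dialThroughAPIServer and registerServices are placeholders, not the actual e2e wiring):

// serveOverProxy starts a gRPC server on top of the slot listener and
// returns a stop function. Stopping closes the listener first so that the
// per-connection readers created by Serve get unblocked.
func serveOverProxy(ctx context.Context, addr net.Addr,
	dialThroughAPIServer func(context.Context) (net.Conn, error),
	registerServices func(*grpc.Server)) func() {
	lis := newSlotListener(ctx, addr, dialThroughAPIServer)
	server := grpc.NewServer()
	registerServices(server)
	go func() {
		// Serve returns once the listener reports an error, e.g. after Close.
		_ = server.Serve(lis)
	}()
	return func() {
		_ = lis.Close() // closes all pre-established connections, unblocking Read
		server.Stop()
	}
}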