Merge pull request #112251 from pohly/e2e-storage-proxy-shutdown

e2e storage: close all pending streams when proxy listener gets closed
Kubernetes Prow Robot 2022-09-12 08:07:24 -07:00 committed by GitHub
commit 48256c55c2

@@ -24,7 +24,6 @@ import (
 	"net"
 	"net/http"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
@@ -43,8 +42,28 @@ import (
 // need more than one per sidecar and kubelet. Keeping this reasonably
 // small ensures that we don't establish connections through the apiserver
 // and the remote kernel which then aren't needed.
+//
+// The proxy code below establishes this many connections in advance,
+// without waiting for a client on the remote side. On the local side
+// a gRPC server will accept the same number of connections and then wait
+// for data from a future client.
+//
+// This approach has the advantage that a client on the remote side can
+// immediately start communicating, without the delay caused by establishing
+// the connection. That delay is large enough that clients like the
+// node-driver-registrar with a very small timeout for gRPC did indeed
+// time out unnecessarily.
 const maxConcurrentConnections = 10
 
+// This delay determines how quickly we notice when someone has
+// connected inside the cluster. With socat, we cannot make this too small
+// because otherwise we get many rejected connections. With the mock
+// driver as proxy that doesn't happen as long as we don't
+// ask for too many concurrent connections because the mock driver
+// keeps the listening port open at all times and the Linux
+// kernel automatically accepts our connection requests.
+const connectionPollInterval = 100 * time.Millisecond
+
 // Listen creates a listener which returns new connections whenever someone connects
 // to a socat or mock driver proxy instance running inside the given pod.
 //
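Side note on the two constants added above: the dialing loop keeps up to maxConcurrentConnections streams pre-established, and both that loop and Accept (further down) wake up every connectionPollInterval to check for work. A minimal, hedged sketch of that ticker-driven polling pattern in isolation; pollUntilDone and tryOnce are illustrative names, not part of this change:

import (
	"context"
	"time"
)

// pollUntilDone wakes up every interval and runs one attempt of work until
// the context is cancelled or the attempt reports that it is done. tryOnce
// is a hypothetical stand-in for "dial one more stream" or "hand out one
// pre-established connection".
func pollUntilDone(ctx context.Context, interval time.Duration, tryOnce func() bool) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if tryOnce() {
				return
			}
		}
	}
}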
@@ -85,53 +104,50 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res
 	prefix := fmt.Sprintf("port forwarding for %s", addr)
 	ctx, cancel := context.WithCancel(ctx)
 	l := &listener{
-		connections: make(chan *connection),
-		ctx:         ctx,
-		cancel:      cancel,
-		addr:        addr,
+		ctx:    ctx,
+		cancel: cancel,
+		addr:   addr,
 	}
-	var connectionsCreated, connectionsClosed int32
+	var connectionsCreated int
 	runForwarding := func() {
 		klog.V(2).Infof("%s: starting connection polling", prefix)
 		defer klog.V(2).Infof("%s: connection polling ended", prefix)
-		// This delay determines how quickly we notice when someone has
-		// connected inside the cluster. With socat, we cannot make this too small
-		// because otherwise we get many rejected connections. With the mock
-		// driver as proxy that doesn't happen as long as we don't
-		// ask for too many concurrent connections because the mock driver
-		// keeps the listening port open at all times and the Linux
-		// kernel automatically accepts our connection requests.
-		tryConnect := time.NewTicker(100 * time.Millisecond)
+		tryConnect := time.NewTicker(connectionPollInterval)
 		defer tryConnect.Stop()
 		for {
 			select {
 			case <-ctx.Done():
 				return
 			case <-tryConnect.C:
-				currentClosed := atomic.LoadInt32(&connectionsClosed)
-				openConnections := connectionsCreated - currentClosed
-				if openConnections >= maxConcurrentConnections {
-					break
-				}
+				func() {
+					l.mutex.Lock()
+					defer l.mutex.Unlock()
-				klog.V(5).Infof("%s: trying to create a new connection #%d, %d open", prefix, connectionsCreated, openConnections)
-				stream, err := dial(ctx, fmt.Sprintf("%s #%d", prefix, connectionsCreated), dialer, addr.Port)
-				if err != nil {
-					klog.Errorf("%s: no connection: %v", prefix, err)
-					break
-				}
-				// Make the connection available to Accept below.
-				klog.V(5).Infof("%s: created a new connection #%d", prefix, connectionsCreated)
-				l.connections <- &connection{
-					stream:  stream,
-					addr:    addr,
-					counter: connectionsCreated,
-					closed:  &connectionsClosed,
-				}
-				connectionsCreated++
+					for i, c := range l.connections {
+						if c == nil {
+							klog.V(5).Infof("%s: trying to create a new connection #%d", prefix, connectionsCreated)
+							stream, err := dial(ctx, fmt.Sprintf("%s #%d", prefix, connectionsCreated), dialer, addr.Port)
+							if err != nil {
+								klog.Errorf("%s: no connection: %v", prefix, err)
+								return
+							}
+							// Make the connection available to Accept below.
+							klog.V(5).Infof("%s: created a new connection #%d", prefix, connectionsCreated)
+							c := &connection{
+								l:       l,
+								stream:  stream,
+								addr:    addr,
+								counter: connectionsCreated,
+							}
+							l.connections[i] = c
+							connectionsCreated++
+							return
+						}
+					}
+				}()
 			}
 		}
 	}
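The new runForwarding body above replaces the old unbuffered connections channel with a fixed array of slots guarded by l.mutex: a freshly dialed stream is parked in the first free slot, so Close (below) can still reach it even if nobody ever calls Accept. A rough sketch of just the slot-filling step, assuming the connection type from this file; fillFreeSlot is an illustrative helper, not part of the real code:

import "sync"

// fillFreeSlot parks conn in the first empty slot and reports whether one
// was free. Holding the mutex makes the scan and the store atomic with
// respect to Accept and Close, which walk the same slots.
func fillFreeSlot(mu *sync.Mutex, slots []*connection, conn *connection) bool {
	mu.Lock()
	defer mu.Unlock()
	for i, c := range slots {
		if c == nil {
			slots[i] = conn
			return true
		}
	}
	return false
}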
@@ -247,10 +263,12 @@ func (s *stream) Close() {
 }
 type listener struct {
-	addr        Addr
-	connections chan *connection
-	ctx         context.Context
-	cancel      func()
+	addr   Addr
+	ctx    context.Context
+	cancel func()
+
+	mutex       sync.Mutex
+	connections [maxConcurrentConnections]*connection
 }
 var _ net.Listener = &listener{}
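For context on how this listener gets consumed: the comment added at the top of the file says a gRPC server on the local side accepts the pre-established connections. Since listener implements net.Listener, the wiring could look roughly like this; the gRPC setup is illustrative and not part of this diff:

import (
	"net"

	"google.golang.org/grpc"
)

// serveOverProxy hands the proxy listener to a gRPC server. grpc.Server.Serve
// calls Accept in a loop, so each pre-established stream becomes one gRPC
// connection, and Serve returns once the listener is closed.
func serveOverProxy(l net.Listener, register func(*grpc.Server)) error {
	srv := grpc.NewServer()
	register(srv) // hypothetical callback that registers the CSI/mock services
	return srv.Serve(l)
}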
@@ -258,25 +276,53 @@ var _ net.Listener = &listener{}
 func (l *listener) Close() error {
 	klog.V(5).Infof("forward listener for %s: closing", l.addr)
 	l.cancel()
+	l.mutex.Lock()
+	defer l.mutex.Unlock()
+	for _, c := range l.connections {
+		if c != nil {
+			c.stream.Close()
+		}
+	}
 	return nil
 }
 func (l *listener) Accept() (net.Conn, error) {
-	select {
-	case <-l.ctx.Done():
-		return nil, errors.New("listening was stopped")
-	case c := <-l.connections:
-		klog.V(5).Infof("forward listener for %s: got a new connection #%d", l.addr, c.counter)
-		return c, nil
+	tryAccept := time.NewTicker(connectionPollInterval)
+	defer tryAccept.Stop()
+	for {
+		select {
+		case <-l.ctx.Done():
+			return nil, errors.New("listening was stopped")
+		case <-tryAccept.C:
+			conn := func() net.Conn {
+				l.mutex.Lock()
+				defer l.mutex.Unlock()
+				for _, c := range l.connections {
+					if c != nil && !c.accepted {
+						klog.V(5).Infof("forward listener for %s: got a new connection #%d", l.addr, c.counter)
+						c.accepted = true
+						return c
+					}
+				}
+				return nil
+			}()
+			if conn != nil {
+				return conn, nil
+			}
+		}
 	}
 }
 type connection struct {
-	stream  *stream
-	addr    Addr
-	counter int32
-	closed  *int32
-	mutex   sync.Mutex
+	l       *listener
+	stream  *stream
+	addr    Addr
+	counter int
+
+	mutex            sync.Mutex
+	accepted, closed bool
 }
 var _ net.Conn = &connection{}
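Accept now polls the slot array for a connection that has not been handed out yet and fails once the listener's context is cancelled. An illustrative consumer loop (not from this change) shows the resulting contract: the loop ends when Close is called, and thanks to this commit any streams that were dialed but never accepted have already been closed by listener.Close:

import "net"

// acceptLoop accepts connections until the listener is closed, handling each
// one on its own goroutine. When Close cancels the listener's context,
// Accept returns an error ("listening was stopped") and the loop exits.
func acceptLoop(l net.Listener, handle func(net.Conn)) {
	for {
		conn, err := l.Accept()
		if err != nil {
			return
		}
		go handle(conn)
	}
}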
@@ -320,13 +366,21 @@ func (c *connection) Write(b []byte) (int, error) {
 func (c *connection) Close() error {
 	c.mutex.Lock()
 	defer c.mutex.Unlock()
-	if c.closed != nil {
+	if !c.closed {
 		// Do the logging and book-keeping only once. The function itself may be called more than once.
 		klog.V(5).Infof("forward connection #%d for %s: closing our side", c.counter, c.addr)
-		atomic.AddInt32(c.closed, 1)
-		c.closed = nil
+		c.l.mutex.Lock()
+		defer c.l.mutex.Unlock()
+		for i, c2 := range c.l.connections {
+			if c2 == c {
+				c.l.connections[i] = nil
+				break
+			}
+		}
 	}
 	c.stream.Close()
 	return nil
 }
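connection.Close may be called more than once, as the code comment above notes, so the closed flag plus clearing the connection's slot keeps the logging and bookkeeping single-shot. Stripped to its essence, the idempotent-close pattern used here looks like the sketch below; closeOnce is an illustrative type, not part of the change:

import "sync"

// closeOnce runs its cleanup exactly once, no matter how often Close is called.
type closeOnce struct {
	mutex  sync.Mutex
	closed bool
}

func (c *closeOnce) Close(cleanup func()) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	if c.closed {
		return
	}
	c.closed = true
	cleanup()
}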