From 06ffdbc784a0795121e893deefc22fa9d1716751 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Sat, 27 Feb 2021 08:45:28 +0100 Subject: [PATCH] e2e test: use one connection per stream Sharing the same connection for multiple streams should have worked, but ran into unexpected timeouts: I0227 08:07:49.754263 80029 portproxy.go:109] container "mock" in pod csi-mock-volumes-4037-2061/csi-mockplugin-0 is running E0227 08:07:49.779359 80029 portproxy.go:178] prepare forwarding csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: dialer failed: unable to upgrade connection: pod not found ("csi-mockplugin-0_csi-mock-volumes-4037-2061") I0227 08:07:50.782705 80029 portproxy.go:109] container "mock" in pod csi-mock-volumes-4037-2061/csi-mockplugin-0 is running I0227 08:07:50.809326 80029 portproxy.go:125] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: starting connection polling I0227 08:07:50.909544 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #0, 0 open I0227 08:07:50.912436 80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #0 I0227 08:07:50.912503 80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #0 I0227 08:07:50.913161 80029 portproxy.go:322] forward connection #0 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream E0227 08:07:50.913324 80029 portproxy.go:242] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: an error occurred connecting to the remote port: error forwarding port 9000 to pod 66662ea1ab30b4193dac0102c49be840971d337c802cc0c8bbc074214522bd13, uid : failed to execute portforward in network namespace "/var/run/netns/cni-c15e4e36-dad9-8316-c301-33af9dad5717": failed to dial 9000: dial tcp4 127.0.0.1:9000: connect: connection refused I0227 08:07:50.913371 80029 portproxy.go:340] forward connection #0 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side W0227 08:07:50.913487 80029 server.go:669] grpc: Server.Serve failed to create ServerTransport: connection error: desc = "transport: http2Server.HandleStreams failed to receive the preface from client: EOF" I0227 08:07:51.009519 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #1, 0 open I0227 08:07:51.011912 80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #1 I0227 08:07:51.011973 80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #1 I0227 08:07:51.013677 80029 portproxy.go:322] forward connection #1 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream I0227 08:07:51.013720 80029 portproxy.go:340] forward connection #1 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side W0227 08:07:51.013794 80029 server.go:669] grpc: Server.Serve failed to create ServerTransport: connection error: desc = "transport: http2Server.HandleStreams failed to receive the preface from client: EOF" E0227 08:07:51.017026 80029 portproxy.go:242] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: an error occurred connecting to the remote port: error forwarding port 9000 to pod 66662ea1ab30b4193dac0102c49be840971d337c802cc0c8bbc074214522bd13, uid : failed to execute portforward in network namespace "/var/run/netns/cni-c15e4e36-dad9-8316-c301-33af9dad5717": failed to dial 9000: dial tcp4 127.0.0.1:9000: connect: connection refused I0227 08:07:51.109515 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #2, 0 open I0227 08:07:51.111479 80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #2 I0227 08:07:51.111519 80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #2 I0227 08:07:51.209519 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #3, 1 open I0227 08:07:51.766305 80029 csi.go:377] gRPC call: {"Method":"/csi.v1.Identity/Probe","Request":{},"Response":{"ready":{"value":true}},"Error":"","FullError":null} I0227 08:07:51.768304 80029 csi.go:377] gRPC call: {"Method":"/csi.v1.Identity/GetPluginInfo","Request":{},"Response":{"name":"csi-mock-csi-mock-volumes-4037","vendor_version":"0.3.0","manifest":{"url":"https://k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock"}},"Error":"","FullError":null} I0227 08:07:51.770494 80029 csi.go:377] gRPC call: {"Method":"/csi.v1.Identity/GetPluginCapabilities","Request":{},"Response":{"capabilities":[{"Type":{"Service":{"type":1}}},{"Type":{"VolumeExpansion":{"type":1}}},{"Type":{"Service":{"type":2}}}]},"Error":"","FullError":null} I0227 08:07:51.772899 80029 csi.go:377] gRPC call: {"Method":"/csi.v1.Controller/ControllerGetCapabilities","Request":{},"Response":{"capabilities":[{"Type":{"Rpc":{"type":1}}},{"Type":{"Rpc":{"type":3}}},{"Type":{"Rpc":{"type":10}}},{"Type":{"Rpc":{"type":4}}},{"Type":{"Rpc":{"type":6}}},{"Type":{"Rpc":{"type":5}}},{"Type":{"Rpc":{"type":8}}},{"Type":{"Rpc":{"type":7}}},{"Type":{"Rpc":{"type":12}}},{"Type":{"Rpc":{"type":11}}},{"Type":{"Rpc":{"type":9}}}]},"Error":"","FullError":null} I0227 08:08:21.209901 80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred I0227 08:08:21.209980 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #3, 1 open I0227 08:08:51.211522 80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating data stream: Timeout occurred I0227 08:08:51.211566 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #3, 1 open I0227 08:08:51.213451 80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #3 I0227 08:08:51.213498 80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #3 I0227 08:08:51.309540 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #4, 2 open I0227 08:08:52.215358 80029 portproxy.go:322] forward connection #3 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream I0227 08:08:52.215475 80029 portproxy.go:340] forward connection #3 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side I0227 08:09:21.310003 80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred I0227 08:09:21.310086 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #4, 1 open I0227 08:09:51.311854 80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating data stream: Timeout occurred I0227 08:09:51.311908 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #4, 1 open I0227 08:09:51.314415 80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #4 I0227 08:09:51.314497 80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #4 I0227 08:09:51.409527 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #5, 2 open I0227 08:09:52.326203 80029 portproxy.go:322] forward connection #4 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream I0227 08:09:52.326277 80029 portproxy.go:340] forward connection #4 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side I0227 08:10:21.409892 80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred I0227 08:10:21.409954 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #5, 1 open I0227 08:10:51.411455 80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating data stream: Timeout occurred I0227 08:10:51.411557 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #5, 1 open I0227 08:10:51.413229 80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #5 I0227 08:10:51.413274 80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #5 I0227 08:10:51.509508 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #6, 2 open I0227 08:10:52.414862 80029 portproxy.go:322] forward connection #5 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream I0227 08:10:52.414931 80029 portproxy.go:340] forward connection #5 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side I0227 08:11:21.509879 80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred I0227 08:11:21.509934 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #6, 1 open I0227 08:11:51.511519 80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating data stream: Timeout occurred I0227 08:11:51.511568 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #6, 1 open I0227 08:11:51.513519 80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #6 I0227 08:11:51.513571 80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #6 I0227 08:11:51.609504 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #7, 2 open I0227 08:11:52.517799 80029 portproxy.go:322] forward connection #6 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream I0227 08:11:52.517918 80029 portproxy.go:340] forward connection #6 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side I0227 08:12:21.609856 80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred I0227 08:12:21.609909 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #7, 1 open I0227 08:12:51.611494 80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating data stream: Timeout occurred I0227 08:12:51.611555 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #7, 1 open I0227 08:12:51.613289 80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #7 I0227 08:12:51.613343 80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #7 I0227 08:12:51.709535 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #8, 2 open I0227 08:12:52.615858 80029 portproxy.go:322] forward connection #7 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream I0227 08:12:52.615989 80029 portproxy.go:340] forward connection #7 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side W0227 08:12:52.616116 80029 server.go:669] grpc: Server.Serve failed to create ServerTransport: connection error: desc = "transport: http2Server.HandleStreams failed to receive the preface from client: EOF" I0227 08:13:21.709934 80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred I0227 08:13:21.709997 80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #8, 1 open Feb 27 08:13:30.916: FAIL: Failed to register CSIDriver csi-mock-csi-mock-volumes-4037 Unexpected error: <*errors.errorString | 0xc002666220>: { s: "error waiting for CSI driver csi-mock-csi-mock-volumes-4037 registration on node kind-worker2: timed out waiting for the condition", } error waiting for CSI driver csi-mock-csi-mock-volumes-4037 registration on node kind-worker2: timed out waiting for the condition occurred --- test/e2e/storage/drivers/proxy/portproxy.go | 110 ++++++++------------ 1 file changed, 46 insertions(+), 64 deletions(-) diff --git a/test/e2e/storage/drivers/proxy/portproxy.go b/test/e2e/storage/drivers/proxy/portproxy.go index 7f1d604882c..aef56974ad8 100644 --- a/test/e2e/storage/drivers/proxy/portproxy.go +++ b/test/e2e/storage/drivers/proxy/portproxy.go @@ -24,13 +24,11 @@ import ( "io/ioutil" "net" "net/http" - "strconv" "sync" "sync/atomic" "time" v1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/httpstream" @@ -94,35 +92,11 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res addr: addr, } - // Port forwarding is allowed to fail and will be restarted when it does. - prepareForwarding := func() (*remotePort, error) { - pod, err := clientset.CoreV1().Pods(addr.Namespace).Get(ctx, addr.PodName, metav1.GetOptions{}) - if err != nil { - return nil, err - } - for i, status := range pod.Status.ContainerStatuses { - if pod.Spec.Containers[i].Name == addr.ContainerName && - status.State.Running == nil { - return nil, fmt.Errorf("container %q is not running", addr.ContainerName) - } - } - - streamConn, _, err := dialer.Dial(portforward.PortForwardProtocolV1Name) - if err != nil { - return nil, fmt.Errorf("dialer failed: %v", err) - } - rp := &remotePort{ - streamConn: streamConn, - } - return rp, nil - } - var connectionsCreated, connectionsClosed int32 - runForwarding := func(rp *remotePort) { - defer rp.Close() - klog.V(5).Infof("%s: starting connection polling", prefix) - defer klog.V(5).Infof("%s: connection polling ended", prefix) + runForwarding := func() { + klog.V(2).Infof("%s: starting connection polling", prefix) + defer klog.V(2).Infof("%s: connection polling ended", prefix) // This delay determines how quickly we notice when someone has // connected inside the cluster. With socat, we cannot make this too small @@ -145,9 +119,9 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res } klog.V(5).Infof("%s: trying to create a new connection #%d, %d open", prefix, connectionsCreated, openConnections) - stream, err := rp.dial(ctx, prefix, addr.Port) + stream, err := dial(ctx, fmt.Sprintf("%s #%d", prefix, connectionsCreated), dialer, addr.Port) if err != nil { - klog.V(5).Infof("%s: no connection: %v", prefix, err) + klog.Errorf("%s: no connection: %v", prefix, err) break } // Make the connection available to Accept below. @@ -166,18 +140,24 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res // Portforwarding and polling for connections run in the background. go func() { for { - fw, err := prepareForwarding() - if err == nil { - runForwarding(fw) - } else { - if apierrors.IsNotFound(err) { - // This is normal, the pod isn't running yet. Log with lower severity. - klog.V(5).Infof("prepare forwarding %s: %v", addr, err) - } else { - klog.Errorf("prepare forwarding %s: %v", addr, err) + running := false + pod, err := clientset.CoreV1().Pods(addr.Namespace).Get(ctx, addr.PodName, metav1.GetOptions{}) + if err != nil { + klog.V(5).Infof("checking for container %q in pod %s/%s: %v", addr.ContainerName, addr.Namespace, addr.PodName, err) + } + for i, status := range pod.Status.ContainerStatuses { + if pod.Spec.Containers[i].Name == addr.ContainerName && + status.State.Running != nil { + running = true + break } } + if running { + klog.V(2).Infof("container %q in pod %s/%s is running", addr.ContainerName, addr.Namespace, addr.PodName) + runForwarding() + } + select { case <-ctx.Done(): return @@ -209,27 +189,32 @@ func (a Addr) String() string { return fmt.Sprintf("%s/%s:%d", a.Namespace, a.PodName, a.Port) } -// remotePort is a stripped down version of client-go/tools/portforward minus -// the local listeners. -type remotePort struct { +type stream struct { + httpstream.Stream streamConn httpstream.Connection - - requestIDLock sync.Mutex - requestID int } -func (rp *remotePort) dial(ctx context.Context, prefix string, port int) (httpstream.Stream, error) { - requestID := rp.nextRequestID() +func dial(ctx context.Context, prefix string, dialer httpstream.Dialer, port int) (s *stream, finalErr error) { + streamConn, _, err := dialer.Dial(portforward.PortForwardProtocolV1Name) + if err != nil { + return nil, fmt.Errorf("dialer failed: %v", err) + } + requestID := "1" + defer func() { + if finalErr != nil { + streamConn.Close() + } + }() // create error stream headers := http.Header{} headers.Set(v1.StreamType, v1.StreamTypeError) headers.Set(v1.PortHeader, fmt.Sprintf("%d", port)) - headers.Set(v1.PortForwardRequestIDHeader, strconv.Itoa(requestID)) + headers.Set(v1.PortForwardRequestIDHeader, requestID) // We're not writing to this stream, just reading an error message from it. // This happens asynchronously. - errorStream, err := rp.streamConn.CreateStream(headers) + errorStream, err := streamConn.CreateStream(headers) if err != nil { return nil, fmt.Errorf("error creating error stream: %v", err) } @@ -246,24 +231,20 @@ func (rp *remotePort) dial(ctx context.Context, prefix string, port int) (httpst // create data stream headers.Set(v1.StreamType, v1.StreamTypeData) - dataStream, err := rp.streamConn.CreateStream(headers) + dataStream, err := streamConn.CreateStream(headers) if err != nil { return nil, fmt.Errorf("error creating data stream: %v", err) } - return dataStream, nil + return &stream{ + Stream: dataStream, + streamConn: streamConn, + }, nil } -func (rp *remotePort) Close() { - rp.streamConn.Close() -} - -func (rp *remotePort) nextRequestID() int { - rp.requestIDLock.Lock() - defer rp.requestIDLock.Unlock() - id := rp.requestID - rp.requestID++ - return id +func (s *stream) Close() { + s.Stream.Close() + s.streamConn.Close() } type listener struct { @@ -292,7 +273,7 @@ func (l *listener) Accept() (net.Conn, error) { } type connection struct { - stream httpstream.Stream + stream *stream addr Addr counter int32 closed *int32 @@ -346,7 +327,8 @@ func (c *connection) Close() error { atomic.AddInt32(c.closed, 1) c.closed = nil } - return c.stream.Close() + c.stream.Close() + return nil } func (l *listener) Addr() net.Addr {