e2e test: use one connection per stream

Sharing the same connection for multiple streams should have worked,
but ran into unexpected timeouts:

I0227 08:07:49.754263   80029 portproxy.go:109] container "mock" in pod csi-mock-volumes-4037-2061/csi-mockplugin-0 is running
E0227 08:07:49.779359   80029 portproxy.go:178] prepare forwarding csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: dialer failed: unable to upgrade connection: pod not found ("csi-mockplugin-0_csi-mock-volumes-4037-2061")
I0227 08:07:50.782705   80029 portproxy.go:109] container "mock" in pod csi-mock-volumes-4037-2061/csi-mockplugin-0 is running
I0227 08:07:50.809326   80029 portproxy.go:125] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: starting connection polling
I0227 08:07:50.909544   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #0, 0 open
I0227 08:07:50.912436   80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #0
I0227 08:07:50.912503   80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #0
I0227 08:07:50.913161   80029 portproxy.go:322] forward connection #0 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream
E0227 08:07:50.913324   80029 portproxy.go:242] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: an error occurred connecting to the remote port: error forwarding port 9000 to pod 66662ea1ab30b4193dac0102c49be840971d337c802cc0c8bbc074214522bd13, uid : failed to execute portforward in network namespace "/var/run/netns/cni-c15e4e36-dad9-8316-c301-33af9dad5717": failed to dial 9000: dial tcp4 127.0.0.1:9000: connect: connection refused
I0227 08:07:50.913371   80029 portproxy.go:340] forward connection #0 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side
W0227 08:07:50.913487   80029 server.go:669] grpc: Server.Serve failed to create ServerTransport: connection error: desc = "transport: http2Server.HandleStreams failed to receive the preface from client: EOF"
I0227 08:07:51.009519   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #1, 0 open
I0227 08:07:51.011912   80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #1
I0227 08:07:51.011973   80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #1
I0227 08:07:51.013677   80029 portproxy.go:322] forward connection #1 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream
I0227 08:07:51.013720   80029 portproxy.go:340] forward connection #1 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side
W0227 08:07:51.013794   80029 server.go:669] grpc: Server.Serve failed to create ServerTransport: connection error: desc = "transport: http2Server.HandleStreams failed to receive the preface from client: EOF"
E0227 08:07:51.017026   80029 portproxy.go:242] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: an error occurred connecting to the remote port: error forwarding port 9000 to pod 66662ea1ab30b4193dac0102c49be840971d337c802cc0c8bbc074214522bd13, uid : failed to execute portforward in network namespace "/var/run/netns/cni-c15e4e36-dad9-8316-c301-33af9dad5717": failed to dial 9000: dial tcp4 127.0.0.1:9000: connect: connection refused
I0227 08:07:51.109515   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #2, 0 open
I0227 08:07:51.111479   80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #2
I0227 08:07:51.111519   80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #2
I0227 08:07:51.209519   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #3, 1 open
I0227 08:07:51.766305   80029 csi.go:377] gRPC call: {"Method":"/csi.v1.Identity/Probe","Request":{},"Response":{"ready":{"value":true}},"Error":"","FullError":null}
I0227 08:07:51.768304   80029 csi.go:377] gRPC call: {"Method":"/csi.v1.Identity/GetPluginInfo","Request":{},"Response":{"name":"csi-mock-csi-mock-volumes-4037","vendor_version":"0.3.0","manifest":{"url":"https://k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock"}},"Error":"","FullError":null}
I0227 08:07:51.770494   80029 csi.go:377] gRPC call: {"Method":"/csi.v1.Identity/GetPluginCapabilities","Request":{},"Response":{"capabilities":[{"Type":{"Service":{"type":1}}},{"Type":{"VolumeExpansion":{"type":1}}},{"Type":{"Service":{"type":2}}}]},"Error":"","FullError":null}
I0227 08:07:51.772899   80029 csi.go:377] gRPC call: {"Method":"/csi.v1.Controller/ControllerGetCapabilities","Request":{},"Response":{"capabilities":[{"Type":{"Rpc":{"type":1}}},{"Type":{"Rpc":{"type":3}}},{"Type":{"Rpc":{"type":10}}},{"Type":{"Rpc":{"type":4}}},{"Type":{"Rpc":{"type":6}}},{"Type":{"Rpc":{"type":5}}},{"Type":{"Rpc":{"type":8}}},{"Type":{"Rpc":{"type":7}}},{"Type":{"Rpc":{"type":12}}},{"Type":{"Rpc":{"type":11}}},{"Type":{"Rpc":{"type":9}}}]},"Error":"","FullError":null}
I0227 08:08:21.209901   80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred
I0227 08:08:21.209980   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #3, 1 open
I0227 08:08:51.211522   80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating data stream: Timeout occurred
I0227 08:08:51.211566   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #3, 1 open
I0227 08:08:51.213451   80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #3
I0227 08:08:51.213498   80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #3
I0227 08:08:51.309540   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #4, 2 open
I0227 08:08:52.215358   80029 portproxy.go:322] forward connection #3 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream
I0227 08:08:52.215475   80029 portproxy.go:340] forward connection #3 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side
I0227 08:09:21.310003   80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred
I0227 08:09:21.310086   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #4, 1 open
I0227 08:09:51.311854   80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating data stream: Timeout occurred
I0227 08:09:51.311908   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #4, 1 open
I0227 08:09:51.314415   80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #4
I0227 08:09:51.314497   80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #4
I0227 08:09:51.409527   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #5, 2 open
I0227 08:09:52.326203   80029 portproxy.go:322] forward connection #4 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream
I0227 08:09:52.326277   80029 portproxy.go:340] forward connection #4 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side
I0227 08:10:21.409892   80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred
I0227 08:10:21.409954   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #5, 1 open
I0227 08:10:51.411455   80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating data stream: Timeout occurred
I0227 08:10:51.411557   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #5, 1 open
I0227 08:10:51.413229   80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #5
I0227 08:10:51.413274   80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #5
I0227 08:10:51.509508   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #6, 2 open
I0227 08:10:52.414862   80029 portproxy.go:322] forward connection #5 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream
I0227 08:10:52.414931   80029 portproxy.go:340] forward connection #5 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side
I0227 08:11:21.509879   80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred
I0227 08:11:21.509934   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #6, 1 open
I0227 08:11:51.511519   80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating data stream: Timeout occurred
I0227 08:11:51.511568   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #6, 1 open
I0227 08:11:51.513519   80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #6
I0227 08:11:51.513571   80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #6
I0227 08:11:51.609504   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #7, 2 open
I0227 08:11:52.517799   80029 portproxy.go:322] forward connection #6 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream
I0227 08:11:52.517918   80029 portproxy.go:340] forward connection #6 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side
I0227 08:12:21.609856   80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred
I0227 08:12:21.609909   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #7, 1 open
I0227 08:12:51.611494   80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating data stream: Timeout occurred
I0227 08:12:51.611555   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #7, 1 open
I0227 08:12:51.613289   80029 portproxy.go:155] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: created a new connection #7
I0227 08:12:51.613343   80029 portproxy.go:286] forward listener for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: got a new connection #7
I0227 08:12:51.709535   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #8, 2 open
I0227 08:12:52.615858   80029 portproxy.go:322] forward connection #7 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: remote side closed the stream
I0227 08:12:52.615989   80029 portproxy.go:340] forward connection #7 for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: closing our side
W0227 08:12:52.616116   80029 server.go:669] grpc: Server.Serve failed to create ServerTransport: connection error: desc = "transport: http2Server.HandleStreams failed to receive the preface from client: EOF"
I0227 08:13:21.709934   80029 portproxy.go:151] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: no connection: error creating error stream: Timeout occurred
I0227 08:13:21.709997   80029 portproxy.go:148] port forwarding for csi-mock-volumes-4037-2061/csi-mockplugin-0:9000: trying to create a new connection #8, 1 open
Feb 27 08:13:30.916: FAIL: Failed to register CSIDriver csi-mock-csi-mock-volumes-4037
Unexpected error:
    <*errors.errorString | 0xc002666220>: {
        s: "error waiting for CSI driver csi-mock-csi-mock-volumes-4037 registration on node kind-worker2: timed out waiting for the condition",
    }
    error waiting for CSI driver csi-mock-csi-mock-volumes-4037 registration on node kind-worker2: timed out waiting for the condition
occurred
This commit is contained in:
Patrick Ohly 2021-02-27 08:45:28 +01:00
parent 5089af1f23
commit 06ffdbc784

View File

@ -24,13 +24,11 @@ import (
"io/ioutil"
"net"
"net/http"
"strconv"
"sync"
"sync/atomic"
"time"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/httpstream"
@ -94,35 +92,11 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res
addr: addr,
}
// Port forwarding is allowed to fail and will be restarted when it does.
prepareForwarding := func() (*remotePort, error) {
pod, err := clientset.CoreV1().Pods(addr.Namespace).Get(ctx, addr.PodName, metav1.GetOptions{})
if err != nil {
return nil, err
}
for i, status := range pod.Status.ContainerStatuses {
if pod.Spec.Containers[i].Name == addr.ContainerName &&
status.State.Running == nil {
return nil, fmt.Errorf("container %q is not running", addr.ContainerName)
}
}
streamConn, _, err := dialer.Dial(portforward.PortForwardProtocolV1Name)
if err != nil {
return nil, fmt.Errorf("dialer failed: %v", err)
}
rp := &remotePort{
streamConn: streamConn,
}
return rp, nil
}
var connectionsCreated, connectionsClosed int32
runForwarding := func(rp *remotePort) {
defer rp.Close()
klog.V(5).Infof("%s: starting connection polling", prefix)
defer klog.V(5).Infof("%s: connection polling ended", prefix)
runForwarding := func() {
klog.V(2).Infof("%s: starting connection polling", prefix)
defer klog.V(2).Infof("%s: connection polling ended", prefix)
// This delay determines how quickly we notice when someone has
// connected inside the cluster. With socat, we cannot make this too small
@ -145,9 +119,9 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res
}
klog.V(5).Infof("%s: trying to create a new connection #%d, %d open", prefix, connectionsCreated, openConnections)
stream, err := rp.dial(ctx, prefix, addr.Port)
stream, err := dial(ctx, fmt.Sprintf("%s #%d", prefix, connectionsCreated), dialer, addr.Port)
if err != nil {
klog.V(5).Infof("%s: no connection: %v", prefix, err)
klog.Errorf("%s: no connection: %v", prefix, err)
break
}
// Make the connection available to Accept below.
@ -166,18 +140,24 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res
// Portforwarding and polling for connections run in the background.
go func() {
for {
fw, err := prepareForwarding()
if err == nil {
runForwarding(fw)
} else {
if apierrors.IsNotFound(err) {
// This is normal, the pod isn't running yet. Log with lower severity.
klog.V(5).Infof("prepare forwarding %s: %v", addr, err)
} else {
klog.Errorf("prepare forwarding %s: %v", addr, err)
running := false
pod, err := clientset.CoreV1().Pods(addr.Namespace).Get(ctx, addr.PodName, metav1.GetOptions{})
if err != nil {
klog.V(5).Infof("checking for container %q in pod %s/%s: %v", addr.ContainerName, addr.Namespace, addr.PodName, err)
}
for i, status := range pod.Status.ContainerStatuses {
if pod.Spec.Containers[i].Name == addr.ContainerName &&
status.State.Running != nil {
running = true
break
}
}
if running {
klog.V(2).Infof("container %q in pod %s/%s is running", addr.ContainerName, addr.Namespace, addr.PodName)
runForwarding()
}
select {
case <-ctx.Done():
return
@ -209,27 +189,32 @@ func (a Addr) String() string {
return fmt.Sprintf("%s/%s:%d", a.Namespace, a.PodName, a.Port)
}
// remotePort is a stripped down version of client-go/tools/portforward minus
// the local listeners.
type remotePort struct {
type stream struct {
httpstream.Stream
streamConn httpstream.Connection
requestIDLock sync.Mutex
requestID int
}
func (rp *remotePort) dial(ctx context.Context, prefix string, port int) (httpstream.Stream, error) {
requestID := rp.nextRequestID()
func dial(ctx context.Context, prefix string, dialer httpstream.Dialer, port int) (s *stream, finalErr error) {
streamConn, _, err := dialer.Dial(portforward.PortForwardProtocolV1Name)
if err != nil {
return nil, fmt.Errorf("dialer failed: %v", err)
}
requestID := "1"
defer func() {
if finalErr != nil {
streamConn.Close()
}
}()
// create error stream
headers := http.Header{}
headers.Set(v1.StreamType, v1.StreamTypeError)
headers.Set(v1.PortHeader, fmt.Sprintf("%d", port))
headers.Set(v1.PortForwardRequestIDHeader, strconv.Itoa(requestID))
headers.Set(v1.PortForwardRequestIDHeader, requestID)
// We're not writing to this stream, just reading an error message from it.
// This happens asynchronously.
errorStream, err := rp.streamConn.CreateStream(headers)
errorStream, err := streamConn.CreateStream(headers)
if err != nil {
return nil, fmt.Errorf("error creating error stream: %v", err)
}
@ -246,24 +231,20 @@ func (rp *remotePort) dial(ctx context.Context, prefix string, port int) (httpst
// create data stream
headers.Set(v1.StreamType, v1.StreamTypeData)
dataStream, err := rp.streamConn.CreateStream(headers)
dataStream, err := streamConn.CreateStream(headers)
if err != nil {
return nil, fmt.Errorf("error creating data stream: %v", err)
}
return dataStream, nil
return &stream{
Stream: dataStream,
streamConn: streamConn,
}, nil
}
func (rp *remotePort) Close() {
rp.streamConn.Close()
}
func (rp *remotePort) nextRequestID() int {
rp.requestIDLock.Lock()
defer rp.requestIDLock.Unlock()
id := rp.requestID
rp.requestID++
return id
func (s *stream) Close() {
s.Stream.Close()
s.streamConn.Close()
}
type listener struct {
@ -292,7 +273,7 @@ func (l *listener) Accept() (net.Conn, error) {
}
type connection struct {
stream httpstream.Stream
stream *stream
addr Addr
counter int32
closed *int32
@ -346,7 +327,8 @@ func (c *connection) Close() error {
atomic.AddInt32(c.closed, 1)
c.closed = nil
}
return c.stream.Close()
c.stream.Close()
return nil
}
func (l *listener) Addr() net.Addr {