From d43308e64c24b4cfaed3ad7931c0997dcb5e864a Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 26 Feb 2021 17:47:00 +0100 Subject: [PATCH] e2e storage: simpler port forwarding Instead of trying to use the client-go portforward package as-is it is simpler to copy some code from it and then use the http stream directly. That way we don't need to go through a local listening socket and error handling and logging becomes simpler. --- test/e2e/storage/drivers/proxy/portproxy.go | 188 +++++++++++--------- 1 file changed, 99 insertions(+), 89 deletions(-) diff --git a/test/e2e/storage/drivers/proxy/portproxy.go b/test/e2e/storage/drivers/proxy/portproxy.go index 9050cac075e..7f1d604882c 100644 --- a/test/e2e/storage/drivers/proxy/portproxy.go +++ b/test/e2e/storage/drivers/proxy/portproxy.go @@ -17,20 +17,23 @@ limitations under the License. package proxy import ( - "bufio" "context" "errors" "fmt" "io" + "io/ioutil" "net" "net/http" + "strconv" "sync" "sync/atomic" "time" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -92,7 +95,7 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res } // Port forwarding is allowed to fail and will be restarted when it does. - prepareForwarding := func() (*portforward.PortForwarder, error) { + prepareForwarding := func() (*remotePort, error) { pod, err := clientset.CoreV1().Pods(addr.Namespace).Get(ctx, addr.PodName, metav1.GetOptions{}) if err != nil { return nil, err @@ -103,55 +106,24 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res return nil, fmt.Errorf("container %q is not running", addr.ContainerName) } } - readyChannel := make(chan struct{}) - fw, err := portforward.New(dialer, - []string{fmt.Sprintf("0:%d", addr.Port)}, - ctx.Done(), - readyChannel, - klogWriter(false, prefix), - klogWriter(true, prefix)) + + streamConn, _, err := dialer.Dial(portforward.PortForwardProtocolV1Name) if err != nil { - return nil, err + return nil, fmt.Errorf("dialer failed: %v", err) } - return fw, nil + rp := &remotePort{ + streamConn: streamConn, + } + return rp, nil } var connectionsCreated, connectionsClosed int32 - runForwarding := func(fw *portforward.PortForwarder) { - defer fw.Close() + runForwarding := func(rp *remotePort) { + defer rp.Close() klog.V(5).Infof("%s: starting connection polling", prefix) defer klog.V(5).Infof("%s: connection polling ended", prefix) - failed := make(chan struct{}) - go func() { - defer close(failed) - klog.V(5).Infof("%s: starting port forwarding", prefix) - defer klog.V(5).Infof("%s: port forwarding ended", prefix) - - err := fw.ForwardPorts() - if err != nil { - if ctx.Err() == nil { - // Something failed unexpectedly. - klog.Errorf("%s: %v", prefix, err) - } else { - // Context is done, log error anyway. - klog.V(5).Infof("%s: %v", prefix, err) - } - } - }() - - // Wait for port forwarding to be ready. - select { - case <-ctx.Done(): - return - case <-failed: - // The reason was logged above. - return - case <-fw.Ready: - // Proceed... - } - // This delay determines how quickly we notice when someone has // connected inside the cluster. With socat, we cannot make this too small // because otherwise we get many rejected connections. With the mock @@ -165,9 +137,6 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res select { case <-ctx.Done(): return - case <-failed: - // The reason was logged above. - return case <-tryConnect.C: currentClosed := atomic.LoadInt32(&connectionsClosed) openConnections := connectionsCreated - currentClosed @@ -175,29 +144,8 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res break } - // Check whether we can establish a connection through the - // forwarded port. - ports, err := fw.GetPorts() - if err != nil { - // We checked for "port forwarding ready" above, so this - // shouldn't happen. - klog.Errorf("%s: no forwarded ports: %v", prefix, err) - return - } - - // We don't want to be blocked to long because we need to check - // for a port forwarding failure occasionally. - timeout := 10 * time.Second - deadline, ok := ctx.Deadline() - if ok { - untilDeadline := deadline.Sub(time.Now()) - if untilDeadline < timeout { - timeout = untilDeadline - } - } - klog.V(5).Infof("%s: trying to create a new connection #%d, %d open", prefix, connectionsCreated, openConnections) - c, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", ports[0].Local), timeout) + stream, err := rp.dial(ctx, prefix, addr.Port) if err != nil { klog.V(5).Infof("%s: no connection: %v", prefix, err) break @@ -205,7 +153,7 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res // Make the connection available to Accept below. klog.V(5).Infof("%s: created a new connection #%d", prefix, connectionsCreated) l.connections <- &connection{ - Conn: c, + stream: stream, addr: addr, counter: connectionsCreated, closed: &connectionsClosed, @@ -261,6 +209,63 @@ func (a Addr) String() string { return fmt.Sprintf("%s/%s:%d", a.Namespace, a.PodName, a.Port) } +// remotePort is a stripped down version of client-go/tools/portforward minus +// the local listeners. +type remotePort struct { + streamConn httpstream.Connection + + requestIDLock sync.Mutex + requestID int +} + +func (rp *remotePort) dial(ctx context.Context, prefix string, port int) (httpstream.Stream, error) { + requestID := rp.nextRequestID() + + // create error stream + headers := http.Header{} + headers.Set(v1.StreamType, v1.StreamTypeError) + headers.Set(v1.PortHeader, fmt.Sprintf("%d", port)) + headers.Set(v1.PortForwardRequestIDHeader, strconv.Itoa(requestID)) + + // We're not writing to this stream, just reading an error message from it. + // This happens asynchronously. + errorStream, err := rp.streamConn.CreateStream(headers) + if err != nil { + return nil, fmt.Errorf("error creating error stream: %v", err) + } + errorStream.Close() + go func() { + message, err := ioutil.ReadAll(errorStream) + switch { + case err != nil: + klog.Errorf("%s: error reading from error stream: %v", prefix, err) + case len(message) > 0: + klog.Errorf("%s: an error occurred connecting to the remote port: %v", prefix, string(message)) + } + }() + + // create data stream + headers.Set(v1.StreamType, v1.StreamTypeData) + dataStream, err := rp.streamConn.CreateStream(headers) + if err != nil { + return nil, fmt.Errorf("error creating data stream: %v", err) + } + + return dataStream, nil +} + +func (rp *remotePort) Close() { + rp.streamConn.Close() +} + +func (rp *remotePort) nextRequestID() int { + rp.requestIDLock.Lock() + defer rp.requestIDLock.Unlock() + id := rp.requestID + rp.requestID++ + return id +} + type listener struct { addr Addr connections chan *connection @@ -287,15 +292,37 @@ func (l *listener) Accept() (net.Conn, error) { } type connection struct { - net.Conn + stream httpstream.Stream addr Addr counter int32 closed *int32 mutex sync.Mutex } +var _ net.Conn = &connection{} + +func (c *connection) LocalAddr() net.Addr { + return c.addr +} + +func (c *connection) RemoteAddr() net.Addr { + return c.addr +} + +func (c *connection) SetDeadline(t time.Time) error { + return nil +} + +func (c *connection) SetReadDeadline(t time.Time) error { + return nil +} + +func (c *connection) SetWriteDeadline(t time.Time) error { + return nil +} + func (c *connection) Read(b []byte) (int, error) { - n, err := c.Conn.Read(b) + n, err := c.stream.Read(b) if errors.Is(err, io.EOF) { klog.V(5).Infof("forward connection #%d for %s: remote side closed the stream", c.counter, c.addr) } @@ -303,7 +330,7 @@ func (c *connection) Read(b []byte) (int, error) { } func (c *connection) Write(b []byte) (int, error) { - n, err := c.Conn.Write(b) + n, err := c.stream.Write(b) if errors.Is(err, io.EOF) { klog.V(5).Infof("forward connection #%d for %s: remote side closed the stream", c.counter, c.addr) } @@ -319,26 +346,9 @@ func (c *connection) Close() error { atomic.AddInt32(c.closed, 1) c.closed = nil } - return c.Conn.Close() + return c.stream.Close() } func (l *listener) Addr() net.Addr { return l.addr } - -func klogWriter(isError bool, prefix string) io.Writer { - reader, writer := io.Pipe() - go func() { - scanner := bufio.NewScanner(reader) - for scanner.Scan() { - text := scanner.Text() - if isError { - klog.Errorf("%s: %s", prefix, text) - } else { - klog.V(5).Infof("%s: %s", prefix, text) - } - } - }() - - return writer -}