e2e storage: simpler port forwarding

Instead of trying to use the client-go portforward package as-is, it is
simpler to copy some code from it and then use the HTTP stream
directly. That way we don't need to go through a local listening
socket, and error handling and logging become simpler. A short sketch
of the idea follows below.
Patrick Ohly 2021-02-26 17:47:00 +01:00
parent 3adcf11b45
commit d43308e64c

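The core of the change is easier to follow as a self-contained sketch. This is not code from the commit: the helper name dialPodPort and the fixed request ID 0 are illustrative, and the real code below allocates request IDs per connection via remotePort.nextRequestID. The sketch shows how one SPDY connection to the pod's portforward subresource can serve streams directly, with no local socket in between:

	package sketch

	import (
		"fmt"
		"net/http"
		"strconv"

		v1 "k8s.io/api/core/v1"
		"k8s.io/apimachinery/pkg/util/httpstream"
		"k8s.io/client-go/kubernetes"
		"k8s.io/client-go/rest"
		"k8s.io/client-go/tools/portforward"
		"k8s.io/client-go/transport/spdy"
	)

	// dialPodPort is a hypothetical helper: it upgrades a connection to the
	// pod's portforward subresource and returns a data stream for one
	// forwarded connection, skipping the local listening socket entirely.
	func dialPodPort(clientset kubernetes.Interface, restConfig *rest.Config, namespace, podName string, port int) (httpstream.Stream, error) {
		// POST .../pods/<pod>/portforward upgrades the connection to SPDY.
		req := clientset.CoreV1().RESTClient().Post().
			Resource("pods").Namespace(namespace).Name(podName).
			SubResource("portforward")
		transport, upgrader, err := spdy.RoundTripperFor(restConfig)
		if err != nil {
			return nil, err
		}
		dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL())

		// One SPDY connection carries all forwarded connections.
		streamConn, _, err := dialer.Dial(portforward.PortForwardProtocolV1Name)
		if err != nil {
			return nil, err
		}

		// Per the port forwarding protocol, each forwarded connection is an
		// error stream plus a data stream, correlated by a request ID header.
		headers := http.Header{}
		headers.Set(v1.PortHeader, fmt.Sprintf("%d", port))
		headers.Set(v1.PortForwardRequestIDHeader, strconv.Itoa(0)) // single connection in this sketch
		headers.Set(v1.StreamType, v1.StreamTypeError)
		errorStream, err := streamConn.CreateStream(headers)
		if err != nil {
			return nil, err
		}
		// Close our write side; the error stream is only read from.
		errorStream.Close()

		headers.Set(v1.StreamType, v1.StreamTypeData)
		return streamConn.CreateStream(headers)
	}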

@@ -17,20 +17,23 @@ limitations under the License.
 package proxy
 
 import (
-	"bufio"
 	"context"
 	"errors"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"net"
 	"net/http"
+	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
 
+	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/httpstream"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/rest"
@@ -92,7 +95,7 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res
 	}
 
 	// Port forwarding is allowed to fail and will be restarted when it does.
-	prepareForwarding := func() (*portforward.PortForwarder, error) {
+	prepareForwarding := func() (*remotePort, error) {
 		pod, err := clientset.CoreV1().Pods(addr.Namespace).Get(ctx, addr.PodName, metav1.GetOptions{})
 		if err != nil {
 			return nil, err
@@ -103,55 +106,24 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res
 				return nil, fmt.Errorf("container %q is not running", addr.ContainerName)
 			}
 		}
-		readyChannel := make(chan struct{})
-		fw, err := portforward.New(dialer,
-			[]string{fmt.Sprintf("0:%d", addr.Port)},
-			ctx.Done(),
-			readyChannel,
-			klogWriter(false, prefix),
-			klogWriter(true, prefix))
+		streamConn, _, err := dialer.Dial(portforward.PortForwardProtocolV1Name)
 		if err != nil {
-			return nil, err
+			return nil, fmt.Errorf("dialer failed: %v", err)
 		}
-		return fw, nil
+		rp := &remotePort{
+			streamConn: streamConn,
+		}
+		return rp, nil
 	}
 
 	var connectionsCreated, connectionsClosed int32
-	runForwarding := func(fw *portforward.PortForwarder) {
-		defer fw.Close()
+	runForwarding := func(rp *remotePort) {
+		defer rp.Close()
 		klog.V(5).Infof("%s: starting connection polling", prefix)
 		defer klog.V(5).Infof("%s: connection polling ended", prefix)
 
-		failed := make(chan struct{})
-		go func() {
-			defer close(failed)
-			klog.V(5).Infof("%s: starting port forwarding", prefix)
-			defer klog.V(5).Infof("%s: port forwarding ended", prefix)
-			err := fw.ForwardPorts()
-			if err != nil {
-				if ctx.Err() == nil {
-					// Something failed unexpectedly.
-					klog.Errorf("%s: %v", prefix, err)
-				} else {
-					// Context is done, log error anyway.
-					klog.V(5).Infof("%s: %v", prefix, err)
-				}
-			}
-		}()
-		// Wait for port forwarding to be ready.
-		select {
-		case <-ctx.Done():
-			return
-		case <-failed:
-			// The reason was logged above.
-			return
-		case <-fw.Ready:
-			// Proceed...
-		}
 		// This delay determines how quickly we notice when someone has
 		// connected inside the cluster. With socat, we cannot make this too small
 		// because otherwise we get many rejected connections. With the mock
@@ -165,9 +137,6 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res
 			select {
 			case <-ctx.Done():
 				return
-			case <-failed:
-				// The reason was logged above.
-				return
 			case <-tryConnect.C:
 				currentClosed := atomic.LoadInt32(&connectionsClosed)
 				openConnections := connectionsCreated - currentClosed
@@ -175,29 +144,8 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res
 					break
 				}
-				// Check whether we can establish a connection through the
-				// forwarded port.
-				ports, err := fw.GetPorts()
-				if err != nil {
-					// We checked for "port forwarding ready" above, so this
-					// shouldn't happen.
-					klog.Errorf("%s: no forwarded ports: %v", prefix, err)
-					return
-				}
-				// We don't want to be blocked to long because we need to check
-				// for a port forwarding failure occasionally.
-				timeout := 10 * time.Second
-				deadline, ok := ctx.Deadline()
-				if ok {
-					untilDeadline := deadline.Sub(time.Now())
-					if untilDeadline < timeout {
-						timeout = untilDeadline
-					}
-				}
 				klog.V(5).Infof("%s: trying to create a new connection #%d, %d open", prefix, connectionsCreated, openConnections)
-				c, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", ports[0].Local), timeout)
+				stream, err := rp.dial(ctx, prefix, addr.Port)
 				if err != nil {
 					klog.V(5).Infof("%s: no connection: %v", prefix, err)
 					break
@@ -205,7 +153,7 @@ func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *res
 				// Make the connection available to Accept below.
 				klog.V(5).Infof("%s: created a new connection #%d", prefix, connectionsCreated)
 				l.connections <- &connection{
-					Conn: c,
+					stream: stream,
 					addr: addr,
 					counter: connectionsCreated,
 					closed: &connectionsClosed,
@@ -261,6 +209,63 @@ func (a Addr) String() string {
 	return fmt.Sprintf("%s/%s:%d", a.Namespace, a.PodName, a.Port)
 }
 
+// remotePort is a stripped down version of client-go/tools/portforward minus
+// the local listeners.
+type remotePort struct {
+	streamConn httpstream.Connection
+
+	requestIDLock sync.Mutex
+	requestID int
+}
+
+func (rp *remotePort) dial(ctx context.Context, prefix string, port int) (httpstream.Stream, error) {
+	requestID := rp.nextRequestID()
+
+	// create error stream
+	headers := http.Header{}
+	headers.Set(v1.StreamType, v1.StreamTypeError)
+	headers.Set(v1.PortHeader, fmt.Sprintf("%d", port))
+	headers.Set(v1.PortForwardRequestIDHeader, strconv.Itoa(requestID))
+
+	// We're not writing to this stream, just reading an error message from it.
+	// This happens asynchronously.
+	errorStream, err := rp.streamConn.CreateStream(headers)
+	if err != nil {
+		return nil, fmt.Errorf("error creating error stream: %v", err)
+	}
+	errorStream.Close()
+	go func() {
+		message, err := ioutil.ReadAll(errorStream)
+		switch {
+		case err != nil:
+			klog.Errorf("%s: error reading from error stream: %v", prefix, err)
+		case len(message) > 0:
+			klog.Errorf("%s: an error occurred connecting to the remote port: %v", prefix, string(message))
+		}
+	}()
+
+	// create data stream
+	headers.Set(v1.StreamType, v1.StreamTypeData)
+	dataStream, err := rp.streamConn.CreateStream(headers)
+	if err != nil {
+		return nil, fmt.Errorf("error creating data stream: %v", err)
+	}
+
+	return dataStream, nil
+}
+
+func (rp *remotePort) Close() {
+	rp.streamConn.Close()
+}
+
+func (rp *remotePort) nextRequestID() int {
+	rp.requestIDLock.Lock()
+	defer rp.requestIDLock.Unlock()
+	id := rp.requestID
+	rp.requestID++
+	return id
+}
+
 type listener struct {
 	addr Addr
 	connections chan *connection
@@ -287,15 +292,37 @@ func (l *listener) Accept() (net.Conn, error) {
 }
 
 type connection struct {
-	net.Conn
+	stream httpstream.Stream
 	addr Addr
 	counter int32
 	closed *int32
 	mutex sync.Mutex
 }
+
+var _ net.Conn = &connection{}
+
+func (c *connection) LocalAddr() net.Addr {
+	return c.addr
+}
+
+func (c *connection) RemoteAddr() net.Addr {
+	return c.addr
+}
+
+func (c *connection) SetDeadline(t time.Time) error {
+	return nil
+}
+
+func (c *connection) SetReadDeadline(t time.Time) error {
+	return nil
+}
+
+func (c *connection) SetWriteDeadline(t time.Time) error {
+	return nil
+}
 
 func (c *connection) Read(b []byte) (int, error) {
-	n, err := c.Conn.Read(b)
+	n, err := c.stream.Read(b)
 	if errors.Is(err, io.EOF) {
 		klog.V(5).Infof("forward connection #%d for %s: remote side closed the stream", c.counter, c.addr)
 	}
@@ -303,7 +330,7 @@ func (c *connection) Read(b []byte) (int, error) {
 }
 
 func (c *connection) Write(b []byte) (int, error) {
-	n, err := c.Conn.Write(b)
+	n, err := c.stream.Write(b)
 	if errors.Is(err, io.EOF) {
 		klog.V(5).Infof("forward connection #%d for %s: remote side closed the stream", c.counter, c.addr)
 	}
@@ -319,26 +346,9 @@ func (c *connection) Close() error {
 		atomic.AddInt32(c.closed, 1)
 		c.closed = nil
 	}
-	return c.Conn.Close()
+	return c.stream.Close()
 }
 
 func (l *listener) Addr() net.Addr {
 	return l.addr
 }
-
-func klogWriter(isError bool, prefix string) io.Writer {
-	reader, writer := io.Pipe()
-	go func() {
-		scanner := bufio.NewScanner(reader)
-		for scanner.Scan() {
-			text := scanner.Text()
-			if isError {
-				klog.Errorf("%s: %s", prefix, text)
-			} else {
-				klog.V(5).Infof("%s: %s", prefix, text)
-			}
-		}
-	}()
-	return writer
-}
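For context, the listener returned by Listen behaves like any other net.Listener, with each accepted connection backed by a stream pair as shown above. A hypothetical caller might look like this; the Addr field names come from the diff, but the exact Listen parameter list is truncated in the hunk headers, so the trailing arguments here are an assumption:

	// Hypothetical usage; "proxy" is this package imported by a test,
	// handle is application-defined, and the Listen parameter list is an
	// assumption based on the truncated hunk headers above.
	l, err := proxy.Listen(ctx, clientset, restConfig, proxy.Addr{
		Namespace:     "default",
		PodName:       "csi-mock-0",
		ContainerName: "mock",
		Port:          9000,
	})
	if err != nil {
		return err
	}
	defer l.Close()
	for {
		conn, err := l.Accept() // a net.Conn backed by an httpstream.Stream
		if err != nil {
			return err
		}
		go handle(conn)
	}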