client-go portforward + apiserver proxy: structured, contextual logging

When debugging, it helps to keep output from different connections
separate. This can be done with contextual logging and using different loggers
for each connection.

Cancellation is handled separately for requests. Therefore the new APIs only
add support for passing a logger instance.

Kubernetes-commit: dfdf07bb531aa8a397f3c74e5eec851130325971
This commit is contained in:
Patrick Ohly
2024-12-04 15:21:11 +01:00
committed by Kubernetes Publisher
parent 8101e94f49
commit e3793792ba
7 changed files with 93 additions and 37 deletions

View File

@@ -50,6 +50,7 @@ func NewFallbackDialer(primary, secondary httpstream.Dialer, shouldFallback func
func (f *FallbackDialer) Dial(protocols ...string) (httpstream.Connection, string, error) {
conn, version, err := f.primary.Dial(protocols...)
if err != nil && f.shouldFallback(err) {
//nolint:logcheck // This code is only used by kubectl where contextual logging is not that useful.
klog.V(4).Infof("fallback to secondary dialer from primary dialer err: %v", err)
return f.secondary.Dial(protocols...)
}

View File

@@ -40,7 +40,7 @@ func TestFallbackDialer(t *testing.T) {
assert.Equal(t, primaryProtocol, negotiated, "primary negotiated protocol returned")
require.NoError(t, err, "error from primary dialer should be nil")
// If primary dialer error is upgrade error, then fallback returning secondary dial response.
primary = &fakeDialer{dialed: false, negotiatedProtocol: primaryProtocol, err: &httpstream.UpgradeFailureError{}}
primary = &fakeDialer{dialed: false, negotiatedProtocol: primaryProtocol, err: &httpstream.UpgradeFailureError{Cause: fmt.Errorf("fake error")}}
secondary = &fakeDialer{dialed: false, negotiatedProtocol: secondaryProtocol}
fallbackDialer = NewFallbackDialer(primary, secondary, httpstream.IsUpgradeFailure)
_, negotiated, err = fallbackDialer.Dial(protocols...)

View File

@@ -17,6 +17,7 @@ limitations under the License.
package portforward
import (
"context"
"errors"
"fmt"
"io"
@@ -30,6 +31,8 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
netutils "k8s.io/utils/net"
)
@@ -52,6 +55,7 @@ type PortForwarder struct {
ports []ForwardedPort
stopChan <-chan struct{}
logger klog.Logger
dialer httpstream.Dialer
streamConn httpstream.Connection
listeners []io.Closer
@@ -165,7 +169,14 @@ func New(dialer httpstream.Dialer, ports []string, stopChan <-chan struct{}, rea
}
// NewOnAddresses creates a new PortForwarder with custom listen addresses.
//
//logcheck:context // NewOnAddressesWithContext should be used instead of NewOnAddresses in code which supports contextual logging.
func NewOnAddresses(dialer httpstream.Dialer, addresses []string, ports []string, stopChan <-chan struct{}, readyChan chan struct{}, out, errOut io.Writer) (*PortForwarder, error) {
return NewOnAddressesWithContext(wait.ContextForChannel(stopChan), dialer, addresses, ports, readyChan, out, errOut)
}
// NewOnAddressesWithContext creates a new PortForwarder with custom listen addresses.
func NewOnAddressesWithContext(ctx context.Context, dialer httpstream.Dialer, addresses []string, ports []string, readyChan chan struct{}, out, errOut io.Writer) (*PortForwarder, error) {
if len(addresses) == 0 {
return nil, errors.New("you must specify at least 1 address")
}
@@ -181,10 +192,11 @@ func NewOnAddresses(dialer httpstream.Dialer, addresses []string, ports []string
return nil, err
}
return &PortForwarder{
logger: klog.FromContext(ctx),
dialer: dialer,
addresses: parsedAddresses,
ports: parsedPorts,
stopChan: stopChan,
stopChan: ctx.Done(),
Ready: readyChan,
out: out,
errOut: errOut,
@@ -319,7 +331,7 @@ func (pf *PortForwarder) waitForConnection(listener net.Listener, port Forwarded
if err != nil {
// TODO consider using something like https://github.com/hydrogen18/stoppableListener?
if !strings.Contains(strings.ToLower(err.Error()), networkClosedError) {
runtime.HandleError(fmt.Errorf("error accepting connection on port %d: %v", port.Local, err))
runtime.HandleErrorWithLogger(pf.logger, err, "Error accepting connection", "localPort", port.Local)
}
return
}
@@ -354,21 +366,23 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
headers.Set(v1.PortForwardRequestIDHeader, strconv.Itoa(requestID))
errorStream, err := pf.streamConn.CreateStream(headers)
if err != nil {
runtime.HandleError(fmt.Errorf("error creating error stream for port %d -> %d: %v", port.Local, port.Remote, err))
runtime.HandleErrorWithLogger(pf.logger, err, "Error creating error stream", "localPort", port.Local, "remotePort", port.Remote)
return
}
// we're not writing to this stream
errorStream.Close()
defer pf.streamConn.RemoveStreams(errorStream)
errorChan := make(chan error)
type readAllResult struct {
message []byte
err error
}
errorChan := make(chan readAllResult)
go func() {
message, err := io.ReadAll(errorStream)
switch {
case err != nil:
errorChan <- fmt.Errorf("error reading from error stream for port %d -> %d: %v", port.Local, port.Remote, err)
case len(message) > 0:
errorChan <- fmt.Errorf("an error occurred forwarding %d -> %d: %v", port.Local, port.Remote, string(message))
errorChan <- readAllResult{
message: message,
err: err,
}
close(errorChan)
}()
@@ -377,7 +391,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
headers.Set(v1.StreamType, v1.StreamTypeData)
dataStream, err := pf.streamConn.CreateStream(headers)
if err != nil {
runtime.HandleError(fmt.Errorf("error creating forwarding stream for port %d -> %d: %v", port.Local, port.Remote, err))
runtime.HandleErrorWithLogger(pf.logger, err, "Error creating forwarding stream", "localPort", port.Local, "remotePort", port.Remote)
return
}
defer pf.streamConn.RemoveStreams(dataStream)
@@ -388,7 +402,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
go func() {
// Copy from the remote side to the local port.
if _, err := io.Copy(conn, dataStream); err != nil && !strings.Contains(strings.ToLower(err.Error()), networkClosedError) {
runtime.HandleError(fmt.Errorf("error copying from remote stream to local connection: %v", err))
runtime.HandleErrorWithLogger(pf.logger, err, "Error copying from remote stream to local connection", "localPort", port.Local, "remotePort", port.Remote)
}
// inform the select below that the remote copy is done
@@ -401,7 +415,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
// Copy from the local port to the remote side.
if _, err := io.Copy(dataStream, conn); err != nil && !strings.Contains(strings.ToLower(err.Error()), networkClosedError) {
runtime.HandleError(fmt.Errorf("error copying from local connection to remote stream: %v", err))
runtime.HandleErrorWithLogger(pf.logger, err, "Error copying from local connection to remote stream", "localPort", port.Local, "remotePort", port.Remote)
// break out of the select below without waiting for the other copy to finish
close(localError)
}
@@ -418,10 +432,14 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
// the blocking data will affect errorStream and cause <-errorChan to block indefinitely.
_ = dataStream.Reset()
// always expect something on errorChan (it may be nil)
err = <-errorChan
if err != nil {
runtime.HandleError(err)
// always expect something on errorChan (it may be empty)
errResult := <-errorChan
switch {
case errResult.err != nil:
runtime.HandleErrorWithLogger(pf.logger, errResult.err, "Error reading from error stream", "localPort", port.Local, "remotePort", port.Remote)
pf.streamConn.Close()
case len(errResult.message) > 0:
runtime.HandleErrorWithLogger(pf.logger, errors.New(string(errResult.message)), "An error occurred forwarding", "localPort", port.Local, "remotePort", port.Remote)
pf.streamConn.Close()
}
}
@@ -431,7 +449,7 @@ func (pf *PortForwarder) Close() {
// stop all listeners
for _, l := range pf.listeners {
if err := l.Close(); err != nil {
runtime.HandleError(fmt.Errorf("error closing listener: %v", err))
runtime.HandleErrorWithLogger(pf.logger, err, "Error closing listener")
}
}
}

View File

@@ -299,6 +299,7 @@ func TestParsePortsAndNew(t *testing.T) {
var pf *PortForwarder
if len(test.addresses) > 0 {
//nolint:logcheck // Testing the original function.
pf, err = NewOnAddresses(dialer, test.addresses, test.input, expectedStopChan, readyChan, os.Stdout, os.Stderr)
} else {
pf, err = New(dialer, test.input, expectedStopChan, readyChan, os.Stdout, os.Stderr)

View File

@@ -34,7 +34,7 @@ var _ net.Conn = &TunnelingConnection{}
// TunnelingConnection implements the "httpstream.Connection" interface, wrapping
// a websocket connection that tunnels SPDY.
type TunnelingConnection struct {
name string
logger klog.Logger
conn *gwebsocket.Conn
inProgressMessage io.Reader
closeOnce sync.Once
@@ -42,29 +42,46 @@ type TunnelingConnection struct {
// NewTunnelingConnection wraps the passed gorilla/websockets connection
// with the TunnelingConnection struct (implementing net.Conn).
// The name is added to all log entries with [klog.LoggerWithName].
//
//logcheck:context // NewTunnelingConnectionWithLogger should be used instead of NewTunnelingConnection in code which supports contextual logging.
func NewTunnelingConnection(name string, conn *gwebsocket.Conn) *TunnelingConnection {
logger := klog.LoggerWithName(klog.Background(), name)
return NewTunnelingConnectionWithLogger(logger, conn)
}
// NewTunnelingConnectionWithLogger is a variant of NewTunnelingConnection where
// the caller is in control of logging. For example, [klog.LoggerWithName] can be used
// to add a common name for all log entries to identify the connection.
func NewTunnelingConnectionWithLogger(logger klog.Logger, conn *gwebsocket.Conn) *TunnelingConnection {
return &TunnelingConnection{
name: name,
conn: conn,
logger: logger,
conn: conn,
}
}
// Read implements "io.Reader" interface, reading from the stored connection
// into the passed buffer "p". Returns the number of bytes read and an error.
// Can keep track of the "inProgress" messsage from the tunneled connection.
func (c *TunnelingConnection) Read(p []byte) (int, error) {
klog.V(7).Infof("%s: tunneling connection read...", c.name)
defer klog.V(7).Infof("%s: tunneling connection read...complete", c.name)
func (c *TunnelingConnection) Read(p []byte) (len int, err error) {
c.logger.V(7).Info("Tunneling connection read...")
defer func() {
if loggerV := c.logger.V(8); loggerV.Enabled() {
loggerV.Info("Tunneling connection read...complete", "length", len, "data", p[:len], "err", err)
} else {
c.logger.V(7).Info("Tunneling connection read...complete")
}
}()
for {
if c.inProgressMessage == nil {
klog.V(8).Infof("%s: tunneling connection read before NextReader()...", c.name)
c.logger.V(8).Info("Tunneling connection read before NextReader()...")
messageType, nextReader, err := c.conn.NextReader()
if err != nil {
closeError := &gwebsocket.CloseError{}
if errors.As(err, &closeError) && closeError.Code == gwebsocket.CloseNormalClosure {
return 0, io.EOF
}
klog.V(4).Infof("%s:tunneling connection NextReader() error: %v", c.name, err)
c.logger.V(4).Info("Tunneling connection NextReader() failed", "err", err)
return 0, err
}
if messageType != gwebsocket.BinaryMessage {
@@ -72,12 +89,11 @@ func (c *TunnelingConnection) Read(p []byte) (int, error) {
}
c.inProgressMessage = nextReader
}
klog.V(8).Infof("%s: tunneling connection read in progress message...", c.name)
c.logger.V(8).Info("Tunneling connection read in progress...")
i, err := c.inProgressMessage.Read(p)
if i == 0 && err == io.EOF {
c.inProgressMessage = nil
} else {
klog.V(8).Infof("%s: read %d bytes, error=%v, bytes=% X", c.name, i, err, p[:i])
return i, err
}
}
@@ -87,8 +103,8 @@ func (c *TunnelingConnection) Read(p []byte) (int, error) {
// byte array "p" into the stored tunneled connection. Returns the number
// of bytes written and an error.
func (c *TunnelingConnection) Write(p []byte) (n int, err error) {
klog.V(7).Infof("%s: write: %d bytes, bytes=% X", c.name, len(p), p)
defer klog.V(7).Infof("%s: tunneling connection write...complete", c.name)
c.logger.V(7).Info("Tunneling connection write", "length", len(p), "data", p)
defer c.logger.V(7).Info("Tunneling connection write...complete")
w, err := c.conn.NextWriter(gwebsocket.BinaryMessage)
if err != nil {
return 0, err
@@ -111,7 +127,7 @@ func (c *TunnelingConnection) Write(p []byte) (n int, err error) {
func (c *TunnelingConnection) Close() error {
var err error
c.closeOnce.Do(func() {
klog.V(7).Infof("%s: tunneling connection Close()...", c.name)
c.logger.V(7).Info("Tunneling connection Close()...")
// Signal other endpoint that websocket connection is closing; ignore error.
normalCloseMsg := gwebsocket.FormatCloseMessage(gwebsocket.CloseNormalClosure, "")
writeControlErr := c.conn.WriteControl(gwebsocket.CloseMessage, normalCloseMsg, time.Now().Add(time.Second))

View File

@@ -36,8 +36,13 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
"k8s.io/client-go/transport/websocket"
"k8s.io/klog/v2"
)
func init() {
klog.InitFlags(nil)
}
func TestTunnelingConnection_ReadWriteClose(t *testing.T) {
// Stream channel that will receive streams created on upstream SPDY server.
streamChan := make(chan httpstream.Stream)
@@ -60,7 +65,7 @@ func TestTunnelingConnection_ReadWriteClose(t *testing.T) {
t.Errorf("Not acceptable agreement Subprotocol: %v", conn.Subprotocol())
return
}
tunnelingConn := NewTunnelingConnection("server", conn)
tunnelingConn := NewTunnelingConnectionWithLogger(klog.LoggerWithName(klog.Background(), "server"), conn)
spdyConn, err := spdy.NewServerConnection(tunnelingConn, justQueueStream(streamChan))
if err != nil {
t.Errorf("unexpected error %v", err)
@@ -73,6 +78,7 @@ func TestTunnelingConnection_ReadWriteClose(t *testing.T) {
// Dial the client tunneling connection to the tunneling server.
url, err := url.Parse(tunnelingServer.URL)
require.NoError(t, err)
//nolint:logcheck // Intentionally uses the old API.
dialer, err := NewSPDYOverWebsocketDialer(url, &rest.Config{Host: url.Host})
require.NoError(t, err)
spdyClient, protocol, err := dialer.Dial(constants.PortForwardV1Name)
@@ -205,6 +211,7 @@ func dialForTunnelingConnection(url *url.URL) (*TunnelingConnection, error) {
if err != nil {
return nil, err
}
//nolint:logcheck // Intentionally uses the old API.
return NewTunnelingConnection("client", conn), nil
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package portforward
import (
"context"
"fmt"
"net/http"
"net/url"
@@ -35,6 +36,7 @@ const PingPeriod = 10 * time.Second
// tunnelingDialer implements "httpstream.Dial" interface
type tunnelingDialer struct {
logger klog.Logger
url *url.URL
transport http.RoundTripper
holder websocket.ConnectionHolder
@@ -43,12 +45,22 @@ type tunnelingDialer struct {
// NewTunnelingDialer creates and returns the tunnelingDialer structure which implemements the "httpstream.Dialer"
// interface. The dialer can upgrade a websocket request, creating a websocket connection. This function
// returns an error if one occurs.
//
//logcheck:context // NewSPDYOverWebsocketDialerWithLogger should be used instead of NewSPDYOverWebsocketDialer in code which supports contextual logging.
func NewSPDYOverWebsocketDialer(url *url.URL, config *restclient.Config) (httpstream.Dialer, error) {
return NewSPDYOverWebsocketDialerWithLogger(klog.Background(), url, config)
}
// NewTunnelingDialer creates and returns the tunnelingDialer structure which implemements the "httpstream.Dialer"
// interface. The dialer can upgrade a websocket request, creating a websocket connection. This function
// returns an error if one occurs.
func NewSPDYOverWebsocketDialerWithLogger(logger klog.Logger, url *url.URL, config *restclient.Config) (httpstream.Dialer, error) {
transport, holder, err := websocket.RoundTripperFor(config)
if err != nil {
return nil, err
}
return &tunnelingDialer{
logger: logger,
url: url,
transport: transport,
holder: holder,
@@ -59,9 +71,10 @@ func NewSPDYOverWebsocketDialer(url *url.URL, config *restclient.Config) (httpst
// containing a WebSockets connection (which implements "net.Conn"). Also
// returns the protocol negotiated, or an error.
func (d *tunnelingDialer) Dial(protocols ...string) (httpstream.Connection, string, error) {
// There is no passed context, so skip the context when creating request for now.
// There is no passed context, so use the background context when creating request for now.
ctx := klog.NewContext(context.Background(), d.logger)
// Websockets requires "GET" method: RFC 6455 Sec. 4.1 (page 17).
req, err := http.NewRequest("GET", d.url.String(), nil)
req, err := http.NewRequestWithContext(ctx, "GET", d.url.String(), nil)
if err != nil {
return nil, "", err
}
@@ -72,7 +85,7 @@ func (d *tunnelingDialer) Dial(protocols ...string) (httpstream.Connection, stri
tunnelingProtocol := constants.WebsocketsSPDYTunnelingPrefix + protocol
tunnelingProtocols = append(tunnelingProtocols, tunnelingProtocol)
}
klog.V(4).Infoln("Before WebSocket Upgrade Connection...")
d.logger.V(4).Info("Before WebSocket Upgrade Connection...")
conn, err := websocket.Negotiate(d.transport, d.holder, req, tunnelingProtocols...)
if err != nil {
return nil, "", err
@@ -82,10 +95,10 @@ func (d *tunnelingDialer) Dial(protocols ...string) (httpstream.Connection, stri
}
protocol := conn.Subprotocol()
protocol = strings.TrimPrefix(protocol, constants.WebsocketsSPDYTunnelingPrefix)
klog.V(4).Infof("negotiated protocol: %s", protocol)
d.logger.V(4).Info("Negotiation complete", "protocol", protocol)
// Wrap the websocket connection which implements "net.Conn".
tConn := NewTunnelingConnection("client", conn)
tConn := NewTunnelingConnectionWithLogger(klog.LoggerWithName(d.logger, "client"), conn)
// Create SPDY connection injecting the previously created tunneling connection.
spdyConn, err := spdy.NewClientConnectionWithPings(tConn, PingPeriod)