mirror of
https://github.com/kubernetes/client-go.git
synced 2026-05-14 19:17:56 +00:00
Revert "apimachinery: contextual logging in network util code"
Kubernetes-commit: 9d65b9be20e5ee0a4ef34f0ba071d35987da4ab0
This commit is contained in:
committed by
Kubernetes Publisher
parent
5b20bd05fc
commit
b0c7207279
6
tools/cache/listwatch.go
vendored
6
tools/cache/listwatch.go
vendored
@@ -90,7 +90,7 @@ func ToWatcherWithContext(w Watcher) WatcherWithContext {
|
||||
if w, ok := w.(WatcherWithContext); ok {
|
||||
return w
|
||||
}
|
||||
return &watcherWrapper{
|
||||
return watcherWrapper{
|
||||
parent: w,
|
||||
}
|
||||
}
|
||||
@@ -99,7 +99,7 @@ type watcherWrapper struct {
|
||||
parent Watcher
|
||||
}
|
||||
|
||||
func (l *watcherWrapper) WatchWithContext(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
|
||||
func (l watcherWrapper) WatchWithContext(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
|
||||
return l.parent.Watch(options)
|
||||
}
|
||||
|
||||
@@ -121,7 +121,7 @@ func ToListerWatcherWithContext(lw ListerWatcher) ListerWatcherWithContext {
|
||||
if lw, ok := lw.(ListerWatcherWithContext); ok {
|
||||
return lw
|
||||
}
|
||||
return &listerWatcherWrapper{
|
||||
return listerWatcherWrapper{
|
||||
ListerWithContext: ToListerWithContext(lw),
|
||||
WatcherWithContext: ToWatcherWithContext(lw),
|
||||
}
|
||||
|
||||
@@ -679,13 +679,11 @@ func (config *inClusterClientConfig) Possible() bool {
|
||||
// to the default config.
|
||||
func BuildConfigFromFlags(masterUrl, kubeconfigPath string) (*restclient.Config, error) {
|
||||
if kubeconfigPath == "" && masterUrl == "" {
|
||||
//nolint:logcheck // A helper function like this should not log. But this is probably part of the the established client-go API and not worth changing.
|
||||
klog.Warning("Neither --kubeconfig nor --master was specified. Using the inClusterConfig. This might not work.")
|
||||
kubeconfig, err := restclient.InClusterConfig()
|
||||
if err == nil {
|
||||
return kubeconfig, nil
|
||||
}
|
||||
//nolint:logcheck // A helper function like this should not log. But this is probably part of the the established client-go API and not worth changing.
|
||||
klog.Warning("error creating inClusterConfig, falling back to default config: ", err)
|
||||
}
|
||||
return NewNonInteractiveDeferredLoadingClientConfig(
|
||||
|
||||
@@ -492,7 +492,6 @@ func getConfigFromFile(filename string) (*clientcmdapi.Config, error) {
|
||||
func GetConfigFromFileOrDie(filename string) *clientcmdapi.Config {
|
||||
config, err := getConfigFromFile(filename)
|
||||
if err != nil {
|
||||
//nolint:logcheck // A helper function like this should not log. But this is probably part of the the established client-go API and not worth changing.
|
||||
klog.FatalDepth(1, err)
|
||||
}
|
||||
|
||||
|
||||
@@ -137,7 +137,6 @@ type WarningHandler func(error)
|
||||
|
||||
func (handler WarningHandler) Warn(err error) {
|
||||
if handler == nil {
|
||||
//nolint:logcheck // This is the fallback when logging is not initialized. With nothing provided, using the global logger is the only option.
|
||||
klog.V(1).Info(err)
|
||||
} else {
|
||||
handler(err)
|
||||
@@ -403,7 +402,6 @@ func LoadFromFile(filename string) (*clientcmdapi.Config, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
//nolint:logcheck // A helper function like this should not log. But this is probably part of the the established client-go API and not worth changing.
|
||||
klog.V(6).Infoln("Config loaded from file: ", filename)
|
||||
|
||||
// set LocationOfOrigin on every Cluster, User, and Context
|
||||
|
||||
@@ -122,7 +122,6 @@ func TestNonExistentCommandLineFile(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
//nolint:logcheck // Tests klog APIs.
|
||||
func TestToleratingMissingFiles(t *testing.T) {
|
||||
envVarValue := "bogus"
|
||||
loadingRules := ClientConfigLoadingRules{
|
||||
@@ -147,7 +146,6 @@ func TestToleratingMissingFiles(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
//nolint:logcheck // Tests klog APIs.
|
||||
func TestWarningMissingFiles(t *testing.T) {
|
||||
envVarValue := "bogus"
|
||||
t.Setenv(RecommendedConfigPathEnvVar, envVarValue)
|
||||
@@ -173,7 +171,6 @@ func TestWarningMissingFiles(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
//nolint:logcheck // Tests klog APIs.
|
||||
func TestNoWarningMissingFiles(t *testing.T) {
|
||||
envVarValue := "bogus"
|
||||
t.Setenv(RecommendedConfigPathEnvVar, envVarValue)
|
||||
|
||||
@@ -118,7 +118,6 @@ func (config *DeferredLoadingClientConfig) ClientConfig() (*restclient.Config, e
|
||||
|
||||
// check for in-cluster configuration and use it
|
||||
if config.icc.Possible() {
|
||||
//nolint:logcheck // A helper function like this should not log. But this is probably part of the the established client-go API and not worth changing.
|
||||
klog.V(4).Infof("Using in-cluster configuration")
|
||||
return config.icc.ClientConfig()
|
||||
}
|
||||
@@ -161,7 +160,6 @@ func (config *DeferredLoadingClientConfig) Namespace() (string, bool, error) {
|
||||
}
|
||||
}
|
||||
|
||||
//nolint:logcheck // A helper function like this should not log. But this is probably part of the the established client-go API and not worth changing.
|
||||
klog.V(4).Infof("Using in-cluster namespace")
|
||||
|
||||
// allow the namespace from the service account token directory to be used.
|
||||
|
||||
@@ -50,7 +50,6 @@ func NewFallbackDialer(primary, secondary httpstream.Dialer, shouldFallback func
|
||||
func (f *FallbackDialer) Dial(protocols ...string) (httpstream.Connection, string, error) {
|
||||
conn, version, err := f.primary.Dial(protocols...)
|
||||
if err != nil && f.shouldFallback(err) {
|
||||
//nolint:logcheck // This code is only used by kubectl where contextual logging is not that useful.
|
||||
klog.V(4).Infof("fallback to secondary dialer from primary dialer err: %v", err)
|
||||
return f.secondary.Dial(protocols...)
|
||||
}
|
||||
|
||||
@@ -40,7 +40,7 @@ func TestFallbackDialer(t *testing.T) {
|
||||
assert.Equal(t, primaryProtocol, negotiated, "primary negotiated protocol returned")
|
||||
require.NoError(t, err, "error from primary dialer should be nil")
|
||||
// If primary dialer error is upgrade error, then fallback returning secondary dial response.
|
||||
primary = &fakeDialer{dialed: false, negotiatedProtocol: primaryProtocol, err: &httpstream.UpgradeFailureError{Cause: fmt.Errorf("fake error")}}
|
||||
primary = &fakeDialer{dialed: false, negotiatedProtocol: primaryProtocol, err: &httpstream.UpgradeFailureError{}}
|
||||
secondary = &fakeDialer{dialed: false, negotiatedProtocol: secondaryProtocol}
|
||||
fallbackDialer = NewFallbackDialer(primary, secondary, httpstream.IsUpgradeFailure)
|
||||
_, negotiated, err = fallbackDialer.Dial(protocols...)
|
||||
|
||||
@@ -17,7 +17,6 @@ limitations under the License.
|
||||
package portforward
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -31,8 +30,6 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/util/httpstream"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/klog/v2"
|
||||
netutils "k8s.io/utils/net"
|
||||
)
|
||||
|
||||
@@ -55,7 +52,6 @@ type PortForwarder struct {
|
||||
ports []ForwardedPort
|
||||
stopChan <-chan struct{}
|
||||
|
||||
logger klog.Logger
|
||||
dialer httpstream.Dialer
|
||||
streamConn httpstream.Connection
|
||||
listeners []io.Closer
|
||||
@@ -169,14 +165,7 @@ func New(dialer httpstream.Dialer, ports []string, stopChan <-chan struct{}, rea
|
||||
}
|
||||
|
||||
// NewOnAddresses creates a new PortForwarder with custom listen addresses.
|
||||
//
|
||||
//logcheck:context // NewOnAddressesWithContext should be used instead of NewOnAddresses in code which supports contextual logging.
|
||||
func NewOnAddresses(dialer httpstream.Dialer, addresses []string, ports []string, stopChan <-chan struct{}, readyChan chan struct{}, out, errOut io.Writer) (*PortForwarder, error) {
|
||||
return NewOnAddressesWithContext(wait.ContextForChannel(stopChan), dialer, addresses, ports, readyChan, out, errOut)
|
||||
}
|
||||
|
||||
// NewOnAddressesWithContext creates a new PortForwarder with custom listen addresses.
|
||||
func NewOnAddressesWithContext(ctx context.Context, dialer httpstream.Dialer, addresses []string, ports []string, readyChan chan struct{}, out, errOut io.Writer) (*PortForwarder, error) {
|
||||
if len(addresses) == 0 {
|
||||
return nil, errors.New("you must specify at least 1 address")
|
||||
}
|
||||
@@ -192,11 +181,10 @@ func NewOnAddressesWithContext(ctx context.Context, dialer httpstream.Dialer, ad
|
||||
return nil, err
|
||||
}
|
||||
return &PortForwarder{
|
||||
logger: klog.FromContext(ctx),
|
||||
dialer: dialer,
|
||||
addresses: parsedAddresses,
|
||||
ports: parsedPorts,
|
||||
stopChan: ctx.Done(),
|
||||
stopChan: stopChan,
|
||||
Ready: readyChan,
|
||||
out: out,
|
||||
errOut: errOut,
|
||||
@@ -331,7 +319,7 @@ func (pf *PortForwarder) waitForConnection(listener net.Listener, port Forwarded
|
||||
if err != nil {
|
||||
// TODO consider using something like https://github.com/hydrogen18/stoppableListener?
|
||||
if !strings.Contains(strings.ToLower(err.Error()), networkClosedError) {
|
||||
runtime.HandleErrorWithLogger(pf.logger, err, "Error accepting connection", "localPort", port.Local)
|
||||
runtime.HandleError(fmt.Errorf("error accepting connection on port %d: %v", port.Local, err))
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -366,23 +354,21 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
|
||||
headers.Set(v1.PortForwardRequestIDHeader, strconv.Itoa(requestID))
|
||||
errorStream, err := pf.streamConn.CreateStream(headers)
|
||||
if err != nil {
|
||||
runtime.HandleErrorWithLogger(pf.logger, err, "Error creating error stream", "localPort", port.Local, "remotePort", port.Remote)
|
||||
runtime.HandleError(fmt.Errorf("error creating error stream for port %d -> %d: %v", port.Local, port.Remote, err))
|
||||
return
|
||||
}
|
||||
// we're not writing to this stream
|
||||
errorStream.Close()
|
||||
defer pf.streamConn.RemoveStreams(errorStream)
|
||||
|
||||
type readAllResult struct {
|
||||
message []byte
|
||||
err error
|
||||
}
|
||||
errorChan := make(chan readAllResult)
|
||||
errorChan := make(chan error)
|
||||
go func() {
|
||||
message, err := io.ReadAll(errorStream)
|
||||
errorChan <- readAllResult{
|
||||
message: message,
|
||||
err: err,
|
||||
switch {
|
||||
case err != nil:
|
||||
errorChan <- fmt.Errorf("error reading from error stream for port %d -> %d: %v", port.Local, port.Remote, err)
|
||||
case len(message) > 0:
|
||||
errorChan <- fmt.Errorf("an error occurred forwarding %d -> %d: %v", port.Local, port.Remote, string(message))
|
||||
}
|
||||
close(errorChan)
|
||||
}()
|
||||
@@ -391,7 +377,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
|
||||
headers.Set(v1.StreamType, v1.StreamTypeData)
|
||||
dataStream, err := pf.streamConn.CreateStream(headers)
|
||||
if err != nil {
|
||||
runtime.HandleErrorWithLogger(pf.logger, err, "Error creating forwarding stream", "localPort", port.Local, "remotePort", port.Remote)
|
||||
runtime.HandleError(fmt.Errorf("error creating forwarding stream for port %d -> %d: %v", port.Local, port.Remote, err))
|
||||
return
|
||||
}
|
||||
defer pf.streamConn.RemoveStreams(dataStream)
|
||||
@@ -402,7 +388,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
|
||||
go func() {
|
||||
// Copy from the remote side to the local port.
|
||||
if _, err := io.Copy(conn, dataStream); err != nil && !strings.Contains(strings.ToLower(err.Error()), networkClosedError) {
|
||||
runtime.HandleErrorWithLogger(pf.logger, err, "Error copying from remote stream to local connection", "localPort", port.Local, "remotePort", port.Remote)
|
||||
runtime.HandleError(fmt.Errorf("error copying from remote stream to local connection: %v", err))
|
||||
}
|
||||
|
||||
// inform the select below that the remote copy is done
|
||||
@@ -415,7 +401,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
|
||||
|
||||
// Copy from the local port to the remote side.
|
||||
if _, err := io.Copy(dataStream, conn); err != nil && !strings.Contains(strings.ToLower(err.Error()), networkClosedError) {
|
||||
runtime.HandleErrorWithLogger(pf.logger, err, "Error copying from local connection to remote stream", "localPort", port.Local, "remotePort", port.Remote)
|
||||
runtime.HandleError(fmt.Errorf("error copying from local connection to remote stream: %v", err))
|
||||
// break out of the select below without waiting for the other copy to finish
|
||||
close(localError)
|
||||
}
|
||||
@@ -432,14 +418,10 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
|
||||
// the blocking data will affect errorStream and cause <-errorChan to block indefinitely.
|
||||
_ = dataStream.Reset()
|
||||
|
||||
// always expect something on errorChan (it may be empty)
|
||||
errResult := <-errorChan
|
||||
switch {
|
||||
case errResult.err != nil:
|
||||
runtime.HandleErrorWithLogger(pf.logger, errResult.err, "Error reading from error stream", "localPort", port.Local, "remotePort", port.Remote)
|
||||
pf.streamConn.Close()
|
||||
case len(errResult.message) > 0:
|
||||
runtime.HandleErrorWithLogger(pf.logger, errors.New(string(errResult.message)), "An error occurred forwarding", "localPort", port.Local, "remotePort", port.Remote)
|
||||
// always expect something on errorChan (it may be nil)
|
||||
err = <-errorChan
|
||||
if err != nil {
|
||||
runtime.HandleError(err)
|
||||
pf.streamConn.Close()
|
||||
}
|
||||
}
|
||||
@@ -449,7 +431,7 @@ func (pf *PortForwarder) Close() {
|
||||
// stop all listeners
|
||||
for _, l := range pf.listeners {
|
||||
if err := l.Close(); err != nil {
|
||||
runtime.HandleErrorWithLogger(pf.logger, err, "Error closing listener")
|
||||
runtime.HandleError(fmt.Errorf("error closing listener: %v", err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -299,7 +299,6 @@ func TestParsePortsAndNew(t *testing.T) {
|
||||
|
||||
var pf *PortForwarder
|
||||
if len(test.addresses) > 0 {
|
||||
//nolint:logcheck // Testing the original function.
|
||||
pf, err = NewOnAddresses(dialer, test.addresses, test.input, expectedStopChan, readyChan, os.Stdout, os.Stderr)
|
||||
} else {
|
||||
pf, err = New(dialer, test.input, expectedStopChan, readyChan, os.Stdout, os.Stderr)
|
||||
|
||||
@@ -34,7 +34,7 @@ var _ net.Conn = &TunnelingConnection{}
|
||||
// TunnelingConnection implements the "httpstream.Connection" interface, wrapping
|
||||
// a websocket connection that tunnels SPDY.
|
||||
type TunnelingConnection struct {
|
||||
logger klog.Logger
|
||||
name string
|
||||
conn *gwebsocket.Conn
|
||||
inProgressMessage io.Reader
|
||||
closeOnce sync.Once
|
||||
@@ -42,46 +42,29 @@ type TunnelingConnection struct {
|
||||
|
||||
// NewTunnelingConnection wraps the passed gorilla/websockets connection
|
||||
// with the TunnelingConnection struct (implementing net.Conn).
|
||||
// The name is added to all log entries with [klog.LoggerWithName].
|
||||
//
|
||||
//logcheck:context // NewTunnelingConnectionWithLogger should be used instead of NewTunnelingConnection in code which supports contextual logging.
|
||||
func NewTunnelingConnection(name string, conn *gwebsocket.Conn) *TunnelingConnection {
|
||||
logger := klog.LoggerWithName(klog.Background(), name)
|
||||
return NewTunnelingConnectionWithLogger(logger, conn)
|
||||
}
|
||||
|
||||
// NewTunnelingConnectionWithLogger is a variant of NewTunnelingConnection where
|
||||
// the caller is in control of logging. For example, [klog.LoggerWithName] can be used
|
||||
// to add a common name for all log entries to identify the connection.
|
||||
func NewTunnelingConnectionWithLogger(logger klog.Logger, conn *gwebsocket.Conn) *TunnelingConnection {
|
||||
return &TunnelingConnection{
|
||||
logger: logger,
|
||||
conn: conn,
|
||||
name: name,
|
||||
conn: conn,
|
||||
}
|
||||
}
|
||||
|
||||
// Read implements "io.Reader" interface, reading from the stored connection
|
||||
// into the passed buffer "p". Returns the number of bytes read and an error.
|
||||
// Can keep track of the "inProgress" messsage from the tunneled connection.
|
||||
func (c *TunnelingConnection) Read(p []byte) (len int, err error) {
|
||||
c.logger.V(7).Info("Tunneling connection read...")
|
||||
defer func() {
|
||||
if loggerV := c.logger.V(8); loggerV.Enabled() {
|
||||
loggerV.Info("Tunneling connection read...complete", "length", len, "data", p[:len], "err", err)
|
||||
} else {
|
||||
c.logger.V(7).Info("Tunneling connection read...complete")
|
||||
}
|
||||
}()
|
||||
func (c *TunnelingConnection) Read(p []byte) (int, error) {
|
||||
klog.V(7).Infof("%s: tunneling connection read...", c.name)
|
||||
defer klog.V(7).Infof("%s: tunneling connection read...complete", c.name)
|
||||
for {
|
||||
if c.inProgressMessage == nil {
|
||||
c.logger.V(8).Info("Tunneling connection read before NextReader()...")
|
||||
klog.V(8).Infof("%s: tunneling connection read before NextReader()...", c.name)
|
||||
messageType, nextReader, err := c.conn.NextReader()
|
||||
if err != nil {
|
||||
closeError := &gwebsocket.CloseError{}
|
||||
if errors.As(err, &closeError) && closeError.Code == gwebsocket.CloseNormalClosure {
|
||||
return 0, io.EOF
|
||||
}
|
||||
c.logger.V(4).Info("Tunneling connection NextReader() failed", "err", err)
|
||||
klog.V(4).Infof("%s:tunneling connection NextReader() error: %v", c.name, err)
|
||||
return 0, err
|
||||
}
|
||||
if messageType != gwebsocket.BinaryMessage {
|
||||
@@ -89,11 +72,12 @@ func (c *TunnelingConnection) Read(p []byte) (len int, err error) {
|
||||
}
|
||||
c.inProgressMessage = nextReader
|
||||
}
|
||||
c.logger.V(8).Info("Tunneling connection read in progress...")
|
||||
klog.V(8).Infof("%s: tunneling connection read in progress message...", c.name)
|
||||
i, err := c.inProgressMessage.Read(p)
|
||||
if i == 0 && err == io.EOF {
|
||||
c.inProgressMessage = nil
|
||||
} else {
|
||||
klog.V(8).Infof("%s: read %d bytes, error=%v, bytes=% X", c.name, i, err, p[:i])
|
||||
return i, err
|
||||
}
|
||||
}
|
||||
@@ -103,8 +87,8 @@ func (c *TunnelingConnection) Read(p []byte) (len int, err error) {
|
||||
// byte array "p" into the stored tunneled connection. Returns the number
|
||||
// of bytes written and an error.
|
||||
func (c *TunnelingConnection) Write(p []byte) (n int, err error) {
|
||||
c.logger.V(7).Info("Tunneling connection write", "length", len(p), "data", p)
|
||||
defer c.logger.V(7).Info("Tunneling connection write...complete")
|
||||
klog.V(7).Infof("%s: write: %d bytes, bytes=% X", c.name, len(p), p)
|
||||
defer klog.V(7).Infof("%s: tunneling connection write...complete", c.name)
|
||||
w, err := c.conn.NextWriter(gwebsocket.BinaryMessage)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@@ -127,7 +111,7 @@ func (c *TunnelingConnection) Write(p []byte) (n int, err error) {
|
||||
func (c *TunnelingConnection) Close() error {
|
||||
var err error
|
||||
c.closeOnce.Do(func() {
|
||||
c.logger.V(7).Info("Tunneling connection Close()...")
|
||||
klog.V(7).Infof("%s: tunneling connection Close()...", c.name)
|
||||
// Signal other endpoint that websocket connection is closing; ignore error.
|
||||
normalCloseMsg := gwebsocket.FormatCloseMessage(gwebsocket.CloseNormalClosure, "")
|
||||
writeControlErr := c.conn.WriteControl(gwebsocket.CloseMessage, normalCloseMsg, time.Now().Add(time.Second))
|
||||
|
||||
@@ -36,13 +36,8 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/transport/websocket"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
func init() {
|
||||
klog.InitFlags(nil)
|
||||
}
|
||||
|
||||
func TestTunnelingConnection_ReadWriteClose(t *testing.T) {
|
||||
// Stream channel that will receive streams created on upstream SPDY server.
|
||||
streamChan := make(chan httpstream.Stream)
|
||||
@@ -65,7 +60,7 @@ func TestTunnelingConnection_ReadWriteClose(t *testing.T) {
|
||||
t.Errorf("Not acceptable agreement Subprotocol: %v", conn.Subprotocol())
|
||||
return
|
||||
}
|
||||
tunnelingConn := NewTunnelingConnectionWithLogger(klog.LoggerWithName(klog.Background(), "server"), conn)
|
||||
tunnelingConn := NewTunnelingConnection("server", conn)
|
||||
spdyConn, err := spdy.NewServerConnection(tunnelingConn, justQueueStream(streamChan))
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error %v", err)
|
||||
@@ -78,7 +73,6 @@ func TestTunnelingConnection_ReadWriteClose(t *testing.T) {
|
||||
// Dial the client tunneling connection to the tunneling server.
|
||||
url, err := url.Parse(tunnelingServer.URL)
|
||||
require.NoError(t, err)
|
||||
//nolint:logcheck // Intentionally uses the old API.
|
||||
dialer, err := NewSPDYOverWebsocketDialer(url, &rest.Config{Host: url.Host})
|
||||
require.NoError(t, err)
|
||||
spdyClient, protocol, err := dialer.Dial(constants.PortForwardV1Name)
|
||||
@@ -211,7 +205,6 @@ func dialForTunnelingConnection(url *url.URL) (*TunnelingConnection, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
//nolint:logcheck // Intentionally uses the old API.
|
||||
return NewTunnelingConnection("client", conn), nil
|
||||
}
|
||||
|
||||
|
||||
@@ -17,7 +17,6 @@ limitations under the License.
|
||||
package portforward
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
@@ -36,7 +35,6 @@ const PingPeriod = 10 * time.Second
|
||||
|
||||
// tunnelingDialer implements "httpstream.Dial" interface
|
||||
type tunnelingDialer struct {
|
||||
logger klog.Logger
|
||||
url *url.URL
|
||||
transport http.RoundTripper
|
||||
holder websocket.ConnectionHolder
|
||||
@@ -45,22 +43,12 @@ type tunnelingDialer struct {
|
||||
// NewTunnelingDialer creates and returns the tunnelingDialer structure which implemements the "httpstream.Dialer"
|
||||
// interface. The dialer can upgrade a websocket request, creating a websocket connection. This function
|
||||
// returns an error if one occurs.
|
||||
//
|
||||
//logcheck:context // NewSPDYOverWebsocketDialerWithLogger should be used instead of NewSPDYOverWebsocketDialer in code which supports contextual logging.
|
||||
func NewSPDYOverWebsocketDialer(url *url.URL, config *restclient.Config) (httpstream.Dialer, error) {
|
||||
return NewSPDYOverWebsocketDialerWithLogger(klog.Background(), url, config)
|
||||
}
|
||||
|
||||
// NewTunnelingDialer creates and returns the tunnelingDialer structure which implemements the "httpstream.Dialer"
|
||||
// interface. The dialer can upgrade a websocket request, creating a websocket connection. This function
|
||||
// returns an error if one occurs.
|
||||
func NewSPDYOverWebsocketDialerWithLogger(logger klog.Logger, url *url.URL, config *restclient.Config) (httpstream.Dialer, error) {
|
||||
transport, holder, err := websocket.RoundTripperFor(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &tunnelingDialer{
|
||||
logger: logger,
|
||||
url: url,
|
||||
transport: transport,
|
||||
holder: holder,
|
||||
@@ -71,10 +59,9 @@ func NewSPDYOverWebsocketDialerWithLogger(logger klog.Logger, url *url.URL, conf
|
||||
// containing a WebSockets connection (which implements "net.Conn"). Also
|
||||
// returns the protocol negotiated, or an error.
|
||||
func (d *tunnelingDialer) Dial(protocols ...string) (httpstream.Connection, string, error) {
|
||||
// There is no passed context, so use the background context when creating request for now.
|
||||
ctx := klog.NewContext(context.Background(), d.logger)
|
||||
// There is no passed context, so skip the context when creating request for now.
|
||||
// Websockets requires "GET" method: RFC 6455 Sec. 4.1 (page 17).
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", d.url.String(), nil)
|
||||
req, err := http.NewRequest("GET", d.url.String(), nil)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
@@ -85,7 +72,7 @@ func (d *tunnelingDialer) Dial(protocols ...string) (httpstream.Connection, stri
|
||||
tunnelingProtocol := constants.WebsocketsSPDYTunnelingPrefix + protocol
|
||||
tunnelingProtocols = append(tunnelingProtocols, tunnelingProtocol)
|
||||
}
|
||||
d.logger.V(4).Info("Before WebSocket Upgrade Connection...")
|
||||
klog.V(4).Infoln("Before WebSocket Upgrade Connection...")
|
||||
conn, err := websocket.Negotiate(d.transport, d.holder, req, tunnelingProtocols...)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
@@ -95,10 +82,10 @@ func (d *tunnelingDialer) Dial(protocols ...string) (httpstream.Connection, stri
|
||||
}
|
||||
protocol := conn.Subprotocol()
|
||||
protocol = strings.TrimPrefix(protocol, constants.WebsocketsSPDYTunnelingPrefix)
|
||||
d.logger.V(4).Info("Negotiation complete", "protocol", protocol)
|
||||
klog.V(4).Infof("negotiated protocol: %s", protocol)
|
||||
|
||||
// Wrap the websocket connection which implements "net.Conn".
|
||||
tConn := NewTunnelingConnectionWithLogger(klog.LoggerWithName(d.logger, "client"), conn)
|
||||
tConn := NewTunnelingConnection("client", conn)
|
||||
// Create SPDY connection injecting the previously created tunneling connection.
|
||||
spdyConn, err := spdy.NewClientConnectionWithPings(tConn, PingPeriod)
|
||||
|
||||
|
||||
@@ -21,7 +21,6 @@ import (
|
||||
"io"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// errorStreamDecoder interprets the data on the error channel and creates a go error object from it.
|
||||
@@ -33,11 +32,11 @@ type errorStreamDecoder interface {
|
||||
// decodes it with the given errorStreamDecoder, sends the decoded error (or nil if the remote
|
||||
// command exited successfully) to the returned error channel, and closes it.
|
||||
// This function returns immediately.
|
||||
func watchErrorStream(logger klog.Logger, errorStream io.Reader, d errorStreamDecoder) chan error {
|
||||
func watchErrorStream(errorStream io.Reader, d errorStreamDecoder) chan error {
|
||||
errorChan := make(chan error)
|
||||
|
||||
go func() {
|
||||
defer runtime.HandleCrashWithLogger(logger)
|
||||
defer runtime.HandleCrash()
|
||||
|
||||
message, err := io.ReadAll(errorStream)
|
||||
switch {
|
||||
|
||||
@@ -53,7 +53,7 @@ func (f *FallbackExecutor) Stream(options StreamOptions) error {
|
||||
func (f *FallbackExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {
|
||||
err := f.primary.StreamWithContext(ctx, options)
|
||||
if err != nil && f.shouldFallback(err) {
|
||||
klog.FromContext(ctx).V(4).Info("RemoteCommand fallback", "err", err)
|
||||
klog.V(4).Infof("RemoteCommand fallback: %v", err)
|
||||
return f.secondary.StreamWithContext(ctx, options)
|
||||
}
|
||||
return err
|
||||
|
||||
@@ -22,7 +22,6 @@ import (
|
||||
"net/http"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/httpstream"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// StreamOptions holds information pertaining to the current streaming session:
|
||||
@@ -55,5 +54,5 @@ type streamCreator interface {
|
||||
}
|
||||
|
||||
type streamProtocolHandler interface {
|
||||
stream(logger klog.Logger, conn streamCreator, ready chan<- struct{}) error
|
||||
stream(conn streamCreator, ready chan<- struct{}) error
|
||||
}
|
||||
|
||||
@@ -121,7 +121,6 @@ func (e *spdyStreamExecutor) newConnectionAndStream(ctx context.Context, options
|
||||
|
||||
var streamer streamProtocolHandler
|
||||
|
||||
logger := klog.FromContext(ctx)
|
||||
switch protocol {
|
||||
case remotecommand.StreamProtocolV5Name:
|
||||
streamer = newStreamProtocolV5(options)
|
||||
@@ -132,7 +131,7 @@ func (e *spdyStreamExecutor) newConnectionAndStream(ctx context.Context, options
|
||||
case remotecommand.StreamProtocolV2Name:
|
||||
streamer = newStreamProtocolV2(options)
|
||||
case "":
|
||||
logger.V(4).Info("The server did not negotiate a streaming protocol version, falling back", "protocol", remotecommand.StreamProtocolV1Name)
|
||||
klog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name)
|
||||
fallthrough
|
||||
case remotecommand.StreamProtocolV1Name:
|
||||
streamer = newStreamProtocolV1(options)
|
||||
@@ -162,7 +161,7 @@ func (e *spdyStreamExecutor) StreamWithContext(ctx context.Context, options Stre
|
||||
// The SPDY executor does not need to synchronize stream creation, so we pass a nil
|
||||
// ready channel. The underlying spdystream library handles stream multiplexing
|
||||
// without a race condition.
|
||||
errorChan <- streamer.stream(klog.FromContext(ctx), conn, nil)
|
||||
errorChan <- streamer.stream(conn, nil)
|
||||
}()
|
||||
|
||||
select {
|
||||
|
||||
@@ -38,7 +38,6 @@ import (
|
||||
remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
)
|
||||
|
||||
type AttachFunc func(in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan TerminalSize) error
|
||||
@@ -329,7 +328,6 @@ func (w *writeDetector) Write(p []byte) (n int, err error) {
|
||||
// and expects the deferred close of the connection leads to the exit of the goroutine on cancellation.
|
||||
// This test verifies that works.
|
||||
func TestStreamExitsAfterConnectionIsClosed(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
writeDetector := newWriterDetector(&fakeEmptyDataPty{})
|
||||
options := StreamOptions{
|
||||
Stdin: &fakeEmptyDataPty{},
|
||||
@@ -354,7 +352,7 @@ func TestStreamExitsAfterConnectionIsClosed(t *testing.T) {
|
||||
|
||||
errorChan := make(chan error)
|
||||
go func() {
|
||||
errorChan <- streamer.stream(logger, conn, nil)
|
||||
errorChan <- streamer.stream(conn, nil)
|
||||
}()
|
||||
|
||||
// Wait until stream goroutine starts.
|
||||
|
||||
@@ -21,7 +21,7 @@ import (
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/util/httpstream"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
@@ -31,7 +31,6 @@ import (
|
||||
// non-interactive stdin data has ended. See https://issues.k8s.io/13394 and
|
||||
// https://issues.k8s.io/13395 for more details.
|
||||
type streamProtocolV1 struct {
|
||||
logger klog.Logger
|
||||
StreamOptions
|
||||
|
||||
errorStream httpstream.Stream
|
||||
@@ -48,15 +47,15 @@ func newStreamProtocolV1(options StreamOptions) streamProtocolHandler {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *streamProtocolV1) stream(logger klog.Logger, conn streamCreator, ready chan<- struct{}) error {
|
||||
func (p *streamProtocolV1) stream(conn streamCreator, ready chan<- struct{}) error {
|
||||
doneChan := make(chan struct{}, 2)
|
||||
errorChan := make(chan error)
|
||||
|
||||
cp := func(s string, dst io.Writer, src io.Reader) {
|
||||
p.logger.V(6).Info("Copying", "data", s)
|
||||
defer p.logger.V(6).Info("Done copying", "data", s)
|
||||
klog.V(6).Infof("Copying %s", s)
|
||||
defer klog.V(6).Infof("Done copying %s", s)
|
||||
if _, err := io.Copy(dst, src); err != nil && err != io.EOF {
|
||||
p.logger.Error(err, "Error copying", "data", s)
|
||||
klog.Errorf("Error copying %s: %v", s, err)
|
||||
}
|
||||
if s == v1.StreamTypeStdout || s == v1.StreamTypeStderr {
|
||||
doneChan <- struct{}{}
|
||||
|
||||
@@ -22,9 +22,8 @@ import (
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// streamProtocolV2 implements version 2 of the streaming protocol for attach
|
||||
@@ -88,13 +87,13 @@ func (p *streamProtocolV2) createStreams(conn streamCreator) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *streamProtocolV2) copyStdin(logger klog.Logger) {
|
||||
func (p *streamProtocolV2) copyStdin() {
|
||||
if p.Stdin != nil {
|
||||
var once sync.Once
|
||||
|
||||
// copy from client's stdin to container's stdin
|
||||
go func() {
|
||||
defer runtime.HandleCrashWithLogger(logger)
|
||||
defer runtime.HandleCrash()
|
||||
|
||||
// if p.stdin is noninteractive, p.g. `echo abc | kubectl exec -i <pod> -- cat`, make sure
|
||||
// we close remoteStdin as soon as the copy from p.stdin to remoteStdin finishes. Otherwise
|
||||
@@ -102,7 +101,7 @@ func (p *streamProtocolV2) copyStdin(logger klog.Logger) {
|
||||
defer once.Do(func() { p.remoteStdin.Close() })
|
||||
|
||||
if _, err := io.Copy(p.remoteStdin, readerWrapper{p.Stdin}); err != nil {
|
||||
runtime.HandleErrorWithLogger(logger, err, "Copying stdin failed")
|
||||
runtime.HandleError(err)
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -121,26 +120,26 @@ func (p *streamProtocolV2) copyStdin(logger klog.Logger) {
|
||||
// When that happens, we must Close() on our side of remoteStdin, to
|
||||
// allow the copy in hijack to complete, and hijack to return.
|
||||
go func() {
|
||||
defer runtime.HandleCrashWithLogger(logger)
|
||||
defer runtime.HandleCrash()
|
||||
defer once.Do(func() { p.remoteStdin.Close() })
|
||||
|
||||
// this "copy" doesn't actually read anything - it's just here to wait for
|
||||
// the server to close remoteStdin.
|
||||
if _, err := io.Copy(io.Discard, p.remoteStdin); err != nil {
|
||||
runtime.HandleErrorWithLogger(logger, err, "Waiting for server to close stdin failed")
|
||||
runtime.HandleError(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (p *streamProtocolV2) copyStdout(logger klog.Logger, wg *sync.WaitGroup) {
|
||||
func (p *streamProtocolV2) copyStdout(wg *sync.WaitGroup) {
|
||||
if p.Stdout == nil {
|
||||
return
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer runtime.HandleCrashWithLogger(logger)
|
||||
defer runtime.HandleCrash()
|
||||
defer wg.Done()
|
||||
// make sure, packet in queue can be consumed.
|
||||
// block in queue may lead to deadlock in conn.server
|
||||
@@ -148,29 +147,29 @@ func (p *streamProtocolV2) copyStdout(logger klog.Logger, wg *sync.WaitGroup) {
|
||||
defer io.Copy(io.Discard, p.remoteStdout)
|
||||
|
||||
if _, err := io.Copy(p.Stdout, p.remoteStdout); err != nil {
|
||||
runtime.HandleErrorWithLogger(logger, err, "Copying stdout failed")
|
||||
runtime.HandleError(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *streamProtocolV2) copyStderr(logger klog.Logger, wg *sync.WaitGroup) {
|
||||
func (p *streamProtocolV2) copyStderr(wg *sync.WaitGroup) {
|
||||
if p.Stderr == nil || p.Tty {
|
||||
return
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer runtime.HandleCrashWithLogger(logger)
|
||||
defer runtime.HandleCrash()
|
||||
defer wg.Done()
|
||||
defer io.Copy(io.Discard, p.remoteStderr)
|
||||
|
||||
if _, err := io.Copy(p.Stderr, p.remoteStderr); err != nil {
|
||||
runtime.HandleErrorWithLogger(logger, err, "Copying stderr failed")
|
||||
runtime.HandleError(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *streamProtocolV2) stream(logger klog.Logger, conn streamCreator, ready chan<- struct{}) error {
|
||||
func (p *streamProtocolV2) stream(conn streamCreator, ready chan<- struct{}) error {
|
||||
if err := p.createStreams(conn); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -182,13 +181,13 @@ func (p *streamProtocolV2) stream(logger klog.Logger, conn streamCreator, ready
|
||||
|
||||
// now that all the streams have been created, proceed with reading & copying
|
||||
|
||||
errorChan := watchErrorStream(logger, p.errorStream, &errorDecoderV2{})
|
||||
errorChan := watchErrorStream(p.errorStream, &errorDecoderV2{})
|
||||
|
||||
p.copyStdin(logger)
|
||||
p.copyStdin()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
p.copyStdout(logger, &wg)
|
||||
p.copyStderr(logger, &wg)
|
||||
p.copyStdout(&wg)
|
||||
p.copyStderr(&wg)
|
||||
|
||||
// we're waiting for stdout/stderr to finish copying
|
||||
wg.Wait()
|
||||
|
||||
@@ -28,7 +28,6 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/util/httpstream"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
)
|
||||
|
||||
type fakeReader struct {
|
||||
@@ -180,7 +179,6 @@ func TestV2CreateStreams(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestV2ErrorStreamReading(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
tests := []struct {
|
||||
name string
|
||||
stream io.Reader
|
||||
@@ -219,7 +217,7 @@ func TestV2ErrorStreamReading(t *testing.T) {
|
||||
h := newStreamProtocolV2(StreamOptions{}).(*streamProtocolV2)
|
||||
h.errorStream = test.stream
|
||||
|
||||
ch := watchErrorStream(logger, h.errorStream, &errorDecoderV2{})
|
||||
ch := watchErrorStream(h.errorStream, &errorDecoderV2{})
|
||||
if ch == nil {
|
||||
t.Fatalf("%s: unexpected nil channel", test.name)
|
||||
}
|
||||
|
||||
@@ -22,9 +22,8 @@ import (
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// streamProtocolV3 implements version 3 of the streaming protocol for attach
|
||||
@@ -63,12 +62,12 @@ func (p *streamProtocolV3) createStreams(conn streamCreator) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *streamProtocolV3) handleResizes(logger klog.Logger) {
|
||||
func (p *streamProtocolV3) handleResizes() {
|
||||
if p.resizeStream == nil || p.TerminalSizeQueue == nil {
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
defer runtime.HandleCrashWithLogger(logger)
|
||||
defer runtime.HandleCrash()
|
||||
|
||||
encoder := json.NewEncoder(p.resizeStream)
|
||||
for {
|
||||
@@ -77,13 +76,13 @@ func (p *streamProtocolV3) handleResizes(logger klog.Logger) {
|
||||
return
|
||||
}
|
||||
if err := encoder.Encode(&size); err != nil {
|
||||
runtime.HandleErrorWithLogger(logger, err, "Encoding terminal size failed")
|
||||
runtime.HandleError(err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *streamProtocolV3) stream(logger klog.Logger, conn streamCreator, ready chan<- struct{}) error {
|
||||
func (p *streamProtocolV3) stream(conn streamCreator, ready chan<- struct{}) error {
|
||||
if err := p.createStreams(conn); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -95,15 +94,15 @@ func (p *streamProtocolV3) stream(logger klog.Logger, conn streamCreator, ready
|
||||
|
||||
// now that all the streams have been created, proceed with reading & copying
|
||||
|
||||
errorChan := watchErrorStream(logger, p.errorStream, &errorDecoderV3{})
|
||||
errorChan := watchErrorStream(p.errorStream, &errorDecoderV3{})
|
||||
|
||||
p.handleResizes(logger)
|
||||
p.handleResizes()
|
||||
|
||||
p.copyStdin(logger)
|
||||
p.copyStdin()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
p.copyStdout(logger, &wg)
|
||||
p.copyStderr(logger, &wg)
|
||||
p.copyStdout(&wg)
|
||||
p.copyStderr(&wg)
|
||||
|
||||
// we're waiting for stdout/stderr to finish copying
|
||||
wg.Wait()
|
||||
|
||||
@@ -26,7 +26,6 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/remotecommand"
|
||||
"k8s.io/client-go/util/exec"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// streamProtocolV4 implements version 4 of the streaming protocol for attach
|
||||
@@ -48,11 +47,11 @@ func (p *streamProtocolV4) createStreams(conn streamCreator) error {
|
||||
return p.streamProtocolV3.createStreams(conn)
|
||||
}
|
||||
|
||||
func (p *streamProtocolV4) handleResizes(logger klog.Logger) {
|
||||
p.streamProtocolV3.handleResizes(logger)
|
||||
func (p *streamProtocolV4) handleResizes() {
|
||||
p.streamProtocolV3.handleResizes()
|
||||
}
|
||||
|
||||
func (p *streamProtocolV4) stream(logger klog.Logger, conn streamCreator, ready chan<- struct{}) error {
|
||||
func (p *streamProtocolV4) stream(conn streamCreator, ready chan<- struct{}) error {
|
||||
if err := p.createStreams(conn); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -64,15 +63,15 @@ func (p *streamProtocolV4) stream(logger klog.Logger, conn streamCreator, ready
|
||||
|
||||
// now that all the streams have been created, proceed with reading & copying
|
||||
|
||||
errorChan := watchErrorStream(logger, p.errorStream, &errorDecoderV4{})
|
||||
errorChan := watchErrorStream(p.errorStream, &errorDecoderV4{})
|
||||
|
||||
p.handleResizes(logger)
|
||||
p.handleResizes()
|
||||
|
||||
p.copyStdin(logger)
|
||||
p.copyStdin()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
p.copyStdout(logger, &wg)
|
||||
p.copyStderr(logger, &wg)
|
||||
p.copyStdout(&wg)
|
||||
p.copyStderr(&wg)
|
||||
|
||||
// we're waiting for stdout/stderr to finish copying
|
||||
wg.Wait()
|
||||
|
||||
@@ -16,8 +16,6 @@ limitations under the License.
|
||||
|
||||
package remotecommand
|
||||
|
||||
import "k8s.io/klog/v2"
|
||||
|
||||
// streamProtocolV5 add support for V5 of the remote command subprotocol.
|
||||
// For the streamProtocolHandler, this version is the same as V4.
|
||||
type streamProtocolV5 struct {
|
||||
@@ -32,6 +30,6 @@ func newStreamProtocolV5(options StreamOptions) streamProtocolHandler {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *streamProtocolV5) stream(logger klog.Logger, conn streamCreator, ready chan<- struct{}) error {
|
||||
return p.streamProtocolV4.stream(logger, conn, ready)
|
||||
func (p *streamProtocolV5) stream(conn streamCreator, ready chan<- struct{}) error {
|
||||
return p.streamProtocolV4.stream(conn, ready)
|
||||
}
|
||||
|
||||
@@ -130,8 +130,7 @@ func (e *wsStreamExecutor) StreamWithContext(ctx context.Context, options Stream
|
||||
}
|
||||
defer conn.Close()
|
||||
e.negotiated = conn.Subprotocol()
|
||||
logger := klog.FromContext(ctx)
|
||||
logger.V(4).Info("Subprotocol negotiated", "protocol", e.negotiated)
|
||||
klog.V(4).Infof("The subprotocol is %s", e.negotiated)
|
||||
|
||||
var streamer streamProtocolHandler
|
||||
switch e.negotiated {
|
||||
@@ -144,7 +143,7 @@ func (e *wsStreamExecutor) StreamWithContext(ctx context.Context, options Stream
|
||||
case remotecommand.StreamProtocolV2Name:
|
||||
streamer = newStreamProtocolV2(options)
|
||||
case "":
|
||||
logger.V(4).Info("The server did not negotiate a streaming protocol version, falling back", "protocol", remotecommand.StreamProtocolV1Name)
|
||||
klog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name)
|
||||
fallthrough
|
||||
case remotecommand.StreamProtocolV1Name:
|
||||
streamer = newStreamProtocolV1(options)
|
||||
@@ -160,7 +159,7 @@ func (e *wsStreamExecutor) StreamWithContext(ctx context.Context, options Stream
|
||||
}()
|
||||
|
||||
readyChan := make(chan struct{})
|
||||
creator := newWSStreamCreator(logger, conn)
|
||||
creator := newWSStreamCreator(conn)
|
||||
go func() {
|
||||
select {
|
||||
// Wait until all streams have been created before starting the readDemuxLoop.
|
||||
@@ -178,7 +177,7 @@ func (e *wsStreamExecutor) StreamWithContext(ctx context.Context, options Stream
|
||||
e.heartbeatDeadline,
|
||||
)
|
||||
}()
|
||||
errorChan <- streamer.stream(logger, creator, readyChan)
|
||||
errorChan <- streamer.stream(creator, readyChan)
|
||||
}()
|
||||
|
||||
select {
|
||||
@@ -192,8 +191,7 @@ func (e *wsStreamExecutor) StreamWithContext(ctx context.Context, options Stream
|
||||
}
|
||||
|
||||
type wsStreamCreator struct {
|
||||
logger klog.Logger
|
||||
conn *gwebsocket.Conn
|
||||
conn *gwebsocket.Conn
|
||||
// Protects writing to websocket connection; reading is lock-free
|
||||
connWriteLock sync.Mutex
|
||||
// map of stream id to stream; multiple streams read/write the connection
|
||||
@@ -204,9 +202,8 @@ type wsStreamCreator struct {
|
||||
setStreamErr error
|
||||
}
|
||||
|
||||
func newWSStreamCreator(logger klog.Logger, conn *gwebsocket.Conn) *wsStreamCreator {
|
||||
func newWSStreamCreator(conn *gwebsocket.Conn) *wsStreamCreator {
|
||||
return &wsStreamCreator{
|
||||
logger: logger,
|
||||
conn: conn,
|
||||
streams: map[byte]*stream{},
|
||||
}
|
||||
@@ -241,7 +238,6 @@ func (c *wsStreamCreator) CreateStream(headers http.Header) (httpstream.Stream,
|
||||
}
|
||||
reader, writer := io.Pipe()
|
||||
s := &stream{
|
||||
logger: klog.LoggerWithValues(c.logger, "id", id),
|
||||
headers: headers,
|
||||
readPipe: reader,
|
||||
writePipe: writer,
|
||||
@@ -264,11 +260,11 @@ func (c *wsStreamCreator) CreateStream(headers http.Header) (httpstream.Stream,
|
||||
// connection reader at a time (a read mutex would provide no benefit).
|
||||
func (c *wsStreamCreator) readDemuxLoop(bufferSize int, period time.Duration, deadline time.Duration) {
|
||||
// Initialize and start the ping/pong heartbeat.
|
||||
h := newHeartbeat(c.logger, c.conn, period, deadline)
|
||||
h := newHeartbeat(c.conn, period, deadline)
|
||||
// Set initial timeout for websocket connection reading.
|
||||
c.logger.V(5).Info("Websocket read starts", "deadline", deadline)
|
||||
klog.V(5).Infof("Websocket initial read deadline: %s", deadline)
|
||||
if err := c.conn.SetReadDeadline(time.Now().Add(deadline)); err != nil {
|
||||
c.logger.Error(err, "Websocket initial setting read deadline failed")
|
||||
klog.Errorf("Websocket initial setting read deadline failed %v", err)
|
||||
return
|
||||
}
|
||||
go h.start()
|
||||
@@ -312,7 +308,7 @@ func (c *wsStreamCreator) readDemuxLoop(bufferSize int, period time.Duration, de
|
||||
streamID := readBuffer[0]
|
||||
s := c.getStream(streamID)
|
||||
if s == nil {
|
||||
c.logger.Error(nil, "Unknown stream, discarding message", "id", streamID)
|
||||
klog.Errorf("Unknown stream id %d, discarding message", streamID)
|
||||
continue
|
||||
}
|
||||
for {
|
||||
@@ -355,7 +351,6 @@ func (c *wsStreamCreator) closeAllStreamReaders(err error) {
|
||||
}
|
||||
|
||||
type stream struct {
|
||||
logger klog.Logger
|
||||
headers http.Header
|
||||
readPipe *io.PipeReader
|
||||
writePipe *io.PipeWriter
|
||||
@@ -374,8 +369,8 @@ func (s *stream) Read(p []byte) (n int, err error) {
|
||||
|
||||
// Write writes directly to the underlying WebSocket connection.
|
||||
func (s *stream) Write(p []byte) (n int, err error) {
|
||||
s.logger.V(8).Info("Write() on stream")
|
||||
defer s.logger.V(8).Info("Write() done on stream")
|
||||
klog.V(8).Infof("Write() on stream %d", s.id)
|
||||
defer klog.V(8).Infof("Write() done on stream %d", s.id)
|
||||
s.connWriteLock.Lock()
|
||||
defer s.connWriteLock.Unlock()
|
||||
if s.conn == nil {
|
||||
@@ -383,7 +378,7 @@ func (s *stream) Write(p []byte) (n int, err error) {
|
||||
}
|
||||
err = s.conn.SetWriteDeadline(time.Now().Add(writeDeadline))
|
||||
if err != nil {
|
||||
s.logger.V(4).Info("Websocket setting write deadline failed", "err", err)
|
||||
klog.V(4).Infof("Websocket setting write deadline failed %v", err)
|
||||
return 0, err
|
||||
}
|
||||
// Message writer buffers the message data, so we don't need to do that ourselves.
|
||||
@@ -412,8 +407,8 @@ func (s *stream) Write(p []byte) (n int, err error) {
|
||||
|
||||
// Close half-closes the stream, indicating this side is finished with the stream.
|
||||
func (s *stream) Close() error {
|
||||
s.logger.V(6).Info("Close() on stream")
|
||||
defer s.logger.V(6).Info("Close() done on stream")
|
||||
klog.V(6).Infof("Close() on stream %d", s.id)
|
||||
defer klog.V(6).Infof("Close() done on stream %d", s.id)
|
||||
s.connWriteLock.Lock()
|
||||
defer s.connWriteLock.Unlock()
|
||||
if s.conn == nil {
|
||||
@@ -426,8 +421,8 @@ func (s *stream) Close() error {
|
||||
}
|
||||
|
||||
func (s *stream) Reset() error {
|
||||
s.logger.V(4).Info("Reset() on stream")
|
||||
defer s.logger.V(4).Info("Reset() done on stream")
|
||||
klog.V(4).Infof("Reset() on stream %d", s.id)
|
||||
defer klog.V(4).Infof("Reset() done on stream %d", s.id)
|
||||
s.Close()
|
||||
return s.writePipe.Close()
|
||||
}
|
||||
@@ -447,8 +442,7 @@ func (s *stream) Identifier() uint32 {
|
||||
// inside the "readDemuxLoop" will return an i/o error prompting a connection close
|
||||
// and cleanup.
|
||||
type heartbeat struct {
|
||||
logger klog.Logger
|
||||
conn *gwebsocket.Conn
|
||||
conn *gwebsocket.Conn
|
||||
// period defines how often a "ping" heartbeat message is sent to the other endpoint
|
||||
period time.Duration
|
||||
// closing the "closer" channel will clean up the heartbeat timers
|
||||
@@ -462,9 +456,8 @@ type heartbeat struct {
|
||||
// newHeartbeat creates heartbeat structure encapsulating fields necessary to
|
||||
// run the websocket connection ping/pong mechanism and sets up handlers on
|
||||
// the websocket connection.
|
||||
func newHeartbeat(logger klog.Logger, conn *gwebsocket.Conn, period time.Duration, deadline time.Duration) *heartbeat {
|
||||
func newHeartbeat(conn *gwebsocket.Conn, period time.Duration, deadline time.Duration) *heartbeat {
|
||||
h := &heartbeat{
|
||||
logger: logger,
|
||||
conn: conn,
|
||||
period: period,
|
||||
closer: make(chan struct{}),
|
||||
@@ -474,10 +467,10 @@ func newHeartbeat(logger klog.Logger, conn *gwebsocket.Conn, period time.Duratio
|
||||
// be empty.
|
||||
h.conn.SetPongHandler(func(msg string) error {
|
||||
// Push the read deadline into the future.
|
||||
logger.V(6).Info("Pong message received -- resetting read deadline", "message", msg)
|
||||
klog.V(6).Infof("Pong message received (%s)--resetting read deadline", msg)
|
||||
err := h.conn.SetReadDeadline(time.Now().Add(deadline))
|
||||
if err != nil {
|
||||
logger.Error(err, "Websocket setting read deadline failed")
|
||||
klog.Errorf("Websocket setting read deadline failed %v", err)
|
||||
return err
|
||||
}
|
||||
if len(msg) > 0 {
|
||||
@@ -509,16 +502,16 @@ func (h *heartbeat) start() {
|
||||
for {
|
||||
select {
|
||||
case <-h.closer:
|
||||
h.logger.V(5).Info("Closed channel -- returning")
|
||||
klog.V(5).Infof("closed channel--returning")
|
||||
return
|
||||
case <-t.C:
|
||||
// "WriteControl" does not need to be protected by a mutex. According to
|
||||
// gorilla/websockets library docs: "The Close and WriteControl methods can
|
||||
// be called concurrently with all other methods."
|
||||
if err := h.conn.WriteControl(gwebsocket.PingMessage, h.message, time.Now().Add(pingReadDeadline)); err == nil {
|
||||
h.logger.V(6).Info("Websocket Ping succeeeded")
|
||||
klog.V(6).Infof("Websocket Ping succeeeded")
|
||||
} else {
|
||||
h.logger.Error(err, "Websocket Ping failed")
|
||||
klog.Errorf("Websocket Ping failed: %v", err)
|
||||
if errors.Is(err, gwebsocket.ErrCloseSent) {
|
||||
// we continue because c.conn.CloseChan will manage closing the connection already
|
||||
continue
|
||||
|
||||
@@ -48,7 +48,6 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/rest"
|
||||
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
)
|
||||
|
||||
// TestWebSocketClient_LoopbackStdinToStdout returns random data sent on the STDIN channel
|
||||
@@ -1050,7 +1049,6 @@ func TestWebSocketClient_ExecutorErrors(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWebSocketClient_HeartbeatSucceeds(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
var upgrader = gwebsocket.Upgrader{
|
||||
CheckOrigin: func(r *http.Request) bool {
|
||||
return true // Accepting all requests
|
||||
@@ -1083,7 +1081,7 @@ func TestWebSocketClient_HeartbeatSucceeds(t *testing.T) {
|
||||
var expectedMsg = "test heartbeat message"
|
||||
var period = 100 * time.Millisecond
|
||||
var deadline = 200 * time.Millisecond
|
||||
heartbeat := newHeartbeat(logger, client, period, deadline)
|
||||
heartbeat := newHeartbeat(client, period, deadline)
|
||||
heartbeat.setMessage(expectedMsg)
|
||||
// Add a channel to the handler to retrieve the "pong" message.
|
||||
pongMsgCh := make(chan string)
|
||||
@@ -1123,8 +1121,7 @@ func TestWebSocketClient_HeartbeatSucceeds(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLateStreamCreation(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
c := newWSStreamCreator(logger, nil)
|
||||
c := newWSStreamCreator(nil)
|
||||
c.closeAllStreamReaders(nil)
|
||||
if err := c.setStream(0, nil); err == nil {
|
||||
t.Fatal("expected error adding stream after closeAllStreamReaders")
|
||||
@@ -1132,10 +1129,8 @@ func TestLateStreamCreation(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWebSocketClient_StreamsAndExpectedErrors(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
|
||||
// Validate Stream functions.
|
||||
c := newWSStreamCreator(logger, nil)
|
||||
c := newWSStreamCreator(nil)
|
||||
headers := http.Header{}
|
||||
headers.Set(v1.StreamType, v1.StreamTypeStdin)
|
||||
s, err := c.CreateStream(headers)
|
||||
|
||||
Reference in New Issue
Block a user