Merge pull request #119157 from seans3/websocket-executor

WebSocket Client and V5 RemoteCommand Subprotocol
This commit is contained in:
Kubernetes Prow Robot 2023-09-05 16:20:51 -07:00 committed by GitHub
commit 6013381508
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 2385 additions and 133 deletions

View File

@ -28,7 +28,6 @@
"github.com/gorilla/mux": "unmaintained, archive mode",
"github.com/gorilla/rpc": "unmaintained, archive mode",
"github.com/gorilla/schema": "unmaintained, archive mode",
"github.com/gorilla/websocket": "unmaintained, archive mode",
"github.com/gregjones/httpcache": "unmaintained, archive mode",
"github.com/grpc-ecosystem/go-grpc-prometheus": "unmaintained, archive mode",
"github.com/grpc-ecosystem/grpc-gateway": "use github.com/grpc-ecosystem/grpc-gateway/v2",
@ -143,11 +142,6 @@
"cloud.google.com/go/compute",
"cloud.google.com/go/storage"
],
"github.com/gorilla/websocket": [
"github.com/moby/spdystream",
"github.com/tmc/grpc-websocket-proxy",
"go.etcd.io/etcd/server/v3"
],
"github.com/gregjones/httpcache": [
"k8s.io/client-go"
],

View File

@ -27,6 +27,7 @@ import (
"golang.org/x/net/websocket"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/remotecommand"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog/v2"
)
@ -234,10 +235,18 @@ func (conn *Conn) Close() error {
return nil
}
// protocolSupportsStreamClose returns true if the passed protocol
// supports the stream close signal (currently only V5 remotecommand);
// false otherwise.
func protocolSupportsStreamClose(protocol string) bool {
return protocol == remotecommand.StreamProtocolV5Name
}
// handle implements a websocket handler.
func (conn *Conn) handle(ws *websocket.Conn) {
defer conn.Close()
conn.initialize(ws)
supportsStreamClose := protocolSupportsStreamClose(conn.selectedProtocol)
for {
conn.resetTimeout()
@ -251,6 +260,21 @@ func (conn *Conn) handle(ws *websocket.Conn) {
if len(data) == 0 {
continue
}
if supportsStreamClose && data[0] == remotecommand.StreamClose {
if len(data) != 2 {
klog.Errorf("Single channel byte should follow stream close signal. Got %d bytes", len(data)-1)
break
} else {
channel := data[1]
if int(channel) >= len(conn.channels) {
klog.Errorf("Close is targeted for a channel %d that is not valid, possible protocol error", channel)
break
}
klog.V(4).Infof("Received half-close signal from client; close %d stream", channel)
conn.channels[channel].Close() // After first Close, other closes are noop.
}
continue
}
channel := data[0]
if conn.codec == base64Codec {
channel = channel - '0'

View File

@ -16,6 +16,54 @@ limitations under the License.
// Package wsstream contains utilities for streaming content over WebSockets.
// The Conn type allows callers to multiplex multiple read/write channels over
// a single websocket. The Reader type allows an io.Reader to be copied over
// a websocket channel as binary content.
// a single websocket.
//
// "channel.k8s.io"
//
// The Websocket RemoteCommand subprotocol "channel.k8s.io" prepends each binary message with a
// byte indicating the channel number (zero indexed) the message was sent on. Messages in both
// directions should prefix their messages with this channel byte. Used for remote execution,
// the channel numbers are by convention defined to match the POSIX file-descriptors assigned
// to STDIN, STDOUT, and STDERR (0, 1, and 2). No other conversion is performed on the raw
// subprotocol - writes are sent as they are received by the server.
//
// Example client session:
//
// CONNECT http://server.com with subprotocol "channel.k8s.io"
// WRITE []byte{0, 102, 111, 111, 10} # send "foo\n" on channel 0 (STDIN)
// READ []byte{1, 10} # receive "\n" on channel 1 (STDOUT)
// CLOSE
//
// "v2.channel.k8s.io"
//
// The second Websocket subprotocol version "v2.channel.k8s.io" is the same as version 1,
// but it is the first "versioned" subprotocol.
//
// "v3.channel.k8s.io"
//
// The third version of the Websocket RemoteCommand subprotocol adds another channel
// for terminal resizing events. This channel is prepended with the byte '3', and it
// transmits two window sizes (encoding TerminalSize struct) with integers in the range
// (0,65536].
//
// "v4.channel.k8s.io"
//
// The fourth version of the Websocket RemoteCommand subprotocol adds a channel for
// errors. This channel returns structured errors containing process exit codes. The
// error is "apierrors.StatusError{}".
//
// "v5.channel.k8s.io"
//
// The fifth version of the Websocket RemoteCommand subprotocol adds a CLOSE signal,
// which is sent as the first byte of the message. The second byte is the channel
// id. This CLOSE signal is handled by the websocket server by closing the stream,
// allowing the other streams to complete transmission if necessary, and gracefully
// shutdown the connection.
//
// Example client session:
//
// CONNECT http://server.com with subprotocol "v5.channel.k8s.io"
// WRITE []byte{0, 102, 111, 111, 10} # send "foo\n" on channel 0 (STDIN)
// WRITE []byte{255, 0} # send CLOSE signal (STDIN)
// CLOSE
package wsstream // import "k8s.io/apimachinery/pkg/util/httpstream/wsstream"

View File

@ -46,8 +46,22 @@ const (
// adds support for exit codes.
StreamProtocolV4Name = "v4.channel.k8s.io"
// The subprotocol "v5.channel.k8s.io" is used for remote command
// attachment/execution. It is the 5th version of the subprotocol and
// adds support for a CLOSE signal.
StreamProtocolV5Name = "v5.channel.k8s.io"
NonZeroExitCodeReason = metav1.StatusReason("NonZeroExitCode")
ExitCodeCauseType = metav1.CauseType("ExitCode")
// RemoteCommand stream identifiers. The first three identifiers (for STDIN,
// STDOUT, STDERR) are the same as their file descriptors.
StreamStdIn = 0
StreamStdOut = 1
StreamStdErr = 2
StreamErr = 3
StreamResize = 4
StreamClose = 255
)
var SupportedStreamingProtocols = []string{StreamProtocolV4Name, StreamProtocolV3Name, StreamProtocolV2Name, StreamProtocolV1Name}

View File

@ -72,6 +72,7 @@ github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaU
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=

View File

@ -13,6 +13,7 @@ require (
github.com/google/go-cmp v0.5.9
github.com/google/gofuzz v1.2.0
github.com/google/uuid v1.3.0
github.com/gorilla/websocket v1.4.2
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7
github.com/imdario/mergo v0.3.6
github.com/peterbourgon/diskv v2.0.1+incompatible

View File

@ -44,6 +44,7 @@ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJY
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=

View File

@ -18,17 +18,10 @@ package remotecommand
import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/remotecommand"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/transport/spdy"
)
// StreamOptions holds information pertaining to the current streaming session:
@ -63,120 +56,3 @@ type streamCreator interface {
type streamProtocolHandler interface {
stream(conn streamCreator) error
}
// streamExecutor handles transporting standard shell streams over an httpstream connection.
type streamExecutor struct {
upgrader spdy.Upgrader
transport http.RoundTripper
method string
url *url.URL
protocols []string
}
// NewSPDYExecutor connects to the provided server and upgrades the connection to
// multiplexed bidirectional streams.
func NewSPDYExecutor(config *restclient.Config, method string, url *url.URL) (Executor, error) {
wrapper, upgradeRoundTripper, err := spdy.RoundTripperFor(config)
if err != nil {
return nil, err
}
return NewSPDYExecutorForTransports(wrapper, upgradeRoundTripper, method, url)
}
// NewSPDYExecutorForTransports connects to the provided server using the given transport,
// upgrades the response using the given upgrader to multiplexed bidirectional streams.
func NewSPDYExecutorForTransports(transport http.RoundTripper, upgrader spdy.Upgrader, method string, url *url.URL) (Executor, error) {
return NewSPDYExecutorForProtocols(
transport, upgrader, method, url,
remotecommand.StreamProtocolV4Name,
remotecommand.StreamProtocolV3Name,
remotecommand.StreamProtocolV2Name,
remotecommand.StreamProtocolV1Name,
)
}
// NewSPDYExecutorForProtocols connects to the provided server and upgrades the connection to
// multiplexed bidirectional streams using only the provided protocols. Exposed for testing, most
// callers should use NewSPDYExecutor or NewSPDYExecutorForTransports.
func NewSPDYExecutorForProtocols(transport http.RoundTripper, upgrader spdy.Upgrader, method string, url *url.URL, protocols ...string) (Executor, error) {
return &streamExecutor{
upgrader: upgrader,
transport: transport,
method: method,
url: url,
protocols: protocols,
}, nil
}
// Stream opens a protocol streamer to the server and streams until a client closes
// the connection or the server disconnects.
func (e *streamExecutor) Stream(options StreamOptions) error {
return e.StreamWithContext(context.Background(), options)
}
// newConnectionAndStream creates a new SPDY connection and a stream protocol handler upon it.
func (e *streamExecutor) newConnectionAndStream(ctx context.Context, options StreamOptions) (httpstream.Connection, streamProtocolHandler, error) {
req, err := http.NewRequestWithContext(ctx, e.method, e.url.String(), nil)
if err != nil {
return nil, nil, fmt.Errorf("error creating request: %v", err)
}
conn, protocol, err := spdy.Negotiate(
e.upgrader,
&http.Client{Transport: e.transport},
req,
e.protocols...,
)
if err != nil {
return nil, nil, err
}
var streamer streamProtocolHandler
switch protocol {
case remotecommand.StreamProtocolV4Name:
streamer = newStreamProtocolV4(options)
case remotecommand.StreamProtocolV3Name:
streamer = newStreamProtocolV3(options)
case remotecommand.StreamProtocolV2Name:
streamer = newStreamProtocolV2(options)
case "":
klog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name)
fallthrough
case remotecommand.StreamProtocolV1Name:
streamer = newStreamProtocolV1(options)
}
return conn, streamer, nil
}
// StreamWithContext opens a protocol streamer to the server and streams until a client closes
// the connection or the server disconnects or the context is done.
func (e *streamExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {
conn, streamer, err := e.newConnectionAndStream(ctx, options)
if err != nil {
return err
}
defer conn.Close()
panicChan := make(chan any, 1)
errorChan := make(chan error, 1)
go func() {
defer func() {
if p := recover(); p != nil {
panicChan <- p
}
}()
errorChan <- streamer.stream(conn)
}()
select {
case p := <-panicChan:
panic(p)
case err := <-errorChan:
return err
case <-ctx.Done():
return ctx.Err()
}
}

View File

@ -0,0 +1,150 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remotecommand
import (
"context"
"fmt"
"net/http"
"net/url"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/remotecommand"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/transport/spdy"
"k8s.io/klog/v2"
)
// spdyStreamExecutor handles transporting standard shell streams over an httpstream connection.
type spdyStreamExecutor struct {
upgrader spdy.Upgrader
transport http.RoundTripper
method string
url *url.URL
protocols []string
}
// NewSPDYExecutor connects to the provided server and upgrades the connection to
// multiplexed bidirectional streams.
func NewSPDYExecutor(config *restclient.Config, method string, url *url.URL) (Executor, error) {
wrapper, upgradeRoundTripper, err := spdy.RoundTripperFor(config)
if err != nil {
return nil, err
}
return NewSPDYExecutorForTransports(wrapper, upgradeRoundTripper, method, url)
}
// NewSPDYExecutorForTransports connects to the provided server using the given transport,
// upgrades the response using the given upgrader to multiplexed bidirectional streams.
func NewSPDYExecutorForTransports(transport http.RoundTripper, upgrader spdy.Upgrader, method string, url *url.URL) (Executor, error) {
return NewSPDYExecutorForProtocols(
transport, upgrader, method, url,
remotecommand.StreamProtocolV5Name,
remotecommand.StreamProtocolV4Name,
remotecommand.StreamProtocolV3Name,
remotecommand.StreamProtocolV2Name,
remotecommand.StreamProtocolV1Name,
)
}
// NewSPDYExecutorForProtocols connects to the provided server and upgrades the connection to
// multiplexed bidirectional streams using only the provided protocols. Exposed for testing, most
// callers should use NewSPDYExecutor or NewSPDYExecutorForTransports.
func NewSPDYExecutorForProtocols(transport http.RoundTripper, upgrader spdy.Upgrader, method string, url *url.URL, protocols ...string) (Executor, error) {
return &spdyStreamExecutor{
upgrader: upgrader,
transport: transport,
method: method,
url: url,
protocols: protocols,
}, nil
}
// Stream opens a protocol streamer to the server and streams until a client closes
// the connection or the server disconnects.
func (e *spdyStreamExecutor) Stream(options StreamOptions) error {
return e.StreamWithContext(context.Background(), options)
}
// newConnectionAndStream creates a new SPDY connection and a stream protocol handler upon it.
func (e *spdyStreamExecutor) newConnectionAndStream(ctx context.Context, options StreamOptions) (httpstream.Connection, streamProtocolHandler, error) {
req, err := http.NewRequestWithContext(ctx, e.method, e.url.String(), nil)
if err != nil {
return nil, nil, fmt.Errorf("error creating request: %v", err)
}
conn, protocol, err := spdy.Negotiate(
e.upgrader,
&http.Client{Transport: e.transport},
req,
e.protocols...,
)
if err != nil {
return nil, nil, err
}
var streamer streamProtocolHandler
switch protocol {
case remotecommand.StreamProtocolV5Name:
streamer = newStreamProtocolV5(options)
case remotecommand.StreamProtocolV4Name:
streamer = newStreamProtocolV4(options)
case remotecommand.StreamProtocolV3Name:
streamer = newStreamProtocolV3(options)
case remotecommand.StreamProtocolV2Name:
streamer = newStreamProtocolV2(options)
case "":
klog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name)
fallthrough
case remotecommand.StreamProtocolV1Name:
streamer = newStreamProtocolV1(options)
}
return conn, streamer, nil
}
// StreamWithContext opens a protocol streamer to the server and streams until a client closes
// the connection or the server disconnects or the context is done.
func (e *spdyStreamExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {
conn, streamer, err := e.newConnectionAndStream(ctx, options)
if err != nil {
return err
}
defer conn.Close()
panicChan := make(chan any, 1)
errorChan := make(chan error, 1)
go func() {
defer func() {
if p := recover(); p != nil {
panicChan <- p
}
}()
errorChan <- streamer.stream(conn)
}()
select {
case p := <-panicChan:
panic(p)
case err := <-errorChan:
return err
case <-ctx.Done():
return ctx.Err()
}
}

View File

@ -342,7 +342,7 @@ func TestStreamExitsAfterConnectionIsClosed(t *testing.T) {
if err != nil {
t.Fatal(err)
}
streamExec := exec.(*streamExecutor)
streamExec := exec.(*spdyStreamExecutor)
conn, streamer, err := streamExec.newConnectionAndStream(ctx, options)
if err != nil {

View File

@ -0,0 +1,35 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remotecommand
// streamProtocolV5 add support for V5 of the remote command subprotocol.
// For the streamProtocolHandler, this version is the same as V4.
type streamProtocolV5 struct {
*streamProtocolV4
}
var _ streamProtocolHandler = &streamProtocolV5{}
func newStreamProtocolV5(options StreamOptions) streamProtocolHandler {
return &streamProtocolV5{
streamProtocolV4: newStreamProtocolV4(options).(*streamProtocolV4),
}
}
func (p *streamProtocolV5) stream(conn streamCreator) error {
return p.streamProtocolV4.stream(conn)
}

View File

@ -0,0 +1,485 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remotecommand
import (
"context"
"fmt"
"io"
"net/http"
"sync"
"time"
gwebsocket "github.com/gorilla/websocket"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/remotecommand"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/transport/websocket"
"k8s.io/klog/v2"
)
// writeDeadline defines the time that a write to the websocket connection
// must complete by, otherwise an i/o timeout occurs. The writeDeadline
// has nothing to do with a response from the other websocket connection
// endpoint; only that the message was successfully processed by the
// local websocket connection. The typical write deadline within the websocket
// library is one second.
const writeDeadline = 2 * time.Second
var (
_ Executor = &wsStreamExecutor{}
_ streamCreator = &wsStreamCreator{}
_ httpstream.Stream = &stream{}
streamType2streamID = map[string]byte{
v1.StreamTypeStdin: remotecommand.StreamStdIn,
v1.StreamTypeStdout: remotecommand.StreamStdOut,
v1.StreamTypeStderr: remotecommand.StreamStdErr,
v1.StreamTypeError: remotecommand.StreamErr,
v1.StreamTypeResize: remotecommand.StreamResize,
}
)
const (
// pingPeriod defines how often a heartbeat "ping" message is sent.
pingPeriod = 5 * time.Second
// pingReadDeadline defines the time waiting for a response heartbeat
// "pong" message before a timeout error occurs for websocket reading.
// This duration must always be greater than the "pingPeriod". By defining
// this deadline in terms of the ping period, we are essentially saying
// we can drop "X-1" (e.g. 3-1=2) pings before firing the timeout.
pingReadDeadline = (pingPeriod * 3) + (1 * time.Second)
)
// wsStreamExecutor handles transporting standard shell streams over an httpstream connection.
type wsStreamExecutor struct {
transport http.RoundTripper
upgrader websocket.ConnectionHolder
method string
url string
// requested protocols in priority order (e.g. v5.channel.k8s.io before v4.channel.k8s.io).
protocols []string
// selected protocol from the handshake process; could be empty string if handshake fails.
negotiated string
// period defines how often a "ping" heartbeat message is sent to the other endpoint.
heartbeatPeriod time.Duration
// deadline defines the amount of time before "pong" response must be received.
heartbeatDeadline time.Duration
}
// NewWebSocketExecutor allows to execute commands via a WebSocket connection.
func NewWebSocketExecutor(config *restclient.Config, method, url string) (Executor, error) {
transport, upgrader, err := websocket.RoundTripperFor(config)
if err != nil {
return nil, fmt.Errorf("error creating websocket transports: %v", err)
}
return &wsStreamExecutor{
transport: transport,
upgrader: upgrader,
method: method,
url: url,
// Only supports V5 protocol for correct version skew functionality.
// Previous api servers will proxy upgrade requests to legacy websocket
// servers on container runtimes which support V1-V4. These legacy
// websocket servers will not handle the new CLOSE signal.
protocols: []string{remotecommand.StreamProtocolV5Name},
heartbeatPeriod: pingPeriod,
heartbeatDeadline: pingReadDeadline,
}, nil
}
// Deprecated: use StreamWithContext instead to avoid possible resource leaks.
// See https://github.com/kubernetes/kubernetes/pull/103177 for details.
func (e *wsStreamExecutor) Stream(options StreamOptions) error {
return e.StreamWithContext(context.Background(), options)
}
// StreamWithContext upgrades an HTTPRequest to a WebSocket connection, and starts the various
// goroutines to implement the necessary streams over the connection. The "options" parameter
// defines which streams are requested. Returns an error if one occurred. This method is NOT
// safe to run concurrently with the same executor (because of the state stored in the upgrader).
func (e *wsStreamExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {
req, err := http.NewRequestWithContext(ctx, e.method, e.url, nil)
if err != nil {
return err
}
conn, err := websocket.Negotiate(e.transport, e.upgrader, req, e.protocols...)
if err != nil {
return err
}
if conn == nil {
panic(fmt.Errorf("websocket connection is nil"))
}
defer conn.Close()
e.negotiated = conn.Subprotocol()
klog.V(4).Infof("The subprotocol is %s", e.negotiated)
var streamer streamProtocolHandler
switch e.negotiated {
case remotecommand.StreamProtocolV5Name:
streamer = newStreamProtocolV5(options)
case remotecommand.StreamProtocolV4Name:
streamer = newStreamProtocolV4(options)
case remotecommand.StreamProtocolV3Name:
streamer = newStreamProtocolV3(options)
case remotecommand.StreamProtocolV2Name:
streamer = newStreamProtocolV2(options)
case "":
klog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name)
fallthrough
case remotecommand.StreamProtocolV1Name:
streamer = newStreamProtocolV1(options)
}
panicChan := make(chan any, 1)
errorChan := make(chan error, 1)
go func() {
defer func() {
if p := recover(); p != nil {
panicChan <- p
}
}()
creator := newWSStreamCreator(conn)
go creator.readDemuxLoop(
e.upgrader.DataBufferSize(),
e.heartbeatPeriod,
e.heartbeatDeadline,
)
errorChan <- streamer.stream(creator)
}()
select {
case p := <-panicChan:
panic(p)
case err := <-errorChan:
return err
case <-ctx.Done():
return ctx.Err()
}
}
type wsStreamCreator struct {
conn *gwebsocket.Conn
connWriteLock sync.Mutex
streams map[byte]*stream
streamsMu sync.Mutex
}
func newWSStreamCreator(conn *gwebsocket.Conn) *wsStreamCreator {
return &wsStreamCreator{
conn: conn,
streams: map[byte]*stream{},
}
}
func (c *wsStreamCreator) getStream(id byte) *stream {
c.streamsMu.Lock()
defer c.streamsMu.Unlock()
return c.streams[id]
}
func (c *wsStreamCreator) setStream(id byte, s *stream) {
c.streamsMu.Lock()
defer c.streamsMu.Unlock()
c.streams[id] = s
}
// CreateStream uses id from passed headers to create a stream over "c.conn" connection.
// Returns a Stream structure or nil and an error if one occurred.
func (c *wsStreamCreator) CreateStream(headers http.Header) (httpstream.Stream, error) {
streamType := headers.Get(v1.StreamType)
id, ok := streamType2streamID[streamType]
if !ok {
return nil, fmt.Errorf("unknown stream type: %s", streamType)
}
if s := c.getStream(id); s != nil {
return nil, fmt.Errorf("duplicate stream for type %s", streamType)
}
reader, writer := io.Pipe()
s := &stream{
headers: headers,
readPipe: reader,
writePipe: writer,
conn: c.conn,
connWriteLock: &c.connWriteLock,
id: id,
}
c.setStream(id, s)
return s, nil
}
// readDemuxLoop is the reading processor for this endpoint of the websocket
// connection. This loop reads the connection, and demultiplexes the data
// into one of the individual stream pipes (by checking the stream id). This
// loop can *not* be run concurrently, because there can only be one websocket
// connection reader at a time (a read mutex would provide no benefit).
func (c *wsStreamCreator) readDemuxLoop(bufferSize int, period time.Duration, deadline time.Duration) {
// Initialize and start the ping/pong heartbeat.
h := newHeartbeat(c.conn, period, deadline)
// Set initial timeout for websocket connection reading.
if err := c.conn.SetReadDeadline(time.Now().Add(deadline)); err != nil {
klog.Errorf("Websocket initial setting read deadline failed %v", err)
return
}
go h.start()
// Buffer size must correspond to the same size allocated
// for the read buffer during websocket client creation. A
// difference can cause incomplete connection reads.
readBuffer := make([]byte, bufferSize)
for {
// NextReader() only returns data messages (BinaryMessage or Text
// Message). Even though this call will never return control frames
// such as ping, pong, or close, this call is necessary for these
// message types to be processed. There can only be one reader
// at a time, so this reader loop must *not* be run concurrently;
// there is no lock for reading. Calling "NextReader()" before the
// current reader has been processed will close the current reader.
// If the heartbeat read deadline times out, this "NextReader()" will
// return an i/o error, and error handling will clean up.
messageType, r, err := c.conn.NextReader()
if err != nil {
websocketErr, ok := err.(*gwebsocket.CloseError)
if ok && websocketErr.Code == gwebsocket.CloseNormalClosure {
err = nil // readers will get io.EOF as it's a normal closure
} else {
err = fmt.Errorf("next reader: %w", err)
}
c.closeAllStreamReaders(err)
return
}
// All remote command protocols send/receive only binary data messages.
if messageType != gwebsocket.BinaryMessage {
c.closeAllStreamReaders(fmt.Errorf("unexpected message type: %d", messageType))
return
}
// It's ok to read just a single byte because the underlying library wraps the actual
// connection with a buffered reader anyway.
_, err = io.ReadFull(r, readBuffer[:1])
if err != nil {
c.closeAllStreamReaders(fmt.Errorf("read stream id: %w", err))
return
}
streamID := readBuffer[0]
s := c.getStream(streamID)
if s == nil {
klog.Errorf("Unknown stream id %d, discarding message", streamID)
continue
}
for {
nr, errRead := r.Read(readBuffer)
if nr > 0 {
// Write the data to the stream's pipe. This can block.
_, errWrite := s.writePipe.Write(readBuffer[:nr])
if errWrite != nil {
// Pipe must have been closed by the stream user.
// Nothing to do, discard the message.
break
}
}
if errRead != nil {
if errRead == io.EOF {
break
}
c.closeAllStreamReaders(fmt.Errorf("read message: %w", err))
return
}
}
}
}
// closeAllStreamReaders closes readers in all streams.
// This unblocks all stream.Read() calls.
func (c *wsStreamCreator) closeAllStreamReaders(err error) {
c.streamsMu.Lock()
defer c.streamsMu.Unlock()
for _, s := range c.streams {
// Closing writePipe unblocks all readPipe.Read() callers and prevents any future writes.
_ = s.writePipe.CloseWithError(err)
}
}
type stream struct {
headers http.Header
readPipe *io.PipeReader
writePipe *io.PipeWriter
// conn is used for writing directly into the connection.
// Is nil after Close() / Reset() to prevent future writes.
conn *gwebsocket.Conn
// connWriteLock protects conn against concurrent write operations. There must be a single writer and a single reader only.
// The mutex is shared across all streams because the underlying connection is shared.
connWriteLock *sync.Mutex
id byte
}
func (s *stream) Read(p []byte) (n int, err error) {
return s.readPipe.Read(p)
}
// Write writes directly to the underlying WebSocket connection.
func (s *stream) Write(p []byte) (n int, err error) {
klog.V(4).Infof("Write() on stream %d", s.id)
defer klog.V(4).Infof("Write() done on stream %d", s.id)
s.connWriteLock.Lock()
defer s.connWriteLock.Unlock()
if s.conn == nil {
return 0, fmt.Errorf("write on closed stream %d", s.id)
}
err = s.conn.SetWriteDeadline(time.Now().Add(writeDeadline))
if err != nil {
klog.V(7).Infof("Websocket setting write deadline failed %v", err)
return 0, err
}
// Message writer buffers the message data, so we don't need to do that ourselves.
// Just write id and the data as two separate writes to avoid allocating an intermediate buffer.
w, err := s.conn.NextWriter(gwebsocket.BinaryMessage)
if err != nil {
return 0, err
}
defer func() {
if w != nil {
w.Close()
}
}()
_, err = w.Write([]byte{s.id})
if err != nil {
return 0, err
}
n, err = w.Write(p)
if err != nil {
return n, err
}
err = w.Close()
w = nil
return n, err
}
// Close half-closes the stream, indicating this side is finished with the stream.
func (s *stream) Close() error {
klog.V(4).Infof("Close() on stream %d", s.id)
defer klog.V(4).Infof("Close() done on stream %d", s.id)
s.connWriteLock.Lock()
defer s.connWriteLock.Unlock()
if s.conn == nil {
return fmt.Errorf("Close() on already closed stream %d", s.id)
}
// Communicate the CLOSE stream signal to the other websocket endpoint.
err := s.conn.WriteMessage(gwebsocket.BinaryMessage, []byte{remotecommand.StreamClose, s.id})
s.conn = nil
return err
}
func (s *stream) Reset() error {
klog.V(4).Infof("Reset() on stream %d", s.id)
defer klog.V(4).Infof("Reset() done on stream %d", s.id)
s.Close()
return s.writePipe.Close()
}
func (s *stream) Headers() http.Header {
return s.headers
}
func (s *stream) Identifier() uint32 {
return uint32(s.id)
}
// heartbeat encasulates data necessary for the websocket ping/pong heartbeat. This
// heartbeat works by setting a read deadline on the websocket connection, then
// pushing this deadline into the future for every successful heartbeat. If the
// heartbeat "pong" fails to respond within the deadline, then the "NextReader()" call
// inside the "readDemuxLoop" will return an i/o error prompting a connection close
// and cleanup.
type heartbeat struct {
conn *gwebsocket.Conn
// period defines how often a "ping" heartbeat message is sent to the other endpoint
period time.Duration
// closing the "closer" channel will clean up the heartbeat timers
closer chan struct{}
// optional data to send with "ping" message
message []byte
// optionally received data message with "pong" message, same as sent with ping
pongMessage []byte
}
// newHeartbeat creates heartbeat structure encapsulating fields necessary to
// run the websocket connection ping/pong mechanism and sets up handlers on
// the websocket connection.
func newHeartbeat(conn *gwebsocket.Conn, period time.Duration, deadline time.Duration) *heartbeat {
h := &heartbeat{
conn: conn,
period: period,
closer: make(chan struct{}),
}
// Set up handler for receiving returned "pong" message from other endpoint
// by pushing the read deadline into the future. The "msg" received could
// be empty.
h.conn.SetPongHandler(func(msg string) error {
// Push the read deadline into the future.
klog.V(8).Infof("Pong message received (%s)--resetting read deadline", msg)
err := h.conn.SetReadDeadline(time.Now().Add(deadline))
if err != nil {
klog.Errorf("Websocket setting read deadline failed %v", err)
return err
}
if len(msg) > 0 {
h.pongMessage = []byte(msg)
}
return nil
})
// Set up handler to cleanup timers when this endpoint receives "Close" message.
closeHandler := h.conn.CloseHandler()
h.conn.SetCloseHandler(func(code int, text string) error {
close(h.closer)
return closeHandler(code, text)
})
return h
}
// setMessage is optional data sent with "ping" heartbeat. According to the websocket RFC
// this data sent with "ping" message should be returned in "pong" message.
func (h *heartbeat) setMessage(msg string) {
h.message = []byte(msg)
}
// start the heartbeat by setting up necesssary handlers and looping by sending "ping"
// message every "period" until the "closer" channel is closed.
func (h *heartbeat) start() {
// Loop to continually send "ping" message through websocket connection every "period".
t := time.NewTicker(h.period)
defer t.Stop()
for {
select {
case <-h.closer:
klog.V(8).Infof("closed channel--returning")
return
case <-t.C:
// "WriteControl" does not need to be protected by a mutex. According to
// gorilla/websockets library docs: "The Close and WriteControl methods can
// be called concurrently with all other methods."
if err := h.conn.WriteControl(gwebsocket.PingMessage, h.message, time.Now().Add(writeDeadline)); err == nil {
klog.V(8).Infof("Websocket Ping succeeeded")
} else {
klog.Errorf("Websocket Ping failed: %v", err)
// Continue, in case this is a transient failure.
// c.conn.CloseChan above will tell us when the connection is
// actually closed.
}
}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,166 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package websocket
import (
"crypto/tls"
"fmt"
"net/http"
"net/url"
gwebsocket "github.com/gorilla/websocket"
"k8s.io/apimachinery/pkg/util/httpstream"
utilnet "k8s.io/apimachinery/pkg/util/net"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/transport"
)
var (
_ utilnet.TLSClientConfigHolder = &RoundTripper{}
_ http.RoundTripper = &RoundTripper{}
)
// ConnectionHolder defines functions for structure providing
// access to the websocket connection.
type ConnectionHolder interface {
DataBufferSize() int
Connection() *gwebsocket.Conn
}
// RoundTripper knows how to establish a connection to a remote WebSocket endpoint and make it available for use.
// RoundTripper must not be reused.
type RoundTripper struct {
// TLSConfig holds the TLS configuration settings to use when connecting
// to the remote server.
TLSConfig *tls.Config
// Proxier specifies a function to return a proxy for a given
// Request. If the function returns a non-nil error, the
// request is aborted with the provided error.
// If Proxy is nil or returns a nil *URL, no proxy is used.
Proxier func(req *http.Request) (*url.URL, error)
// Conn holds the WebSocket connection after a round trip.
Conn *gwebsocket.Conn
}
// Connection returns the stored websocket connection.
func (rt *RoundTripper) Connection() *gwebsocket.Conn {
return rt.Conn
}
// DataBufferSize returns the size of buffers for the
// websocket connection.
func (rt *RoundTripper) DataBufferSize() int {
return 32 * 1024
}
// TLSClientConfig implements pkg/util/net.TLSClientConfigHolder.
func (rt *RoundTripper) TLSClientConfig() *tls.Config {
return rt.TLSConfig
}
// RoundTrip connects to the remote websocket using the headers in the request and the TLS
// configuration from the config
func (rt *RoundTripper) RoundTrip(request *http.Request) (retResp *http.Response, retErr error) {
defer func() {
if request.Body != nil {
err := request.Body.Close()
if retErr == nil {
retErr = err
}
}
}()
// set the protocol version directly on the dialer from the header
protocolVersions := request.Header[httpstream.HeaderProtocolVersion]
delete(request.Header, httpstream.HeaderProtocolVersion)
dialer := gwebsocket.Dialer{
Proxy: rt.Proxier,
TLSClientConfig: rt.TLSConfig,
Subprotocols: protocolVersions,
ReadBufferSize: rt.DataBufferSize() + 1024, // add space for the protocol byte indicating which channel the data is for
WriteBufferSize: rt.DataBufferSize() + 1024, // add space for the protocol byte indicating which channel the data is for
}
switch request.URL.Scheme {
case "https":
request.URL.Scheme = "wss"
case "http":
request.URL.Scheme = "ws"
default:
return nil, fmt.Errorf("unknown url scheme: %s", request.URL.Scheme)
}
wsConn, resp, err := dialer.DialContext(request.Context(), request.URL.String(), request.Header)
if err != nil {
if err != gwebsocket.ErrBadHandshake {
return nil, err
}
return nil, fmt.Errorf("unable to upgrade connection: %v", err)
}
rt.Conn = wsConn
return resp, nil
}
// RoundTripperFor transforms the passed rest config into a wrapped roundtripper, as well
// as a pointer to the websocket RoundTripper. The websocket RoundTripper contains the
// websocket connection after RoundTrip() on the wrapper. Returns an error if there is
// a problem creating the round trippers.
func RoundTripperFor(config *restclient.Config) (http.RoundTripper, ConnectionHolder, error) {
transportCfg, err := config.TransportConfig()
if err != nil {
return nil, nil, err
}
tlsConfig, err := transport.TLSConfigFor(transportCfg)
if err != nil {
return nil, nil, err
}
proxy := config.Proxy
if proxy == nil {
proxy = utilnet.NewProxierWithNoProxyCIDR(http.ProxyFromEnvironment)
}
upgradeRoundTripper := &RoundTripper{
TLSConfig: tlsConfig,
Proxier: proxy,
}
wrapper, err := transport.HTTPWrappersForConfig(transportCfg, upgradeRoundTripper)
if err != nil {
return nil, nil, err
}
return wrapper, upgradeRoundTripper, nil
}
// Negotiate opens a connection to a remote server and attempts to negotiate
// a WebSocket connection. Upon success, it returns the negotiated connection.
// The round tripper rt must use the WebSocket round tripper wsRt - see RoundTripperFor.
func Negotiate(rt http.RoundTripper, connectionInfo ConnectionHolder, req *http.Request, protocols ...string) (*gwebsocket.Conn, error) {
req.Header[httpstream.HeaderProtocolVersion] = protocols
resp, err := rt.RoundTrip(req)
if err != nil {
return nil, fmt.Errorf("error sending request: %v", err)
}
err = resp.Body.Close()
if err != nil {
connectionInfo.Connection().Close()
return nil, fmt.Errorf("error closing response body: %v", err)
}
return connectionInfo.Connection(), nil
}

View File

@ -0,0 +1,140 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package websocket
import (
"context"
"io"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
"k8s.io/apimachinery/pkg/util/remotecommand"
restclient "k8s.io/client-go/rest"
)
func TestWebSocketRoundTripper_RoundTripperSucceeds(t *testing.T) {
// Create fake WebSocket server.
websocketServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
conns, err := webSocketServerStreams(req, w)
if err != nil {
t.Fatalf("error on webSocketServerStreams: %v", err)
}
defer conns.conn.Close()
}))
defer websocketServer.Close()
// Create the wrapped roundtripper and websocket upgrade roundtripper and call "RoundTrip()".
websocketLocation, err := url.Parse(websocketServer.URL)
require.NoError(t, err)
req, err := http.NewRequestWithContext(context.Background(), "POST", websocketServer.URL, nil)
require.NoError(t, err)
rt, wsRt, err := RoundTripperFor(&restclient.Config{Host: websocketLocation.Host})
require.NoError(t, err)
requestedProtocol := remotecommand.StreamProtocolV5Name
req.Header[httpstream.HeaderProtocolVersion] = []string{requestedProtocol}
_, err = rt.RoundTrip(req)
require.NoError(t, err)
// WebSocket Connection is stored in websocket RoundTripper.
// Compare the expected negotiated subprotocol with the actual subprotocol.
actualProtocol := wsRt.Connection().Subprotocol()
assert.Equal(t, requestedProtocol, actualProtocol)
}
func TestWebSocketRoundTripper_RoundTripperFails(t *testing.T) {
// Create fake WebSocket server.
websocketServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
conns, err := webSocketServerStreams(req, w)
if err != nil {
t.Fatalf("error on webSocketServerStreams: %v", err)
}
defer conns.conn.Close()
}))
defer websocketServer.Close()
// Create the wrapped roundtripper and websocket upgrade roundtripper and call "RoundTrip()".
websocketLocation, err := url.Parse(websocketServer.URL)
require.NoError(t, err)
req, err := http.NewRequestWithContext(context.Background(), "POST", websocketServer.URL, nil)
require.NoError(t, err)
rt, _, err := RoundTripperFor(&restclient.Config{Host: websocketLocation.Host})
require.NoError(t, err)
// Requested subprotocol version 1 is not supported by test websocket server.
requestedProtocol := remotecommand.StreamProtocolV1Name
req.Header[httpstream.HeaderProtocolVersion] = []string{requestedProtocol}
_, err = rt.RoundTrip(req)
// Ensure a "bad handshake" error is returned, since requested protocol is not supported.
require.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "bad handshake"))
}
func TestWebSocketRoundTripper_NegotiateCreatesConnection(t *testing.T) {
// Create fake WebSocket server.
websocketServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
conns, err := webSocketServerStreams(req, w)
if err != nil {
t.Fatalf("error on webSocketServerStreams: %v", err)
}
defer conns.conn.Close()
}))
defer websocketServer.Close()
// Create the websocket roundtripper and call "Negotiate" to create websocket connection.
websocketLocation, err := url.Parse(websocketServer.URL)
require.NoError(t, err)
req, err := http.NewRequestWithContext(context.Background(), "POST", websocketServer.URL, nil)
require.NoError(t, err)
rt, wsRt, err := RoundTripperFor(&restclient.Config{Host: websocketLocation.Host})
require.NoError(t, err)
requestedProtocol := remotecommand.StreamProtocolV5Name
conn, err := Negotiate(rt, wsRt, req, requestedProtocol)
require.NoError(t, err)
// Compare the expected negotiated subprotocol with the actual subprotocol.
actualProtocol := conn.Subprotocol()
assert.Equal(t, requestedProtocol, actualProtocol)
}
// websocketStreams contains the WebSocket connection and streams from a server.
type websocketStreams struct {
conn io.Closer
}
func webSocketServerStreams(req *http.Request, w http.ResponseWriter) (*websocketStreams, error) {
conn := wsstream.NewConn(map[string]wsstream.ChannelProtocolConfig{
remotecommand.StreamProtocolV5Name: {
Binary: true,
Channels: []wsstream.ChannelType{},
},
})
conn.SetIdleTimeout(4 * time.Hour)
// Opening the connection responds to WebSocket client, negotiating
// the WebSocket upgrade connection and the subprotocol.
_, _, err := conn.Open(w, req)
if err != nil {
return nil, err
}
return &websocketStreams{conn: conn}, nil
}

View File

@ -302,6 +302,7 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 h1:BZHcxBETFHIdVyhyEfOvn/RdU/QGdLI4y34qQGjGWO0=

View File

@ -41,6 +41,7 @@ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJY
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=

View File

@ -52,6 +52,7 @@ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJY
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=

View File

@ -58,6 +58,7 @@ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJY
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=

View File

@ -32,6 +32,7 @@ github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=

View File

@ -63,6 +63,7 @@ require (
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect

View File

@ -103,6 +103,7 @@ github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaU
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=

View File

@ -28,6 +28,7 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect

View File

@ -72,6 +72,7 @@ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJY
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y=

View File

@ -250,6 +250,7 @@ github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pf
github.com/googleapis/gax-go/v2 v2.1.1/go.mod h1:hddJymUZASv3XPyGkUpKj8pPO47Rmb0eJc8R6ouapiM=
github.com/googleapis/gax-go/v2 v2.7.1 h1:gF4c0zjUP2H/s/hEGyLA3I0fA2ZWjzYiONAD6cvPr8A=
github.com/googleapis/gax-go/v2 v2.7.1/go.mod h1:4orTrqY6hXxxaUL4LHIPl6lGo8vAE38/qKbhSAKP6QI=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=

View File

@ -44,6 +44,7 @@ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJY
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=

View File

@ -72,6 +72,7 @@ github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaU
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=

View File

@ -45,6 +45,7 @@ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJY
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=

1
vendor/modules.txt vendored
View File

@ -1929,6 +1929,7 @@ k8s.io/client-go/tools/remotecommand
k8s.io/client-go/tools/watch
k8s.io/client-go/transport
k8s.io/client-go/transport/spdy
k8s.io/client-go/transport/websocket
k8s.io/client-go/util/cert
k8s.io/client-go/util/certificate
k8s.io/client-go/util/certificate/csr