mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-30 00:41:53 +00:00
406 lines
15 KiB
Go
406 lines
15 KiB
Go
/*
|
|
Copyright 2023 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package remotecommand
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/rand"
|
|
"crypto/tls"
|
|
"io"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"net/url"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
"k8s.io/apimachinery/pkg/util/httpstream"
|
|
utilnettesting "k8s.io/apimachinery/pkg/util/net/testing"
|
|
"k8s.io/apimachinery/pkg/util/remotecommand"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/client-go/rest"
|
|
)
|
|
|
|
func TestFallbackClient_WebSocketPrimarySucceeds(t *testing.T) {
|
|
// Create fake WebSocket server. Copy received STDIN data back onto STDOUT stream.
|
|
websocketServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
|
conns, err := webSocketServerStreams(req, w, streamOptionsFromRequest(req))
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusForbidden)
|
|
return
|
|
}
|
|
defer conns.conn.Close()
|
|
// Loopback the STDIN stream onto the STDOUT stream.
|
|
_, err = io.Copy(conns.stdoutStream, conns.stdinStream)
|
|
if err != nil {
|
|
t.Errorf("unexpected error %v", err)
|
|
}
|
|
}))
|
|
defer websocketServer.Close()
|
|
|
|
// Now create the fallback client (executor), and point it to the "websocketServer".
|
|
// Must add STDIN and STDOUT query params for the client request.
|
|
websocketServer.URL = websocketServer.URL + "?" + "stdin=true" + "&" + "stdout=true"
|
|
websocketLocation, err := url.Parse(websocketServer.URL)
|
|
require.NoError(t, err)
|
|
websocketExecutor, err := NewWebSocketExecutor(&rest.Config{Host: websocketLocation.Host}, "GET", websocketServer.URL)
|
|
require.NoError(t, err)
|
|
spdyExecutor, err := NewSPDYExecutor(&rest.Config{Host: websocketLocation.Host}, "POST", websocketLocation)
|
|
require.NoError(t, err)
|
|
// Never fallback, so always use the websocketExecutor, which succeeds against websocket server.
|
|
exec, err := NewFallbackExecutor(websocketExecutor, spdyExecutor, func(error) bool { return false })
|
|
require.NoError(t, err)
|
|
// Generate random data, and set it up to stream on STDIN. The data will be
|
|
// returned on the STDOUT buffer.
|
|
randomSize := 1024 * 1024
|
|
randomData := make([]byte, randomSize)
|
|
if _, err := rand.Read(randomData); err != nil {
|
|
t.Errorf("unexpected error reading random data: %v", err)
|
|
}
|
|
var stdout bytes.Buffer
|
|
options := &StreamOptions{
|
|
Stdin: bytes.NewReader(randomData),
|
|
Stdout: &stdout,
|
|
}
|
|
errorChan := make(chan error)
|
|
go func() {
|
|
// Start the streaming on the WebSocket "exec" client.
|
|
errorChan <- exec.StreamWithContext(context.Background(), *options)
|
|
}()
|
|
|
|
select {
|
|
case <-time.After(wait.ForeverTestTimeout):
|
|
t.Fatalf("expect stream to be closed after connection is closed.")
|
|
case err := <-errorChan:
|
|
if err != nil {
|
|
t.Errorf("unexpected error")
|
|
}
|
|
}
|
|
|
|
data, err := io.ReadAll(bytes.NewReader(stdout.Bytes()))
|
|
if err != nil {
|
|
t.Errorf("error reading the stream: %v", err)
|
|
return
|
|
}
|
|
// Check the random data sent on STDIN was the same returned on STDOUT.
|
|
if !bytes.Equal(randomData, data) {
|
|
t.Errorf("unexpected data received: %d sent: %d", len(data), len(randomData))
|
|
}
|
|
}
|
|
|
|
func TestFallbackClient_SPDYSecondarySucceeds(t *testing.T) {
|
|
// Create fake SPDY server. Copy received STDIN data back onto STDOUT stream.
|
|
spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
|
var stdin, stdout bytes.Buffer
|
|
ctx, err := createHTTPStreams(w, req, &StreamOptions{
|
|
Stdin: &stdin,
|
|
Stdout: &stdout,
|
|
})
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusForbidden)
|
|
return
|
|
}
|
|
defer ctx.conn.Close()
|
|
_, err = io.Copy(ctx.stdoutStream, ctx.stdinStream)
|
|
if err != nil {
|
|
t.Fatalf("error copying STDIN to STDOUT: %v", err)
|
|
}
|
|
}))
|
|
defer spdyServer.Close()
|
|
|
|
spdyLocation, err := url.Parse(spdyServer.URL)
|
|
require.NoError(t, err)
|
|
websocketExecutor, err := NewWebSocketExecutor(&rest.Config{Host: spdyLocation.Host}, "GET", spdyServer.URL)
|
|
require.NoError(t, err)
|
|
spdyExecutor, err := NewSPDYExecutor(&rest.Config{Host: spdyLocation.Host}, "POST", spdyLocation)
|
|
require.NoError(t, err)
|
|
// Always fallback to spdyExecutor, and spdyExecutor succeeds against fake spdy server.
|
|
exec, err := NewFallbackExecutor(websocketExecutor, spdyExecutor, func(error) bool { return true })
|
|
require.NoError(t, err)
|
|
// Generate random data, and set it up to stream on STDIN. The data will be
|
|
// returned on the STDOUT buffer.
|
|
randomSize := 1024 * 1024
|
|
randomData := make([]byte, randomSize)
|
|
if _, err := rand.Read(randomData); err != nil {
|
|
t.Errorf("unexpected error reading random data: %v", err)
|
|
}
|
|
var stdout bytes.Buffer
|
|
options := &StreamOptions{
|
|
Stdin: bytes.NewReader(randomData),
|
|
Stdout: &stdout,
|
|
}
|
|
errorChan := make(chan error)
|
|
go func() {
|
|
errorChan <- exec.StreamWithContext(context.Background(), *options)
|
|
}()
|
|
|
|
select {
|
|
case <-time.After(wait.ForeverTestTimeout):
|
|
t.Fatalf("expect stream to be closed after connection is closed.")
|
|
case err := <-errorChan:
|
|
if err != nil {
|
|
t.Errorf("unexpected error")
|
|
}
|
|
}
|
|
|
|
data, err := io.ReadAll(bytes.NewReader(stdout.Bytes()))
|
|
if err != nil {
|
|
t.Errorf("error reading the stream: %v", err)
|
|
return
|
|
}
|
|
// Check the random data sent on STDIN was the same returned on STDOUT.
|
|
if !bytes.Equal(randomData, data) {
|
|
t.Errorf("unexpected data received: %d sent: %d", len(data), len(randomData))
|
|
}
|
|
}
|
|
|
|
func TestFallbackClient_PrimaryAndSecondaryFail(t *testing.T) {
|
|
// Create fake WebSocket server. Copy received STDIN data back onto STDOUT stream.
|
|
websocketServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
|
conns, err := webSocketServerStreams(req, w, streamOptionsFromRequest(req))
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusForbidden)
|
|
return
|
|
}
|
|
defer conns.conn.Close()
|
|
// Loopback the STDIN stream onto the STDOUT stream.
|
|
_, err = io.Copy(conns.stdoutStream, conns.stdinStream)
|
|
if err != nil {
|
|
t.Errorf("unexpected error %v", err)
|
|
}
|
|
}))
|
|
defer websocketServer.Close()
|
|
|
|
// Now create the fallback client (executor), and point it to the "websocketServer".
|
|
// Must add STDIN and STDOUT query params for the client request.
|
|
websocketServer.URL = websocketServer.URL + "?" + "stdin=true" + "&" + "stdout=true"
|
|
websocketLocation, err := url.Parse(websocketServer.URL)
|
|
require.NoError(t, err)
|
|
websocketExecutor, err := NewWebSocketExecutor(&rest.Config{Host: websocketLocation.Host}, "GET", websocketServer.URL)
|
|
require.NoError(t, err)
|
|
spdyExecutor, err := NewSPDYExecutor(&rest.Config{Host: websocketLocation.Host}, "POST", websocketLocation)
|
|
require.NoError(t, err)
|
|
// Always fallback to spdyExecutor, but spdyExecutor fails against websocket server.
|
|
exec, err := NewFallbackExecutor(websocketExecutor, spdyExecutor, func(error) bool { return true })
|
|
require.NoError(t, err)
|
|
// Update the websocket executor to request remote command v4, which is unsupported.
|
|
fallbackExec, ok := exec.(*FallbackExecutor)
|
|
assert.True(t, ok, "error casting executor as FallbackExecutor")
|
|
websocketExec, ok := fallbackExec.primary.(*wsStreamExecutor)
|
|
assert.True(t, ok, "error casting executor as websocket executor")
|
|
// Set the attempted subprotocol version to V4; websocket server only accepts V5.
|
|
websocketExec.protocols = []string{remotecommand.StreamProtocolV4Name}
|
|
|
|
// Generate random data, and set it up to stream on STDIN. The data will be
|
|
// returned on the STDOUT buffer.
|
|
randomSize := 1024 * 1024
|
|
randomData := make([]byte, randomSize)
|
|
if _, err := rand.Read(randomData); err != nil {
|
|
t.Errorf("unexpected error reading random data: %v", err)
|
|
}
|
|
var stdout bytes.Buffer
|
|
options := &StreamOptions{
|
|
Stdin: bytes.NewReader(randomData),
|
|
Stdout: &stdout,
|
|
}
|
|
errorChan := make(chan error)
|
|
go func() {
|
|
errorChan <- exec.StreamWithContext(context.Background(), *options)
|
|
}()
|
|
|
|
select {
|
|
case <-time.After(wait.ForeverTestTimeout):
|
|
t.Fatalf("expect stream to be closed after connection is closed.")
|
|
case err := <-errorChan:
|
|
// Ensure secondary executor returned an error.
|
|
require.Error(t, err)
|
|
}
|
|
}
|
|
|
|
// localhostCert was generated from crypto/tls/generate_cert.go with the following command:
|
|
//
|
|
// go run generate_cert.go --rsa-bits 2048 --host 127.0.0.1,::1,example.com --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h
|
|
var localhostCert = []byte(`-----BEGIN CERTIFICATE-----
|
|
MIIDGTCCAgGgAwIBAgIRALL5AZcefF4kkYV1SEG6YrMwDQYJKoZIhvcNAQELBQAw
|
|
EjEQMA4GA1UEChMHQWNtZSBDbzAgFw03MDAxMDEwMDAwMDBaGA8yMDg0MDEyOTE2
|
|
MDAwMFowEjEQMA4GA1UEChMHQWNtZSBDbzCCASIwDQYJKoZIhvcNAQEBBQADggEP
|
|
ADCCAQoCggEBALQ/FHcyVwdFHxARbbD2KBtDUT7Eni+8ioNdjtGcmtXqBv45EC1C
|
|
JOqqGJTroFGJ6Q9kQIZ9FqH5IJR2fOOJD9kOTueG4Vt1JY1rj1Kbpjefu8XleZ5L
|
|
SBwIWVnN/lEsEbuKmj7N2gLt5AH3zMZiBI1mg1u9Z5ZZHYbCiTpBrwsq6cTlvR9g
|
|
dyo1YkM5hRESCzsrL0aUByoo0qRMD8ZsgANJwgsiO0/M6idbxDwv1BnGwGmRYvOE
|
|
Hxpy3v0Jg7GJYrvnpnifJTs4nw91N5X9pXxR7FFzi/6HTYDWRljvTb0w6XciKYAz
|
|
bWZ0+cJr5F7wB7ovlbm7HrQIR7z7EIIu2d8CAwEAAaNoMGYwDgYDVR0PAQH/BAQD
|
|
AgKkMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA8GA1UdEwEB/wQFMAMBAf8wLgYDVR0R
|
|
BCcwJYILZXhhbXBsZS5jb22HBH8AAAGHEAAAAAAAAAAAAAAAAAAAAAEwDQYJKoZI
|
|
hvcNAQELBQADggEBAFPPWopNEJtIA2VFAQcqN6uJK+JVFOnjGRoCrM6Xgzdm0wxY
|
|
XCGjsxY5dl+V7KzdGqu858rCaq5osEBqypBpYAnS9C38VyCDA1vPS1PsN8SYv48z
|
|
DyBwj+7R2qar0ADBhnhWxvYO9M72lN/wuCqFKYMeFSnJdQLv3AsrrHe9lYqOa36s
|
|
8wxSwVTFTYXBzljPEnSaaJMPqFD8JXaZK1ryJPkO5OsCNQNGtatNiWAf3DcmwHAT
|
|
MGYMzP0u4nw47aRz9shB8w+taPKHx2BVwE1m/yp3nHVioOjXqA1fwRQVGclCJSH1
|
|
D2iq3hWVHRENgjTjANBPICLo9AZ4JfN6PH19mnU=
|
|
-----END CERTIFICATE-----`)
|
|
|
|
// localhostKey is the private key for localhostCert.
|
|
var localhostKey = []byte(`-----BEGIN RSA PRIVATE KEY-----
|
|
MIIEogIBAAKCAQEAtD8UdzJXB0UfEBFtsPYoG0NRPsSeL7yKg12O0Zya1eoG/jkQ
|
|
LUIk6qoYlOugUYnpD2RAhn0WofkglHZ844kP2Q5O54bhW3UljWuPUpumN5+7xeV5
|
|
nktIHAhZWc3+USwRu4qaPs3aAu3kAffMxmIEjWaDW71nllkdhsKJOkGvCyrpxOW9
|
|
H2B3KjViQzmFERILOysvRpQHKijSpEwPxmyAA0nCCyI7T8zqJ1vEPC/UGcbAaZFi
|
|
84QfGnLe/QmDsYliu+emeJ8lOzifD3U3lf2lfFHsUXOL/odNgNZGWO9NvTDpdyIp
|
|
gDNtZnT5wmvkXvAHui+VubsetAhHvPsQgi7Z3wIDAQABAoIBAGmw93IxjYCQ0ncc
|
|
kSKMJNZfsdtJdaxuNRZ0nNNirhQzR2h403iGaZlEpmdkhzxozsWcto1l+gh+SdFk
|
|
bTUK4MUZM8FlgO2dEqkLYh5BcMT7ICMZvSfJ4v21E5eqR68XVUqQKoQbNvQyxFk3
|
|
EddeEGdNrkb0GDK8DKlBlzAW5ep4gjG85wSTjR+J+muUv3R0BgLBFSuQnIDM/IMB
|
|
LWqsja/QbtB7yppe7jL5u8UCFdZG8BBKT9fcvFIu5PRLO3MO0uOI7LTc8+W1Xm23
|
|
uv+j3SY0+v+6POjK0UlJFFi/wkSPTFIfrQO1qFBkTDQHhQ6q/7GnILYYOiGbIRg2
|
|
NNuP52ECgYEAzXEoy50wSYh8xfFaBuxbm3ruuG2W49jgop7ZfoFrPWwOQKAZS441
|
|
VIwV4+e5IcA6KkuYbtGSdTYqK1SMkgnUyD/VevwAqH5TJoEIGu0pDuKGwVuwqioZ
|
|
frCIAV5GllKyUJ55VZNbRr2vY2fCsWbaCSCHETn6C16DNuTCe5C0JBECgYEA4JqY
|
|
5GpNbMG8fOt4H7hU0Fbm2yd6SHJcQ3/9iimef7xG6ajxsYrIhg1ft+3IPHMjVI0+
|
|
9brwHDnWg4bOOx/VO4VJBt6Dm/F33bndnZRkuIjfSNpLM51P+EnRdaFVHOJHwKqx
|
|
uF69kihifCAG7YATgCveeXImzBUSyZUz9UrETu8CgYARNBimdFNG1RcdvEg9rC0/
|
|
p9u1tfecvNySwZqU7WF9kz7eSonTueTdX521qAHowaAdSpdJMGODTTXaywm6cPhQ
|
|
jIfj9JZZhbqQzt1O4+08Qdvm9TamCUB5S28YLjza+bHU7nBaqixKkDfPqzCyilpX
|
|
yVGGL8SwjwmN3zop/sQXAQKBgC0JMsESQ6YcDsRpnrOVjYQc+LtW5iEitTdfsaID
|
|
iGGKihmOI7B66IxgoCHMTws39wycKdSyADVYr5e97xpR3rrJlgQHmBIrz+Iow7Q2
|
|
LiAGaec8xjl6QK/DdXmFuQBKqyKJ14rljFODP4QuE9WJid94bGqjpf3j99ltznZP
|
|
4J8HAoGAJb4eb4lu4UGwifDzqfAPzLGCoi0fE1/hSx34lfuLcc1G+LEu9YDKoOVJ
|
|
9suOh0b5K/bfEy9KrVMBBriduvdaERSD8S3pkIQaitIz0B029AbE4FLFf9lKQpP2
|
|
KR8NJEkK99Vh/tew6jAMll70xFrE7aF8VLXJVE7w4sQzuvHxl9Q=
|
|
-----END RSA PRIVATE KEY-----
|
|
`)
|
|
|
|
// See (https://github.com/kubernetes/kubernetes/issues/126134).
|
|
func TestFallbackClient_WebSocketHTTPSProxyNoFallback(t *testing.T) {
|
|
cert, err := tls.X509KeyPair(localhostCert, localhostKey)
|
|
if err != nil {
|
|
t.Errorf("https (valid hostname): proxy_test: %v", err)
|
|
}
|
|
|
|
var proxyCalled atomic.Int64
|
|
proxyHandler := utilnettesting.NewHTTPProxyHandler(t, func(req *http.Request) bool {
|
|
proxyCalled.Add(1)
|
|
return true
|
|
})
|
|
defer proxyHandler.Wait()
|
|
|
|
proxyServer := httptest.NewUnstartedServer(proxyHandler)
|
|
proxyServer.TLS = &tls.Config{Certificates: []tls.Certificate{cert}}
|
|
proxyServer.StartTLS()
|
|
defer proxyServer.Close() //nolint:errcheck
|
|
|
|
proxyLocation, err := url.Parse(proxyServer.URL)
|
|
require.NoError(t, err)
|
|
|
|
// Create fake WebSocket server. Copy received STDIN data back onto STDOUT stream.
|
|
websocketServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
|
conns, err := webSocketServerStreams(req, w, streamOptionsFromRequest(req))
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusForbidden)
|
|
return
|
|
}
|
|
defer conns.conn.Close() //nolint:errcheck
|
|
// Loopback the STDIN stream onto the STDOUT stream.
|
|
_, err = io.Copy(conns.stdoutStream, conns.stdinStream)
|
|
if err != nil {
|
|
t.Fatalf("websocket copy error: %v", err)
|
|
}
|
|
}))
|
|
defer websocketServer.Close() //nolint:errcheck
|
|
|
|
// Now create the WebSocket client (executor), and point it to the TLS proxy server.
|
|
// The proxy server should open a websocket connection to the fake websocket server.
|
|
websocketServer.URL = websocketServer.URL + "?" + "stdin=true" + "&" + "stdout=true"
|
|
websocketLocation, err := url.Parse(websocketServer.URL)
|
|
require.NoError(t, err)
|
|
clientConfig := &rest.Config{
|
|
Host: websocketLocation.Host,
|
|
TLSClientConfig: rest.TLSClientConfig{CAData: localhostCert},
|
|
Proxy: func(req *http.Request) (*url.URL, error) {
|
|
return proxyLocation, nil
|
|
},
|
|
}
|
|
websocketExecutor, err := NewWebSocketExecutor(clientConfig, "GET", websocketServer.URL)
|
|
require.NoError(t, err)
|
|
emptyURL, _ := url.Parse("")
|
|
spdyExecutor, err := NewSPDYExecutor(clientConfig, "POST", emptyURL)
|
|
require.NoError(t, err)
|
|
// No fallback to spdyExecutor with websocket.
|
|
sawHTTPSProxyError := false
|
|
exec, err := NewFallbackExecutor(websocketExecutor, spdyExecutor, func(err error) bool {
|
|
if httpstream.IsUpgradeFailure(err) {
|
|
t.Errorf("saw upgrade failure: %v", err)
|
|
return true
|
|
}
|
|
if httpstream.IsHTTPSProxyError(err) {
|
|
sawHTTPSProxyError = true
|
|
t.Logf("saw https proxy error: %v", err)
|
|
return true
|
|
}
|
|
return false
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// Generate random data, and set it up to stream on STDIN. The data will be
|
|
// returned on the STDOUT buffer.
|
|
randomSize := 1024 * 1024
|
|
randomData := make([]byte, randomSize)
|
|
if _, err := rand.Read(randomData); err != nil {
|
|
t.Errorf("unexpected error reading random data: %v", err)
|
|
}
|
|
var stdout bytes.Buffer
|
|
options := &StreamOptions{
|
|
Stdin: bytes.NewReader(randomData),
|
|
Stdout: &stdout,
|
|
}
|
|
errorChan := make(chan error)
|
|
go func() {
|
|
errorChan <- exec.StreamWithContext(context.Background(), *options)
|
|
}()
|
|
|
|
select {
|
|
case <-time.After(wait.ForeverTestTimeout):
|
|
t.Fatalf("expect stream to be closed after connection is closed.")
|
|
case err := <-errorChan:
|
|
if err != nil {
|
|
t.Errorf("unexpected error")
|
|
}
|
|
}
|
|
|
|
data, err := io.ReadAll(bytes.NewReader(stdout.Bytes()))
|
|
if err != nil {
|
|
t.Errorf("error reading the stream: %v", err)
|
|
return
|
|
}
|
|
// Check the random data sent on STDIN was the same returned on STDOUT.
|
|
if !bytes.Equal(randomData, data) {
|
|
t.Errorf("unexpected data received: %d sent: %d", len(data), len(randomData))
|
|
}
|
|
|
|
// Ensure the https proxy error was *not* observed
|
|
if sawHTTPSProxyError {
|
|
t.Errorf("expected to *not* see https proxy error")
|
|
}
|
|
// Ensure the proxy was called once
|
|
if e, a := int64(1), proxyCalled.Load(); e != a {
|
|
t.Errorf("expected %d proxy call, got %d", e, a)
|
|
}
|
|
}
|