Merge pull request #102297 from deads2k/ssh-tunnels

remove --ssh- options, deprecated 13 releases, that only work on GCE
This commit is contained in:
Kubernetes Prow Robot 2021-06-05 10:40:50 -07:00 committed by GitHub
commit 74af3b712d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 14 additions and 1390 deletions

View File

@ -276,23 +276,12 @@ function start-kube-apiserver {
fi
if [[ -n "${MASTER_ADVERTISE_ADDRESS:-}" ]]; then
params+=" --advertise-address=${MASTER_ADVERTISE_ADDRESS}"
if [[ -n "${PROXY_SSH_USER:-}" ]]; then
if [[ -n "${KUBE_API_SERVER_RUNASUSER:-}" && -n "${KUBE_API_SERVER_RUNASGROUP:-}" ]]; then
chown -R "${KUBE_API_SERVER_RUNASUSER}":"${KUBE_API_SERVER_RUNASGROUP}" /etc/srv/sshproxy/
fi
params+=" --ssh-user=${PROXY_SSH_USER}"
params+=" --ssh-keyfile=/etc/srv/sshproxy/.sshkeyfile"
fi
elif [[ -n "${PROJECT_ID:-}" && -n "${TOKEN_URL:-}" && -n "${TOKEN_BODY:-}" && -n "${NODE_NETWORK:-}" ]]; then
local -r vm_external_ip=$(get-metadata-value "instance/network-interfaces/0/access-configs/0/external-ip")
if [[ -n "${PROXY_SSH_USER:-}" ]]; then
params+=" --advertise-address=${vm_external_ip}"
if [[ -n "${KUBE_API_SERVER_RUNASUSER:-}" && -n "${KUBE_API_SERVER_RUNASGROUP:-}" ]]; then
chown -R "${KUBE_API_SERVER_RUNASUSER}":"${KUBE_API_SERVER_RUNASGROUP}" /etc/srv/sshproxy/
fi
params+=" --advertise-address=${vm_external_ip}"
params+=" --ssh-user=${PROXY_SSH_USER}"
params+=" --ssh-keyfile=/etc/srv/sshproxy/.sshkeyfile"
fi
fi
local webhook_authn_config_mount=""

View File

@ -75,8 +75,6 @@ type ServerRunOptions struct {
APIServerServiceIP net.IP
ServiceNodePortRange utilnet.PortRange
SSHKeyfile string
SSHUser string
ProxyClientCertFile string
ProxyClientKeyFile string
@ -196,16 +194,6 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) {
"If true, install a /logs handler for the apiserver logs.")
fs.MarkDeprecated("enable-logs-handler", "This flag will be removed in v1.19")
// Deprecated in release 1.9
fs.StringVar(&s.SSHUser, "ssh-user", s.SSHUser,
"If non-empty, use secure SSH proxy to the nodes, using this user name")
fs.MarkDeprecated("ssh-user", "This flag will be removed in a future version.")
// Deprecated in release 1.9
fs.StringVar(&s.SSHKeyfile, "ssh-keyfile", s.SSHKeyfile,
"If non-empty, use secure SSH proxy to the nodes, using this user keyfile")
fs.MarkDeprecated("ssh-keyfile", "This flag will be removed in a future version.")
fs.Int64Var(&s.MaxConnectionBytesPerSec, "max-connection-bytes-per-sec", s.MaxConnectionBytesPerSec, ""+
"If non-zero, throttle each user connection to this number of bytes/sec. "+
"Currently only applies to long-running requests.")

View File

@ -26,7 +26,6 @@ import (
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"
@ -55,7 +54,6 @@ import (
clientgoclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/keyutil"
cloudprovider "k8s.io/cloud-provider"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/cli/globalflag"
_ "k8s.io/component-base/metrics/prometheus/workqueue" // for workqueue metric registration
@ -71,8 +69,6 @@ import (
"k8s.io/kubernetes/pkg/capabilities"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
"k8s.io/kubernetes/pkg/controlplane/tunneler"
"k8s.io/kubernetes/pkg/features"
generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
"k8s.io/kubernetes/pkg/kubeapiserver"
kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
@ -186,19 +182,14 @@ func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) erro
// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
if err != nil {
return nil, err
}
kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
if err != nil {
return nil, err
}
// If additional API servers are added, they should be gated.
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
if err != nil {
return nil, err
}
@ -213,7 +204,7 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan
}
// aggregator comes last in the chain
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)
if err != nil {
return nil, err
}
@ -236,72 +227,27 @@ func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPISe
return kubeAPIServer, nil
}
// CreateNodeDialer creates the dialer infrastructure to connect to the nodes.
func CreateNodeDialer(s completedServerRunOptions) (tunneler.Tunneler, *http.Transport, error) {
// Setup nodeTunneler if needed
var nodeTunneler tunneler.Tunneler
// CreateProxyTransport creates the dialer infrastructure to connect to the nodes.
func CreateProxyTransport() *http.Transport {
var proxyDialerFn utilnet.DialFunc
if len(s.SSHUser) > 0 {
// Get ssh key distribution func, if supported
var installSSHKey tunneler.InstallSSHKey
if utilfeature.DefaultFeatureGate.Enabled(features.DisableCloudProviders) && cloudprovider.IsDeprecatedInternal(s.CloudProvider.CloudProvider) {
cloudprovider.DisableWarningForProvider(s.CloudProvider.CloudProvider)
return nil, nil, fmt.Errorf("cloud provider %q and ssh-user %q was specified, but built-in cloud providers are disabled. "+
"Please set --cloud-provider=external and use an external network proxy, see https://github.com/kubernetes-sigs/apiserver-network-proxy",
s.CloudProvider.CloudProvider, s.SSHUser)
}
cloudprovider.DeprecationWarningForProvider(s.CloudProvider.CloudProvider)
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider.CloudProvider, s.CloudProvider.CloudConfigFile)
if err != nil {
return nil, nil, fmt.Errorf("cloud provider could not be initialized: %v", err)
}
if cloud != nil {
if instances, supported := cloud.Instances(); supported {
installSSHKey = instances.AddSSHKeyToAllInstances
}
}
if s.KubeletConfig.Port == 0 {
return nil, nil, fmt.Errorf("must enable kubelet port if proxy ssh-tunneling is specified")
}
if s.KubeletConfig.ReadOnlyPort == 0 {
return nil, nil, fmt.Errorf("must enable kubelet readonly port if proxy ssh-tunneling is specified")
}
// Set up the nodeTunneler
// TODO(cjcullen): If we want this to handle per-kubelet ports or other
// kubelet listen-addresses, we need to plumb through options.
healthCheckPath := &url.URL{
Scheme: "http",
Host: net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.KubeletConfig.ReadOnlyPort), 10)),
Path: "healthz",
}
nodeTunneler = tunneler.New(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSHKey)
// Use the nodeTunneler's dialer when proxying to pods, services, and nodes
proxyDialerFn = nodeTunneler.Dial
}
// Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
DialContext: proxyDialerFn,
TLSClientConfig: proxyTLSClientConfig,
})
return nodeTunneler, proxyTransport, nil
return proxyTransport
}
// CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
func CreateKubeAPIServerConfig(
s completedServerRunOptions,
nodeTunneler tunneler.Tunneler,
proxyTransport *http.Transport,
) (
func CreateKubeAPIServerConfig(s completedServerRunOptions) (
*controlplane.Config,
aggregatorapiserver.ServiceResolver,
[]admission.PluginInitializer,
error,
) {
proxyTransport := CreateProxyTransport()
genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
if err != nil {
return nil, nil, nil, err
@ -339,8 +285,6 @@ func CreateKubeAPIServerConfig(
EnableLogsSupport: s.EnableLogsHandler,
ProxyTransport: proxyTransport,
Tunneler: nodeTunneler,
ServiceIPRange: s.PrimaryServiceClusterIPRange,
APIServerServiceIP: s.APIServerServiceIP,
SecondaryServiceIPRange: s.SecondaryServiceClusterIPRange,
@ -386,10 +330,6 @@ func CreateKubeAPIServerConfig(
return nil, nil, nil, err
}
if nodeTunneler != nil {
// Use the nodeTunneler's dialer to connect to the kubelet
config.ExtraConfig.KubeletClientConfig.Dial = nodeTunneler.Dial
}
if config.GenericConfig.EgressSelector != nil {
// Use the config.GenericConfig.EgressSelector lookup to find the dialer to connect to the kubelet
config.ExtraConfig.KubeletClientConfig.Lookup = config.GenericConfig.EgressSelector.Lookup

View File

@ -75,7 +75,6 @@ import (
"k8s.io/apiserver/pkg/registry/generic"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
"k8s.io/apiserver/pkg/server/healthz"
serverstorage "k8s.io/apiserver/pkg/server/storage"
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -90,7 +89,6 @@ import (
"k8s.io/kubernetes/pkg/controlplane/controller/apiserverleasegc"
"k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
"k8s.io/kubernetes/pkg/controlplane/tunneler"
kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/routes"
@ -146,10 +144,8 @@ type ExtraConfig struct {
EventTTL time.Duration
KubeletClientConfig kubeletclient.KubeletClientConfig
// Used to start and monitor tunneling
Tunneler tunneler.Tunneler
EnableLogsSupport bool
ProxyTransport http.RoundTripper
ProxyTransport *http.Transport
// Values to build the IP addresses used by discovery
// The range of IPs to be assigned to services with type=ClusterIP or greater
@ -450,10 +446,6 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
return nil, err
}
if c.ExtraConfig.Tunneler != nil {
m.installTunneler(c.ExtraConfig.Tunneler, corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes())
}
m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
if err != nil {
@ -551,14 +543,6 @@ func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generi
return nil
}
func (m *Instance) installTunneler(nodeTunneler tunneler.Tunneler, nodeClient corev1client.NodeInterface) {
nodeTunneler.Run(nodeAddressProvider{nodeClient}.externalAddresses)
err := m.GenericAPIServer.AddHealthChecks(healthz.NamedCheck("SSH Tunnel Check", tunneler.TunnelSyncHealthChecker(nodeTunneler)))
if err != nil {
klog.Errorf("Failed adding ssh tunnel health check %v\n", err)
}
}
// RESTStorageProvider is a factory type for REST storage.
type RESTStorageProvider interface {
GroupName() string

View File

@ -1,231 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tunneler
import (
"context"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"sync/atomic"
"time"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/ssh"
utilpath "k8s.io/utils/path"
)
type InstallSSHKey func(ctx context.Context, user string, data []byte) error
type AddressFunc func() (addresses []string, err error)
type Tunneler interface {
Run(AddressFunc)
Stop()
Dial(ctx context.Context, net, addr string) (net.Conn, error)
SecondsSinceSync() int64
SecondsSinceSSHKeySync() int64
}
// TunnelSyncHealthChecker returns a health func that indicates if a tunneler is healthy.
// It's compatible with healthz.NamedCheck
func TunnelSyncHealthChecker(tunneler Tunneler) func(req *http.Request) error {
return func(req *http.Request) error {
if tunneler == nil {
return nil
}
lag := tunneler.SecondsSinceSync()
if lag > 600 {
return fmt.Errorf("tunnel sync is taking too long: %d", lag)
}
sshKeyLag := tunneler.SecondsSinceSSHKeySync()
// Since we are syncing ssh-keys every 5 minutes, the allowed
// lag since last sync should be more than 2x higher than that
// to allow for single failure, which can always happen.
// For now set it to 3x, which is 15 minutes.
// For more details see: http://pr.k8s.io/59347
if sshKeyLag > 900 {
return fmt.Errorf("SSHKey sync is taking too long: %d", sshKeyLag)
}
return nil
}
}
type SSHTunneler struct {
// Important: Since these two int64 fields are using sync/atomic, they have to be at the top of the struct due to a bug on 32-bit platforms
// See: https://golang.org/pkg/sync/atomic/ for more information
lastSync int64 // Seconds since Epoch
lastSSHKeySync int64 // Seconds since Epoch
SSHUser string
SSHKeyfile string
InstallSSHKey InstallSSHKey
HealthCheckURL *url.URL
tunnels *ssh.SSHTunnelList
clock clock.Clock
getAddresses AddressFunc
stopChan chan struct{}
}
func New(sshUser, sshKeyfile string, healthCheckURL *url.URL, installSSHKey InstallSSHKey) Tunneler {
return &SSHTunneler{
SSHUser: sshUser,
SSHKeyfile: sshKeyfile,
InstallSSHKey: installSSHKey,
HealthCheckURL: healthCheckURL,
clock: clock.RealClock{},
}
}
// Run establishes tunnel loops and returns
func (c *SSHTunneler) Run(getAddresses AddressFunc) {
if c.stopChan != nil {
return
}
c.stopChan = make(chan struct{})
// Save the address getter
if getAddresses != nil {
c.getAddresses = getAddresses
}
// Usernames are capped @ 32
if len(c.SSHUser) > 32 {
klog.Warning("SSH User is too long, truncating to 32 chars")
c.SSHUser = c.SSHUser[0:32]
}
klog.Infof("Setting up proxy: %s %s", c.SSHUser, c.SSHKeyfile)
// public keyfile is written last, so check for that.
publicKeyFile := c.SSHKeyfile + ".pub"
exists, err := utilpath.Exists(utilpath.CheckFollowSymlink, publicKeyFile)
if err != nil {
klog.Errorf("Error detecting if key exists: %v", err)
} else if !exists {
klog.Infof("Key doesn't exist, attempting to create")
if err := generateSSHKey(c.SSHKeyfile, publicKeyFile); err != nil {
klog.Errorf("Failed to create key pair: %v", err)
}
}
c.tunnels = ssh.NewSSHTunnelList(c.SSHUser, c.SSHKeyfile, c.HealthCheckURL, c.stopChan)
// Sync loop to ensure that the SSH key has been installed.
c.lastSSHKeySync = c.clock.Now().Unix()
c.installSSHKeySyncLoop(c.SSHUser, publicKeyFile)
// Sync tunnelList w/ nodes.
c.lastSync = c.clock.Now().Unix()
c.nodesSyncLoop()
}
// Stop gracefully shuts down the tunneler
func (c *SSHTunneler) Stop() {
if c.stopChan != nil {
close(c.stopChan)
c.stopChan = nil
}
}
func (c *SSHTunneler) Dial(ctx context.Context, net, addr string) (net.Conn, error) {
return c.tunnels.Dial(ctx, net, addr)
}
func (c *SSHTunneler) SecondsSinceSync() int64 {
now := c.clock.Now().Unix()
then := atomic.LoadInt64(&c.lastSync)
return now - then
}
func (c *SSHTunneler) SecondsSinceSSHKeySync() int64 {
now := c.clock.Now().Unix()
then := atomic.LoadInt64(&c.lastSSHKeySync)
return now - then
}
func (c *SSHTunneler) installSSHKeySyncLoop(user, publicKeyfile string) {
go wait.Until(func() {
if c.InstallSSHKey == nil {
klog.Error("Won't attempt to install ssh key: InstallSSHKey function is nil")
return
}
key, err := ssh.ParsePublicKeyFromFile(publicKeyfile)
if err != nil {
klog.Errorf("Failed to load public key: %v", err)
return
}
keyData, err := ssh.EncodeSSHKey(key)
if err != nil {
klog.Errorf("Failed to encode public key: %v", err)
return
}
if err := c.InstallSSHKey(context.TODO(), user, keyData); err != nil {
klog.Errorf("Failed to install ssh key: %v", err)
return
}
atomic.StoreInt64(&c.lastSSHKeySync, c.clock.Now().Unix())
}, 5*time.Minute, c.stopChan)
}
// nodesSyncLoop lists nodes every 15 seconds, calling Update() on the TunnelList
// each time (Update() is a noop if no changes are necessary).
func (c *SSHTunneler) nodesSyncLoop() {
// TODO (cjcullen) make this watch.
go wait.Until(func() {
addrs, err := c.getAddresses()
klog.V(4).Infof("Calling update w/ addrs: %v", addrs)
if err != nil {
klog.Errorf("Failed to getAddresses: %v", err)
}
c.tunnels.Update(addrs)
atomic.StoreInt64(&c.lastSync, c.clock.Now().Unix())
}, 15*time.Second, c.stopChan)
}
func generateSSHKey(privateKeyfile, publicKeyfile string) error {
private, public, err := ssh.GenerateKey(2048)
if err != nil {
return err
}
// If private keyfile already exists, we must have only made it halfway
// through last time, so delete it.
exists, err := utilpath.Exists(utilpath.CheckFollowSymlink, privateKeyfile)
if err != nil {
klog.Errorf("Error detecting if private key exists: %v", err)
} else if exists {
klog.Infof("Private key exists, but public key does not")
if err := os.Remove(privateKeyfile); err != nil {
klog.Errorf("Failed to remove stale private key: %v", err)
}
}
if err := ioutil.WriteFile(privateKeyfile, ssh.EncodePrivateKey(private), 0600); err != nil {
return err
}
publicKeyBytes, err := ssh.EncodePublicKey(public)
if err != nil {
return err
}
if err := ioutil.WriteFile(publicKeyfile+".tmp", publicKeyBytes, 0600); err != nil {
return err
}
return os.Rename(publicKeyfile+".tmp", publicKeyfile)
}

View File

@ -1,163 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tunneler
import (
"context"
"fmt"
"net"
"os"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/util/clock"
)
// TestSecondsSinceSync verifies that proper results are returned
// when checking the time between syncs
func TestSecondsSinceSync(t *testing.T) {
tests := []struct {
name string
lastSync int64
clock *clock.FakeClock
want int64
}{
{
name: "Nano Second. No difference",
lastSync: time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix(),
clock: clock.NewFakeClock(time.Date(2015, time.January, 1, 1, 1, 1, 2, time.UTC)),
want: int64(0),
},
{
name: "Second",
lastSync: time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix(),
clock: clock.NewFakeClock(time.Date(2015, time.January, 1, 1, 1, 2, 1, time.UTC)),
want: int64(1),
},
{
name: "Minute",
lastSync: time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix(),
clock: clock.NewFakeClock(time.Date(2015, time.January, 1, 1, 2, 1, 1, time.UTC)),
want: int64(60),
},
{
name: "Hour",
lastSync: time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix(),
clock: clock.NewFakeClock(time.Date(2015, time.January, 1, 2, 1, 1, 1, time.UTC)),
want: int64(3600),
},
{
name: "Day",
lastSync: time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix(),
clock: clock.NewFakeClock(time.Date(2015, time.January, 2, 1, 1, 1, 1, time.UTC)),
want: int64(86400),
},
{
name: "Month",
lastSync: time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix(),
clock: clock.NewFakeClock(time.Date(2015, time.February, 1, 1, 1, 1, 1, time.UTC)),
want: int64(2678400),
},
{
name: "Future Month. Should be -Month",
lastSync: time.Date(2015, time.February, 1, 1, 1, 1, 1, time.UTC).Unix(),
clock: clock.NewFakeClock(time.Date(2015, time.January, 1, 1, 1, 1, 2, time.UTC)),
want: int64(-2678400),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tunneler := &SSHTunneler{}
assert := assert.New(t)
tunneler.lastSync = tt.lastSync
tunneler.clock = tt.clock
assert.Equal(int64(tt.want), tunneler.SecondsSinceSync())
})
}
}
// generateTempFile creates a temporary file path
func generateTempFilePath(prefix string) string {
tmpPath, _ := filepath.Abs(fmt.Sprintf("%s/%s-%d", os.TempDir(), prefix, time.Now().Unix()))
return tmpPath
}
// TestGenerateSSHKey verifies that SSH key generation does indeed
// generate keys even with keys already exist.
func TestGenerateSSHKey(t *testing.T) {
assert := assert.New(t)
privateKey := generateTempFilePath("private")
publicKey := generateTempFilePath("public")
// Make sure we have no test keys laying around
os.Remove(privateKey)
os.Remove(publicKey)
// Pass case: Sunny day case
err := generateSSHKey(privateKey, publicKey)
assert.NoError(err, "generateSSHKey should not have retuend an error: %s", err)
// Pass case: PrivateKey exists test case
os.Remove(publicKey)
err = generateSSHKey(privateKey, publicKey)
assert.NoError(err, "generateSSHKey should not have retuend an error: %s", err)
// Pass case: PublicKey exists test case
os.Remove(privateKey)
err = generateSSHKey(privateKey, publicKey)
assert.NoError(err, "generateSSHKey should not have retuend an error: %s", err)
// Make sure we have no test keys laying around
os.Remove(privateKey)
os.Remove(publicKey)
// TODO: testing error cases where the file can not be removed?
}
type FakeTunneler struct {
SecondsSinceSyncValue int64
SecondsSinceSSHKeySyncValue int64
}
func (t *FakeTunneler) Run(AddressFunc) {}
func (t *FakeTunneler) Stop() {}
func (t *FakeTunneler) Dial(ctx context.Context, net, addr string) (net.Conn, error) { return nil, nil }
func (t *FakeTunneler) SecondsSinceSync() int64 { return t.SecondsSinceSyncValue }
func (t *FakeTunneler) SecondsSinceSSHKeySync() int64 { return t.SecondsSinceSSHKeySyncValue }
// TestIsTunnelSyncHealthy verifies that the 600 second lag test
// is honored.
func TestIsTunnelSyncHealthy(t *testing.T) {
tunneler := &FakeTunneler{}
// Pass case: 540 second lag
tunneler.SecondsSinceSyncValue = 540
healthFn := TunnelSyncHealthChecker(tunneler)
err := healthFn(nil)
assert.NoError(t, err, "IsTunnelSyncHealthy() should not have returned an error.")
// Fail case: 720 second lag
tunneler.SecondsSinceSyncValue = 720
err = healthFn(nil)
assert.Error(t, err, "IsTunnelSyncHealthy() should have returned an error.")
}

View File

@ -1,514 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// WARNING: DO NOT add new use-caes to this package as it is deprecated and slated for deletion.
package ssh
import (
"bytes"
"context"
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"encoding/pem"
"errors"
"fmt"
"io/ioutil"
mathrand "math/rand"
"net"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"
"golang.org/x/crypto/ssh"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"
)
/*
* By default, all the following metrics are defined as falling under
* ALPHA stability level https://github.com/kubernetes/enhancements/blob/master/keps/sig-instrumentation/1209-metrics-stability/kubernetes-control-plane-metrics-stability.md#stability-classes)
*
* Promoting the stability level of the metric is a responsibility of the component owner, since it
* involves explicitly acknowledging support for the metric across multiple releases, in accordance with
* the metric stability policy.
*/
var (
tunnelOpenCounter = metrics.NewCounter(
&metrics.CounterOpts{
Name: "ssh_tunnel_open_count",
Help: "Counter of ssh tunnel total open attempts",
StabilityLevel: metrics.ALPHA,
},
)
tunnelOpenFailCounter = metrics.NewCounter(
&metrics.CounterOpts{
Name: "ssh_tunnel_open_fail_count",
Help: "Counter of ssh tunnel failed open attempts",
StabilityLevel: metrics.ALPHA,
},
)
)
func init() {
legacyregistry.MustRegister(tunnelOpenCounter)
legacyregistry.MustRegister(tunnelOpenFailCounter)
}
// TODO: Unit tests for this code, we can spin up a test SSH server with instructions here:
// https://godoc.org/golang.org/x/crypto/ssh#ServerConn
type sshTunnel struct {
Config *ssh.ClientConfig
Host string
SSHPort string
client *ssh.Client
}
func makeSSHTunnel(user string, signer ssh.Signer, host string) (*sshTunnel, error) {
config := ssh.ClientConfig{
User: user,
Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
}
return &sshTunnel{
Config: &config,
Host: host,
SSHPort: "22",
}, nil
}
func (s *sshTunnel) Open() error {
var err error
s.client, err = realTimeoutDialer.Dial("tcp", net.JoinHostPort(s.Host, s.SSHPort), s.Config)
tunnelOpenCounter.Inc()
if err != nil {
tunnelOpenFailCounter.Inc()
}
return err
}
func (s *sshTunnel) Dial(ctx context.Context, network, address string) (net.Conn, error) {
if s.client == nil {
return nil, errors.New("tunnel is not opened.")
}
// This Dial method does not allow to pass a context unfortunately
return s.client.Dial(network, address)
}
func (s *sshTunnel) Close() error {
if s.client == nil {
return errors.New("Cannot close tunnel. Tunnel was not opened.")
}
if err := s.client.Close(); err != nil {
return err
}
return nil
}
// Interface to allow mocking of ssh.Dial, for testing SSH
type sshDialer interface {
Dial(network, addr string, config *ssh.ClientConfig) (*ssh.Client, error)
}
// Real implementation of sshDialer
type realSSHDialer struct{}
var _ sshDialer = &realSSHDialer{}
func (d *realSSHDialer) Dial(network, addr string, config *ssh.ClientConfig) (*ssh.Client, error) {
conn, err := net.DialTimeout(network, addr, config.Timeout)
if err != nil {
return nil, err
}
conn.SetReadDeadline(time.Now().Add(30 * time.Second))
c, chans, reqs, err := ssh.NewClientConn(conn, addr, config)
if err != nil {
return nil, err
}
conn.SetReadDeadline(time.Time{})
return ssh.NewClient(c, chans, reqs), nil
}
// timeoutDialer wraps an sshDialer with a timeout around Dial(). The golang
// ssh library can hang indefinitely inside the Dial() call (see issue #23835).
// Wrapping all Dial() calls with a conservative timeout provides safety against
// getting stuck on that.
type timeoutDialer struct {
dialer sshDialer
timeout time.Duration
}
// 150 seconds is longer than the underlying default TCP backoff delay (127
// seconds). This timeout is only intended to catch otherwise uncaught hangs.
const sshDialTimeout = 150 * time.Second
var realTimeoutDialer sshDialer = &timeoutDialer{&realSSHDialer{}, sshDialTimeout}
func (d *timeoutDialer) Dial(network, addr string, config *ssh.ClientConfig) (*ssh.Client, error) {
config.Timeout = d.timeout
return d.dialer.Dial(network, addr, config)
}
// RunSSHCommand returns the stdout, stderr, and exit code from running cmd on
// host as specific user, along with any SSH-level error.
// If user=="", it will default (like SSH) to os.Getenv("USER")
func RunSSHCommand(cmd, user, host string, signer ssh.Signer) (string, string, int, error) {
return runSSHCommand(realTimeoutDialer, cmd, user, host, signer, true)
}
// Internal implementation of runSSHCommand, for testing
func runSSHCommand(dialer sshDialer, cmd, user, host string, signer ssh.Signer, retry bool) (string, string, int, error) {
if user == "" {
user = os.Getenv("USER")
}
// Setup the config, dial the server, and open a session.
config := &ssh.ClientConfig{
User: user,
Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
}
client, err := dialer.Dial("tcp", host, config)
if err != nil && retry {
err = wait.Poll(5*time.Second, 20*time.Second, func() (bool, error) {
fmt.Printf("error dialing %s@%s: '%v', retrying\n", user, host, err)
if client, err = dialer.Dial("tcp", host, config); err != nil {
return false, err
}
return true, nil
})
}
if err != nil {
return "", "", 0, fmt.Errorf("error getting SSH client to %s@%s: '%v'", user, host, err)
}
defer client.Close()
session, err := client.NewSession()
if err != nil {
return "", "", 0, fmt.Errorf("error creating session to %s@%s: '%v'", user, host, err)
}
defer session.Close()
// Run the command.
code := 0
var bout, berr bytes.Buffer
session.Stdout, session.Stderr = &bout, &berr
if err = session.Run(cmd); err != nil {
// Check whether the command failed to run or didn't complete.
if exiterr, ok := err.(*ssh.ExitError); ok {
// If we got an ExitError and the exit code is nonzero, we'll
// consider the SSH itself successful (just that the command run
// errored on the host).
if code = exiterr.ExitStatus(); code != 0 {
err = nil
}
} else {
// Some other kind of error happened (e.g. an IOError); consider the
// SSH unsuccessful.
err = fmt.Errorf("failed running `%s` on %s@%s: '%v'", cmd, user, host, err)
}
}
return bout.String(), berr.String(), code, err
}
func MakePrivateKeySignerFromFile(key string) (ssh.Signer, error) {
// Create an actual signer.
buffer, err := ioutil.ReadFile(key)
if err != nil {
return nil, fmt.Errorf("error reading SSH key %s: '%v'", key, err)
}
return MakePrivateKeySignerFromBytes(buffer)
}
func MakePrivateKeySignerFromBytes(buffer []byte) (ssh.Signer, error) {
signer, err := ssh.ParsePrivateKey(buffer)
if err != nil {
return nil, fmt.Errorf("error parsing SSH key: '%v'", err)
}
return signer, nil
}
func ParsePublicKeyFromFile(keyFile string) (*rsa.PublicKey, error) {
buffer, err := ioutil.ReadFile(keyFile)
if err != nil {
return nil, fmt.Errorf("error reading SSH key %s: '%v'", keyFile, err)
}
keyBlock, _ := pem.Decode(buffer)
if keyBlock == nil {
return nil, fmt.Errorf("error parsing SSH key %s: 'invalid PEM format'", keyFile)
}
key, err := x509.ParsePKIXPublicKey(keyBlock.Bytes)
if err != nil {
return nil, fmt.Errorf("error parsing SSH key %s: '%v'", keyFile, err)
}
rsaKey, ok := key.(*rsa.PublicKey)
if !ok {
return nil, fmt.Errorf("SSH key could not be parsed as rsa public key")
}
return rsaKey, nil
}
type tunnel interface {
Open() error
Close() error
Dial(ctx context.Context, network, address string) (net.Conn, error)
}
type sshTunnelEntry struct {
Address string
Tunnel tunnel
}
type sshTunnelCreator interface {
newSSHTunnel(user, keyFile, host string) (tunnel, error)
}
type realTunnelCreator struct{}
func (*realTunnelCreator) newSSHTunnel(user, keyFile, host string) (tunnel, error) {
signer, err := MakePrivateKeySignerFromFile(keyFile)
if err != nil {
return nil, err
}
return makeSSHTunnel(user, signer, host)
}
type SSHTunnelList struct {
entries []sshTunnelEntry
adding map[string]bool
tunnelCreator sshTunnelCreator
tunnelsLock sync.Mutex
user string
keyfile string
healthCheckURL *url.URL
}
func NewSSHTunnelList(user, keyfile string, healthCheckURL *url.URL, stopChan chan struct{}) *SSHTunnelList {
l := &SSHTunnelList{
adding: make(map[string]bool),
tunnelCreator: &realTunnelCreator{},
user: user,
keyfile: keyfile,
healthCheckURL: healthCheckURL,
}
healthCheckPoll := 1 * time.Minute
go wait.Until(func() {
l.tunnelsLock.Lock()
defer l.tunnelsLock.Unlock()
// Healthcheck each tunnel every minute
numTunnels := len(l.entries)
for i, entry := range l.entries {
// Stagger healthchecks evenly across duration of healthCheckPoll.
delay := healthCheckPoll * time.Duration(i) / time.Duration(numTunnels)
l.delayedHealthCheck(entry, delay)
}
}, healthCheckPoll, stopChan)
return l
}
func (l *SSHTunnelList) delayedHealthCheck(e sshTunnelEntry, delay time.Duration) {
go func() {
defer runtime.HandleCrash()
time.Sleep(delay)
if err := l.healthCheck(e); err != nil {
klog.Errorf("Healthcheck failed for tunnel to %q: %v", e.Address, err)
klog.Infof("Attempting once to re-establish tunnel to %q", e.Address)
l.removeAndReAdd(e)
}
}()
}
func (l *SSHTunnelList) healthCheck(e sshTunnelEntry) error {
// GET the healthcheck path using the provided tunnel's dial function.
transport := utilnet.SetTransportDefaults(&http.Transport{
DialContext: e.Tunnel.Dial,
// TODO(cjcullen): Plumb real TLS options through.
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
// We don't reuse the clients, so disable the keep-alive to properly
// close the connection.
DisableKeepAlives: true,
})
client := &http.Client{Transport: transport}
resp, err := client.Get(l.healthCheckURL.String())
if err != nil {
return err
}
resp.Body.Close()
return nil
}
func (l *SSHTunnelList) removeAndReAdd(e sshTunnelEntry) {
// Find the entry to replace.
l.tunnelsLock.Lock()
for i, entry := range l.entries {
if entry.Tunnel == e.Tunnel {
l.entries = append(l.entries[:i], l.entries[i+1:]...)
l.adding[e.Address] = true
break
}
}
l.tunnelsLock.Unlock()
if err := e.Tunnel.Close(); err != nil {
klog.Infof("Failed to close removed tunnel: %v", err)
}
go l.createAndAddTunnel(e.Address)
}
func (l *SSHTunnelList) Dial(ctx context.Context, net, addr string) (net.Conn, error) {
start := time.Now()
id := mathrand.Int63() // So you can match begins/ends in the log.
klog.Infof("[%x: %v] Dialing...", id, addr)
defer func() {
klog.Infof("[%x: %v] Dialed in %v.", id, addr, time.Since(start))
}()
tunnel, err := l.pickTunnel(strings.Split(addr, ":")[0])
if err != nil {
return nil, err
}
return tunnel.Dial(ctx, net, addr)
}
func (l *SSHTunnelList) pickTunnel(addr string) (tunnel, error) {
l.tunnelsLock.Lock()
defer l.tunnelsLock.Unlock()
if len(l.entries) == 0 {
return nil, fmt.Errorf("No SSH tunnels currently open. Were the targets able to accept an ssh-key for user %q?", l.user)
}
// Prefer same tunnel as kubelet
for _, entry := range l.entries {
if entry.Address == addr {
return entry.Tunnel, nil
}
}
klog.Warningf("SSH tunnel not found for address %q, picking random node", addr)
n := mathrand.Intn(len(l.entries))
return l.entries[n].Tunnel, nil
}
// Update reconciles the list's entries with the specified addresses. Existing
// tunnels that are not in addresses are removed from entries and closed in a
// background goroutine. New tunnels specified in addresses are opened in a
// background goroutine and then added to entries.
func (l *SSHTunnelList) Update(addrs []string) {
haveAddrsMap := make(map[string]bool)
wantAddrsMap := make(map[string]bool)
func() {
l.tunnelsLock.Lock()
defer l.tunnelsLock.Unlock()
// Build a map of what we currently have.
for i := range l.entries {
haveAddrsMap[l.entries[i].Address] = true
}
// Determine any necessary additions.
for i := range addrs {
// Add tunnel if it is not in l.entries or l.adding
if _, ok := haveAddrsMap[addrs[i]]; !ok {
if _, ok := l.adding[addrs[i]]; !ok {
l.adding[addrs[i]] = true
addr := addrs[i]
go func() {
defer runtime.HandleCrash()
// Actually adding tunnel to list will block until lock
// is released after deletions.
l.createAndAddTunnel(addr)
}()
}
}
wantAddrsMap[addrs[i]] = true
}
// Determine any necessary deletions.
var newEntries []sshTunnelEntry
for i := range l.entries {
if _, ok := wantAddrsMap[l.entries[i].Address]; !ok {
tunnelEntry := l.entries[i]
klog.Infof("Removing tunnel to deleted node at %q", tunnelEntry.Address)
go func() {
defer runtime.HandleCrash()
if err := tunnelEntry.Tunnel.Close(); err != nil {
klog.Errorf("Failed to close tunnel to %q: %v", tunnelEntry.Address, err)
}
}()
} else {
newEntries = append(newEntries, l.entries[i])
}
}
l.entries = newEntries
}()
}
func (l *SSHTunnelList) createAndAddTunnel(addr string) {
klog.Infof("Trying to add tunnel to %q", addr)
tunnel, err := l.tunnelCreator.newSSHTunnel(l.user, l.keyfile, addr)
if err != nil {
klog.Errorf("Failed to create tunnel for %q: %v", addr, err)
return
}
if err := tunnel.Open(); err != nil {
klog.Errorf("Failed to open tunnel to %q: %v", addr, err)
l.tunnelsLock.Lock()
delete(l.adding, addr)
l.tunnelsLock.Unlock()
return
}
l.tunnelsLock.Lock()
l.entries = append(l.entries, sshTunnelEntry{addr, tunnel})
delete(l.adding, addr)
l.tunnelsLock.Unlock()
klog.Infof("Successfully added tunnel for %q", addr)
}
func EncodePrivateKey(private *rsa.PrivateKey) []byte {
return pem.EncodeToMemory(&pem.Block{
Bytes: x509.MarshalPKCS1PrivateKey(private),
Type: "RSA PRIVATE KEY",
})
}
func EncodePublicKey(public *rsa.PublicKey) ([]byte, error) {
publicBytes, err := x509.MarshalPKIXPublicKey(public)
if err != nil {
return nil, err
}
return pem.EncodeToMemory(&pem.Block{
Bytes: publicBytes,
Type: "PUBLIC KEY",
}), nil
}
func EncodeSSHKey(public *rsa.PublicKey) ([]byte, error) {
publicKey, err := ssh.NewPublicKey(public)
if err != nil {
return nil, err
}
return ssh.MarshalAuthorizedKey(publicKey), nil
}
func GenerateKey(bits int) (*rsa.PrivateKey, *rsa.PublicKey, error) {
private, err := rsa.GenerateKey(rand.Reader, bits)
if err != nil {
return nil, nil, err
}
return private, &private.PublicKey, nil
}

View File

@ -1,365 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ssh
import (
"context"
"fmt"
"io"
"net"
"os"
"reflect"
"strings"
"testing"
"time"
"golang.org/x/crypto/ssh"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
)
type testSSHServer struct {
Host string
Port string
Type string
Data []byte
PrivateKey []byte
PublicKey []byte
}
func runTestSSHServer(user, password string) (*testSSHServer, error) {
result := &testSSHServer{}
// Largely derived from https://godoc.org/golang.org/x/crypto/ssh#example-NewServerConn
config := &ssh.ServerConfig{
PasswordCallback: func(c ssh.ConnMetadata, pass []byte) (*ssh.Permissions, error) {
if c.User() == user && string(pass) == password {
return nil, nil
}
return nil, fmt.Errorf("password rejected for %s", c.User())
},
PublicKeyCallback: func(c ssh.ConnMetadata, key ssh.PublicKey) (*ssh.Permissions, error) {
result.Type = key.Type()
result.Data = ssh.MarshalAuthorizedKey(key)
return nil, nil
},
}
privateKey, publicKey, err := GenerateKey(2048)
if err != nil {
return nil, err
}
privateBytes := EncodePrivateKey(privateKey)
signer, err := ssh.ParsePrivateKey(privateBytes)
if err != nil {
return nil, err
}
config.AddHostKey(signer)
result.PrivateKey = privateBytes
publicBytes, err := EncodePublicKey(publicKey)
if err != nil {
return nil, err
}
result.PublicKey = publicBytes
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return nil, err
}
host, port, err := net.SplitHostPort(listener.Addr().String())
if err != nil {
return nil, err
}
result.Host = host
result.Port = port
go func() {
// TODO: return this port.
defer listener.Close()
conn, err := listener.Accept()
if err != nil {
klog.Errorf("Failed to accept: %v", err)
}
_, chans, reqs, err := ssh.NewServerConn(conn, config)
if err != nil {
klog.Errorf("Failed handshake: %v", err)
}
go ssh.DiscardRequests(reqs)
for newChannel := range chans {
if newChannel.ChannelType() != "direct-tcpip" {
newChannel.Reject(ssh.UnknownChannelType, fmt.Sprintf("unknown channel type: %s", newChannel.ChannelType()))
continue
}
channel, requests, err := newChannel.Accept()
if err != nil {
klog.Errorf("Failed to accept channel: %v", err)
}
for req := range requests {
klog.Infof("Got request: %v", req)
}
channel.Close()
}
}()
return result, nil
}
func TestSSHTunnel(t *testing.T) {
private, public, err := GenerateKey(2048)
if err != nil {
t.Errorf("unexpected error: %v", err)
t.FailNow()
}
server, err := runTestSSHServer("foo", "bar")
if err != nil {
t.Errorf("unexpected error: %v", err)
t.FailNow()
}
privateData := EncodePrivateKey(private)
tunnel, err := newSSHTunnelFromBytes("foo", privateData, server.Host)
if err != nil {
t.Errorf("unexpected error: %v", err)
t.FailNow()
}
tunnel.SSHPort = server.Port
if err := tunnel.Open(); err != nil {
t.Errorf("unexpected error: %v", err)
t.FailNow()
}
_, err = tunnel.Dial(context.Background(), "tcp", "127.0.0.1:8080")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if server.Type != "ssh-rsa" {
t.Errorf("expected %s, got %s", "ssh-rsa", server.Type)
}
publicData, err := EncodeSSHKey(public)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(server.Data, publicData) {
t.Errorf("expected %s, got %s", string(server.Data), string(privateData))
}
if err := tunnel.Close(); err != nil {
t.Errorf("unexpected error: %v", err)
}
}
type fakeTunnel struct{}
func (*fakeTunnel) Open() error {
return nil
}
func (*fakeTunnel) Close() error {
return nil
}
func (*fakeTunnel) Dial(ctx context.Context, network, address string) (net.Conn, error) {
return nil, nil
}
type fakeTunnelCreator struct{}
func (*fakeTunnelCreator) newSSHTunnel(string, string, string) (tunnel, error) {
return &fakeTunnel{}, nil
}
func TestSSHTunnelListUpdate(t *testing.T) {
// Start with an empty tunnel list.
l := &SSHTunnelList{
adding: make(map[string]bool),
tunnelCreator: &fakeTunnelCreator{},
}
// Start with 2 tunnels.
addressStrings := []string{"1.2.3.4", "5.6.7.8"}
l.Update(addressStrings)
checkTunnelsCorrect(t, l, addressStrings)
// Add another tunnel.
addressStrings = append(addressStrings, "9.10.11.12")
l.Update(addressStrings)
checkTunnelsCorrect(t, l, addressStrings)
// Go down to a single tunnel.
addressStrings = []string{"1.2.3.4"}
l.Update(addressStrings)
checkTunnelsCorrect(t, l, addressStrings)
// Replace w/ all new tunnels.
addressStrings = []string{"21.22.23.24", "25.26.27.28"}
l.Update(addressStrings)
checkTunnelsCorrect(t, l, addressStrings)
// Call update with the same tunnels.
l.Update(addressStrings)
checkTunnelsCorrect(t, l, addressStrings)
}
func checkTunnelsCorrect(t *testing.T, tunnelList *SSHTunnelList, addresses []string) {
if err := wait.Poll(100*time.Millisecond, 2*time.Second, func() (bool, error) {
return hasCorrectTunnels(tunnelList, addresses), nil
}); err != nil {
t.Errorf("Error waiting for tunnels to reach expected state: %v. Expected %v, had %v", err, addresses, tunnelList)
}
}
func hasCorrectTunnels(tunnelList *SSHTunnelList, addresses []string) bool {
tunnelList.tunnelsLock.Lock()
defer tunnelList.tunnelsLock.Unlock()
wantMap := make(map[string]bool)
for _, addr := range addresses {
wantMap[addr] = true
}
haveMap := make(map[string]bool)
for _, entry := range tunnelList.entries {
if wantMap[entry.Address] == false {
return false
}
haveMap[entry.Address] = true
}
for _, addr := range addresses {
if haveMap[addr] == false {
return false
}
}
return true
}
type mockSSHDialer struct {
network string
addr string
config *ssh.ClientConfig
}
func (d *mockSSHDialer) Dial(network, addr string, config *ssh.ClientConfig) (*ssh.Client, error) {
d.network = network
d.addr = addr
d.config = config
return nil, fmt.Errorf("mock error from Dial")
}
type mockSigner struct {
}
func (s *mockSigner) PublicKey() ssh.PublicKey {
panic("mockSigner.PublicKey not implemented")
}
func (s *mockSigner) Sign(rand io.Reader, data []byte) (*ssh.Signature, error) {
panic("mockSigner.Sign not implemented")
}
func TestSSHUser(t *testing.T) {
signer := &mockSigner{}
table := []struct {
title string
user string
host string
signer ssh.Signer
command string
expectUser string
}{
{
title: "all values provided",
user: "testuser",
host: "testhost",
signer: signer,
command: "uptime",
expectUser: "testuser",
},
{
title: "empty user defaults to GetEnv(USER)",
user: "",
host: "testhost",
signer: signer,
command: "uptime",
expectUser: os.Getenv("USER"),
},
}
for _, item := range table {
dialer := &mockSSHDialer{}
_, _, _, err := runSSHCommand(dialer, item.command, item.user, item.host, item.signer, false)
if err == nil {
t.Errorf("expected error (as mock returns error); did not get one")
}
errString := err.Error()
if !strings.HasPrefix(errString, fmt.Sprintf("error getting SSH client to %s@%s:", item.expectUser, item.host)) {
t.Errorf("unexpected error: %v", errString)
}
if dialer.network != "tcp" {
t.Errorf("unexpected network: %v", dialer.network)
}
if dialer.config.User != item.expectUser {
t.Errorf("unexpected user: %v", dialer.config.User)
}
if len(dialer.config.Auth) != 1 {
t.Errorf("unexpected auth: %v", dialer.config.Auth)
}
// (No way to test Auth - nothing exported?)
}
}
func TestTimeoutDialer(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Errorf("unexpected error: %v", err)
t.FailNow()
}
testCases := []struct {
timeout time.Duration
expectedErrString string
}{
// delay > timeout should cause ssh.Dial to timeout.
{1, "i/o timeout"},
}
for _, tc := range testCases {
dialer := &timeoutDialer{&realSSHDialer{}, tc.timeout}
_, err := dialer.Dial("tcp", listener.Addr().String(), &ssh.ClientConfig{})
if len(tc.expectedErrString) == 0 && err != nil ||
!strings.Contains(fmt.Sprint(err), tc.expectedErrString) {
t.Errorf("Expected error to contain %q; got %v", tc.expectedErrString, err)
}
}
listener.Close()
}
func newSSHTunnelFromBytes(user string, privateKey []byte, host string) (*sshTunnel, error) {
signer, err := MakePrivateKeySignerFromBytes(privateKey)
if err != nil {
return nil, err
}
return makeSSHTunnel(user, signer, host)
}

View File

@ -135,11 +135,7 @@ func StartTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup
t.Fatalf("failed to validate ServerRunOptions: %v", utilerrors.NewAggregate(errs))
}
tunneler, proxyTransport, err := app.CreateNodeDialer(completedOptions)
if err != nil {
t.Fatal(err)
}
kubeAPIServerConfig, _, _, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport)
kubeAPIServerConfig, _, _, err := app.CreateKubeAPIServerConfig(completedOptions)
if err != nil {
t.Fatal(err)
}