diff --git a/cluster/gce/gci/configure-kubeapiserver.sh b/cluster/gce/gci/configure-kubeapiserver.sh index f9ce60a161d..2fc9461aad9 100644 --- a/cluster/gce/gci/configure-kubeapiserver.sh +++ b/cluster/gce/gci/configure-kubeapiserver.sh @@ -276,22 +276,11 @@ function start-kube-apiserver { fi if [[ -n "${MASTER_ADVERTISE_ADDRESS:-}" ]]; then params+=" --advertise-address=${MASTER_ADVERTISE_ADDRESS}" - if [[ -n "${PROXY_SSH_USER:-}" ]]; then - if [[ -n "${KUBE_API_SERVER_RUNASUSER:-}" && -n "${KUBE_API_SERVER_RUNASGROUP:-}" ]]; then - chown -R "${KUBE_API_SERVER_RUNASUSER}":"${KUBE_API_SERVER_RUNASGROUP}" /etc/srv/sshproxy/ - fi - params+=" --ssh-user=${PROXY_SSH_USER}" - params+=" --ssh-keyfile=/etc/srv/sshproxy/.sshkeyfile" - fi elif [[ -n "${PROJECT_ID:-}" && -n "${TOKEN_URL:-}" && -n "${TOKEN_BODY:-}" && -n "${NODE_NETWORK:-}" ]]; then local -r vm_external_ip=$(get-metadata-value "instance/network-interfaces/0/access-configs/0/external-ip") - if [[ -n "${PROXY_SSH_USER:-}" ]]; then - if [[ -n "${KUBE_API_SERVER_RUNASUSER:-}" && -n "${KUBE_API_SERVER_RUNASGROUP:-}" ]]; then - chown -R "${KUBE_API_SERVER_RUNASUSER}":"${KUBE_API_SERVER_RUNASGROUP}" /etc/srv/sshproxy/ - fi - params+=" --advertise-address=${vm_external_ip}" - params+=" --ssh-user=${PROXY_SSH_USER}" - params+=" --ssh-keyfile=/etc/srv/sshproxy/.sshkeyfile" + params+=" --advertise-address=${vm_external_ip}" + if [[ -n "${KUBE_API_SERVER_RUNASUSER:-}" && -n "${KUBE_API_SERVER_RUNASGROUP:-}" ]]; then + chown -R "${KUBE_API_SERVER_RUNASUSER}":"${KUBE_API_SERVER_RUNASGROUP}" /etc/srv/sshproxy/ fi fi diff --git a/cmd/kube-apiserver/app/options/options.go b/cmd/kube-apiserver/app/options/options.go index 6df99c80c5c..6fdf2223067 100644 --- a/cmd/kube-apiserver/app/options/options.go +++ b/cmd/kube-apiserver/app/options/options.go @@ -75,8 +75,6 @@ type ServerRunOptions struct { APIServerServiceIP net.IP ServiceNodePortRange utilnet.PortRange - SSHKeyfile string - SSHUser string ProxyClientCertFile string ProxyClientKeyFile string @@ -196,16 +194,6 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) { "If true, install a /logs handler for the apiserver logs.") fs.MarkDeprecated("enable-logs-handler", "This flag will be removed in v1.19") - // Deprecated in release 1.9 - fs.StringVar(&s.SSHUser, "ssh-user", s.SSHUser, - "If non-empty, use secure SSH proxy to the nodes, using this user name") - fs.MarkDeprecated("ssh-user", "This flag will be removed in a future version.") - - // Deprecated in release 1.9 - fs.StringVar(&s.SSHKeyfile, "ssh-keyfile", s.SSHKeyfile, - "If non-empty, use secure SSH proxy to the nodes, using this user keyfile") - fs.MarkDeprecated("ssh-keyfile", "This flag will be removed in a future version.") - fs.Int64Var(&s.MaxConnectionBytesPerSec, "max-connection-bytes-per-sec", s.MaxConnectionBytesPerSec, ""+ "If non-zero, throttle each user connection to this number of bytes/sec. "+ "Currently only applies to long-running requests.") diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 15edadd614d..1c4359c13b9 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -26,7 +26,6 @@ import ( "net/http" "net/url" "os" - "strconv" "strings" "time" @@ -55,7 +54,6 @@ import ( clientgoclientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/util/keyutil" - cloudprovider "k8s.io/cloud-provider" cliflag "k8s.io/component-base/cli/flag" "k8s.io/component-base/cli/globalflag" _ "k8s.io/component-base/metrics/prometheus/workqueue" // for workqueue metric registration @@ -71,8 +69,6 @@ import ( "k8s.io/kubernetes/pkg/capabilities" "k8s.io/kubernetes/pkg/controlplane" "k8s.io/kubernetes/pkg/controlplane/reconcilers" - "k8s.io/kubernetes/pkg/controlplane/tunneler" - "k8s.io/kubernetes/pkg/features" generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi" "k8s.io/kubernetes/pkg/kubeapiserver" kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" @@ -186,19 +182,14 @@ func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) erro // CreateServerChain creates the apiservers connected via delegation. func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) { - nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions) - if err != nil { - return nil, err - } - - kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport) + kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions) if err != nil { return nil, err } // If additional API servers are added, they should be gated. apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount, - serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig)) + serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig)) if err != nil { return nil, err } @@ -213,7 +204,7 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan } // aggregator comes last in the chain - aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer) + aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer) if err != nil { return nil, err } @@ -236,72 +227,27 @@ func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPISe return kubeAPIServer, nil } -// CreateNodeDialer creates the dialer infrastructure to connect to the nodes. -func CreateNodeDialer(s completedServerRunOptions) (tunneler.Tunneler, *http.Transport, error) { - // Setup nodeTunneler if needed - var nodeTunneler tunneler.Tunneler +// CreateProxyTransport creates the dialer infrastructure to connect to the nodes. +func CreateProxyTransport() *http.Transport { var proxyDialerFn utilnet.DialFunc - if len(s.SSHUser) > 0 { - // Get ssh key distribution func, if supported - var installSSHKey tunneler.InstallSSHKey - - if utilfeature.DefaultFeatureGate.Enabled(features.DisableCloudProviders) && cloudprovider.IsDeprecatedInternal(s.CloudProvider.CloudProvider) { - cloudprovider.DisableWarningForProvider(s.CloudProvider.CloudProvider) - return nil, nil, fmt.Errorf("cloud provider %q and ssh-user %q was specified, but built-in cloud providers are disabled. "+ - "Please set --cloud-provider=external and use an external network proxy, see https://github.com/kubernetes-sigs/apiserver-network-proxy", - s.CloudProvider.CloudProvider, s.SSHUser) - - } - - cloudprovider.DeprecationWarningForProvider(s.CloudProvider.CloudProvider) - cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider.CloudProvider, s.CloudProvider.CloudConfigFile) - if err != nil { - return nil, nil, fmt.Errorf("cloud provider could not be initialized: %v", err) - } - if cloud != nil { - if instances, supported := cloud.Instances(); supported { - installSSHKey = instances.AddSSHKeyToAllInstances - } - } - if s.KubeletConfig.Port == 0 { - return nil, nil, fmt.Errorf("must enable kubelet port if proxy ssh-tunneling is specified") - } - if s.KubeletConfig.ReadOnlyPort == 0 { - return nil, nil, fmt.Errorf("must enable kubelet readonly port if proxy ssh-tunneling is specified") - } - // Set up the nodeTunneler - // TODO(cjcullen): If we want this to handle per-kubelet ports or other - // kubelet listen-addresses, we need to plumb through options. - healthCheckPath := &url.URL{ - Scheme: "http", - Host: net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.KubeletConfig.ReadOnlyPort), 10)), - Path: "healthz", - } - nodeTunneler = tunneler.New(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSHKey) - - // Use the nodeTunneler's dialer when proxying to pods, services, and nodes - proxyDialerFn = nodeTunneler.Dial - } // Proxying to pods and services is IP-based... don't expect to be able to verify the hostname proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true} proxyTransport := utilnet.SetTransportDefaults(&http.Transport{ DialContext: proxyDialerFn, TLSClientConfig: proxyTLSClientConfig, }) - return nodeTunneler, proxyTransport, nil + return proxyTransport } // CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them -func CreateKubeAPIServerConfig( - s completedServerRunOptions, - nodeTunneler tunneler.Tunneler, - proxyTransport *http.Transport, -) ( +func CreateKubeAPIServerConfig(s completedServerRunOptions) ( *controlplane.Config, aggregatorapiserver.ServiceResolver, []admission.PluginInitializer, error, ) { + proxyTransport := CreateProxyTransport() + genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport) if err != nil { return nil, nil, nil, err @@ -339,8 +285,6 @@ func CreateKubeAPIServerConfig( EnableLogsSupport: s.EnableLogsHandler, ProxyTransport: proxyTransport, - Tunneler: nodeTunneler, - ServiceIPRange: s.PrimaryServiceClusterIPRange, APIServerServiceIP: s.APIServerServiceIP, SecondaryServiceIPRange: s.SecondaryServiceClusterIPRange, @@ -386,10 +330,6 @@ func CreateKubeAPIServerConfig( return nil, nil, nil, err } - if nodeTunneler != nil { - // Use the nodeTunneler's dialer to connect to the kubelet - config.ExtraConfig.KubeletClientConfig.Dial = nodeTunneler.Dial - } if config.GenericConfig.EgressSelector != nil { // Use the config.GenericConfig.EgressSelector lookup to find the dialer to connect to the kubelet config.ExtraConfig.KubeletClientConfig.Lookup = config.GenericConfig.EgressSelector.Lookup diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go index a626a992d60..606ccb921d6 100644 --- a/pkg/controlplane/instance.go +++ b/pkg/controlplane/instance.go @@ -75,7 +75,6 @@ import ( "k8s.io/apiserver/pkg/registry/generic" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/dynamiccertificates" - "k8s.io/apiserver/pkg/server/healthz" serverstorage "k8s.io/apiserver/pkg/server/storage" storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -90,7 +89,6 @@ import ( "k8s.io/kubernetes/pkg/controlplane/controller/apiserverleasegc" "k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust" "k8s.io/kubernetes/pkg/controlplane/reconcilers" - "k8s.io/kubernetes/pkg/controlplane/tunneler" kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options" kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" "k8s.io/kubernetes/pkg/routes" @@ -146,10 +144,8 @@ type ExtraConfig struct { EventTTL time.Duration KubeletClientConfig kubeletclient.KubeletClientConfig - // Used to start and monitor tunneling - Tunneler tunneler.Tunneler EnableLogsSupport bool - ProxyTransport http.RoundTripper + ProxyTransport *http.Transport // Values to build the IP addresses used by discovery // The range of IPs to be assigned to services with type=ClusterIP or greater @@ -450,10 +446,6 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) return nil, err } - if c.ExtraConfig.Tunneler != nil { - m.installTunneler(c.ExtraConfig.Tunneler, corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes()) - } - m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error { kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig) if err != nil { @@ -551,14 +543,6 @@ func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generi return nil } -func (m *Instance) installTunneler(nodeTunneler tunneler.Tunneler, nodeClient corev1client.NodeInterface) { - nodeTunneler.Run(nodeAddressProvider{nodeClient}.externalAddresses) - err := m.GenericAPIServer.AddHealthChecks(healthz.NamedCheck("SSH Tunnel Check", tunneler.TunnelSyncHealthChecker(nodeTunneler))) - if err != nil { - klog.Errorf("Failed adding ssh tunnel health check %v\n", err) - } -} - // RESTStorageProvider is a factory type for REST storage. type RESTStorageProvider interface { GroupName() string diff --git a/pkg/controlplane/tunneler/ssh.go b/pkg/controlplane/tunneler/ssh.go deleted file mode 100644 index 643f6c35534..00000000000 --- a/pkg/controlplane/tunneler/ssh.go +++ /dev/null @@ -1,231 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package tunneler - -import ( - "context" - "fmt" - "io/ioutil" - "net" - "net/http" - "net/url" - "os" - "sync/atomic" - "time" - - "k8s.io/apimachinery/pkg/util/clock" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/ssh" - utilpath "k8s.io/utils/path" -) - -type InstallSSHKey func(ctx context.Context, user string, data []byte) error - -type AddressFunc func() (addresses []string, err error) - -type Tunneler interface { - Run(AddressFunc) - Stop() - Dial(ctx context.Context, net, addr string) (net.Conn, error) - SecondsSinceSync() int64 - SecondsSinceSSHKeySync() int64 -} - -// TunnelSyncHealthChecker returns a health func that indicates if a tunneler is healthy. -// It's compatible with healthz.NamedCheck -func TunnelSyncHealthChecker(tunneler Tunneler) func(req *http.Request) error { - return func(req *http.Request) error { - if tunneler == nil { - return nil - } - lag := tunneler.SecondsSinceSync() - if lag > 600 { - return fmt.Errorf("tunnel sync is taking too long: %d", lag) - } - sshKeyLag := tunneler.SecondsSinceSSHKeySync() - // Since we are syncing ssh-keys every 5 minutes, the allowed - // lag since last sync should be more than 2x higher than that - // to allow for single failure, which can always happen. - // For now set it to 3x, which is 15 minutes. - // For more details see: http://pr.k8s.io/59347 - if sshKeyLag > 900 { - return fmt.Errorf("SSHKey sync is taking too long: %d", sshKeyLag) - } - return nil - } -} - -type SSHTunneler struct { - // Important: Since these two int64 fields are using sync/atomic, they have to be at the top of the struct due to a bug on 32-bit platforms - // See: https://golang.org/pkg/sync/atomic/ for more information - lastSync int64 // Seconds since Epoch - lastSSHKeySync int64 // Seconds since Epoch - - SSHUser string - SSHKeyfile string - InstallSSHKey InstallSSHKey - HealthCheckURL *url.URL - - tunnels *ssh.SSHTunnelList - clock clock.Clock - - getAddresses AddressFunc - stopChan chan struct{} -} - -func New(sshUser, sshKeyfile string, healthCheckURL *url.URL, installSSHKey InstallSSHKey) Tunneler { - return &SSHTunneler{ - SSHUser: sshUser, - SSHKeyfile: sshKeyfile, - InstallSSHKey: installSSHKey, - HealthCheckURL: healthCheckURL, - clock: clock.RealClock{}, - } -} - -// Run establishes tunnel loops and returns -func (c *SSHTunneler) Run(getAddresses AddressFunc) { - if c.stopChan != nil { - return - } - c.stopChan = make(chan struct{}) - - // Save the address getter - if getAddresses != nil { - c.getAddresses = getAddresses - } - - // Usernames are capped @ 32 - if len(c.SSHUser) > 32 { - klog.Warning("SSH User is too long, truncating to 32 chars") - c.SSHUser = c.SSHUser[0:32] - } - klog.Infof("Setting up proxy: %s %s", c.SSHUser, c.SSHKeyfile) - - // public keyfile is written last, so check for that. - publicKeyFile := c.SSHKeyfile + ".pub" - exists, err := utilpath.Exists(utilpath.CheckFollowSymlink, publicKeyFile) - if err != nil { - klog.Errorf("Error detecting if key exists: %v", err) - } else if !exists { - klog.Infof("Key doesn't exist, attempting to create") - if err := generateSSHKey(c.SSHKeyfile, publicKeyFile); err != nil { - klog.Errorf("Failed to create key pair: %v", err) - } - } - - c.tunnels = ssh.NewSSHTunnelList(c.SSHUser, c.SSHKeyfile, c.HealthCheckURL, c.stopChan) - // Sync loop to ensure that the SSH key has been installed. - c.lastSSHKeySync = c.clock.Now().Unix() - c.installSSHKeySyncLoop(c.SSHUser, publicKeyFile) - // Sync tunnelList w/ nodes. - c.lastSync = c.clock.Now().Unix() - c.nodesSyncLoop() -} - -// Stop gracefully shuts down the tunneler -func (c *SSHTunneler) Stop() { - if c.stopChan != nil { - close(c.stopChan) - c.stopChan = nil - } -} - -func (c *SSHTunneler) Dial(ctx context.Context, net, addr string) (net.Conn, error) { - return c.tunnels.Dial(ctx, net, addr) -} - -func (c *SSHTunneler) SecondsSinceSync() int64 { - now := c.clock.Now().Unix() - then := atomic.LoadInt64(&c.lastSync) - return now - then -} - -func (c *SSHTunneler) SecondsSinceSSHKeySync() int64 { - now := c.clock.Now().Unix() - then := atomic.LoadInt64(&c.lastSSHKeySync) - return now - then -} - -func (c *SSHTunneler) installSSHKeySyncLoop(user, publicKeyfile string) { - go wait.Until(func() { - if c.InstallSSHKey == nil { - klog.Error("Won't attempt to install ssh key: InstallSSHKey function is nil") - return - } - key, err := ssh.ParsePublicKeyFromFile(publicKeyfile) - if err != nil { - klog.Errorf("Failed to load public key: %v", err) - return - } - keyData, err := ssh.EncodeSSHKey(key) - if err != nil { - klog.Errorf("Failed to encode public key: %v", err) - return - } - if err := c.InstallSSHKey(context.TODO(), user, keyData); err != nil { - klog.Errorf("Failed to install ssh key: %v", err) - return - } - atomic.StoreInt64(&c.lastSSHKeySync, c.clock.Now().Unix()) - }, 5*time.Minute, c.stopChan) -} - -// nodesSyncLoop lists nodes every 15 seconds, calling Update() on the TunnelList -// each time (Update() is a noop if no changes are necessary). -func (c *SSHTunneler) nodesSyncLoop() { - // TODO (cjcullen) make this watch. - go wait.Until(func() { - addrs, err := c.getAddresses() - klog.V(4).Infof("Calling update w/ addrs: %v", addrs) - if err != nil { - klog.Errorf("Failed to getAddresses: %v", err) - } - c.tunnels.Update(addrs) - atomic.StoreInt64(&c.lastSync, c.clock.Now().Unix()) - }, 15*time.Second, c.stopChan) -} - -func generateSSHKey(privateKeyfile, publicKeyfile string) error { - private, public, err := ssh.GenerateKey(2048) - if err != nil { - return err - } - // If private keyfile already exists, we must have only made it halfway - // through last time, so delete it. - exists, err := utilpath.Exists(utilpath.CheckFollowSymlink, privateKeyfile) - if err != nil { - klog.Errorf("Error detecting if private key exists: %v", err) - } else if exists { - klog.Infof("Private key exists, but public key does not") - if err := os.Remove(privateKeyfile); err != nil { - klog.Errorf("Failed to remove stale private key: %v", err) - } - } - if err := ioutil.WriteFile(privateKeyfile, ssh.EncodePrivateKey(private), 0600); err != nil { - return err - } - publicKeyBytes, err := ssh.EncodePublicKey(public) - if err != nil { - return err - } - if err := ioutil.WriteFile(publicKeyfile+".tmp", publicKeyBytes, 0600); err != nil { - return err - } - return os.Rename(publicKeyfile+".tmp", publicKeyfile) -} diff --git a/pkg/controlplane/tunneler/ssh_test.go b/pkg/controlplane/tunneler/ssh_test.go deleted file mode 100644 index 2fe8f283944..00000000000 --- a/pkg/controlplane/tunneler/ssh_test.go +++ /dev/null @@ -1,163 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package tunneler - -import ( - "context" - "fmt" - "net" - "os" - "path/filepath" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "k8s.io/apimachinery/pkg/util/clock" -) - -// TestSecondsSinceSync verifies that proper results are returned -// when checking the time between syncs -func TestSecondsSinceSync(t *testing.T) { - tests := []struct { - name string - lastSync int64 - clock *clock.FakeClock - want int64 - }{ - { - name: "Nano Second. No difference", - lastSync: time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix(), - clock: clock.NewFakeClock(time.Date(2015, time.January, 1, 1, 1, 1, 2, time.UTC)), - want: int64(0), - }, - { - name: "Second", - lastSync: time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix(), - clock: clock.NewFakeClock(time.Date(2015, time.January, 1, 1, 1, 2, 1, time.UTC)), - want: int64(1), - }, - { - name: "Minute", - lastSync: time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix(), - clock: clock.NewFakeClock(time.Date(2015, time.January, 1, 1, 2, 1, 1, time.UTC)), - want: int64(60), - }, - { - name: "Hour", - lastSync: time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix(), - clock: clock.NewFakeClock(time.Date(2015, time.January, 1, 2, 1, 1, 1, time.UTC)), - want: int64(3600), - }, - { - name: "Day", - lastSync: time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix(), - clock: clock.NewFakeClock(time.Date(2015, time.January, 2, 1, 1, 1, 1, time.UTC)), - want: int64(86400), - }, - { - name: "Month", - lastSync: time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix(), - clock: clock.NewFakeClock(time.Date(2015, time.February, 1, 1, 1, 1, 1, time.UTC)), - want: int64(2678400), - }, - { - name: "Future Month. Should be -Month", - lastSync: time.Date(2015, time.February, 1, 1, 1, 1, 1, time.UTC).Unix(), - clock: clock.NewFakeClock(time.Date(2015, time.January, 1, 1, 1, 1, 2, time.UTC)), - want: int64(-2678400), - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tunneler := &SSHTunneler{} - assert := assert.New(t) - tunneler.lastSync = tt.lastSync - tunneler.clock = tt.clock - assert.Equal(int64(tt.want), tunneler.SecondsSinceSync()) - }) - } - -} - -// generateTempFile creates a temporary file path -func generateTempFilePath(prefix string) string { - tmpPath, _ := filepath.Abs(fmt.Sprintf("%s/%s-%d", os.TempDir(), prefix, time.Now().Unix())) - return tmpPath -} - -// TestGenerateSSHKey verifies that SSH key generation does indeed -// generate keys even with keys already exist. -func TestGenerateSSHKey(t *testing.T) { - assert := assert.New(t) - - privateKey := generateTempFilePath("private") - publicKey := generateTempFilePath("public") - - // Make sure we have no test keys laying around - os.Remove(privateKey) - os.Remove(publicKey) - - // Pass case: Sunny day case - err := generateSSHKey(privateKey, publicKey) - assert.NoError(err, "generateSSHKey should not have retuend an error: %s", err) - - // Pass case: PrivateKey exists test case - os.Remove(publicKey) - err = generateSSHKey(privateKey, publicKey) - assert.NoError(err, "generateSSHKey should not have retuend an error: %s", err) - - // Pass case: PublicKey exists test case - os.Remove(privateKey) - err = generateSSHKey(privateKey, publicKey) - assert.NoError(err, "generateSSHKey should not have retuend an error: %s", err) - - // Make sure we have no test keys laying around - os.Remove(privateKey) - os.Remove(publicKey) - - // TODO: testing error cases where the file can not be removed? -} - -type FakeTunneler struct { - SecondsSinceSyncValue int64 - SecondsSinceSSHKeySyncValue int64 -} - -func (t *FakeTunneler) Run(AddressFunc) {} -func (t *FakeTunneler) Stop() {} -func (t *FakeTunneler) Dial(ctx context.Context, net, addr string) (net.Conn, error) { return nil, nil } -func (t *FakeTunneler) SecondsSinceSync() int64 { return t.SecondsSinceSyncValue } -func (t *FakeTunneler) SecondsSinceSSHKeySync() int64 { return t.SecondsSinceSSHKeySyncValue } - -// TestIsTunnelSyncHealthy verifies that the 600 second lag test -// is honored. -func TestIsTunnelSyncHealthy(t *testing.T) { - tunneler := &FakeTunneler{} - - // Pass case: 540 second lag - tunneler.SecondsSinceSyncValue = 540 - healthFn := TunnelSyncHealthChecker(tunneler) - err := healthFn(nil) - assert.NoError(t, err, "IsTunnelSyncHealthy() should not have returned an error.") - - // Fail case: 720 second lag - tunneler.SecondsSinceSyncValue = 720 - err = healthFn(nil) - assert.Error(t, err, "IsTunnelSyncHealthy() should have returned an error.") -} diff --git a/pkg/ssh/ssh.go b/pkg/ssh/ssh.go deleted file mode 100644 index 4698972abc4..00000000000 --- a/pkg/ssh/ssh.go +++ /dev/null @@ -1,514 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// WARNING: DO NOT add new use-caes to this package as it is deprecated and slated for deletion. - -package ssh - -import ( - "bytes" - "context" - "crypto/rand" - "crypto/rsa" - "crypto/tls" - "crypto/x509" - "encoding/pem" - "errors" - "fmt" - "io/ioutil" - mathrand "math/rand" - "net" - "net/http" - "net/url" - "os" - "strings" - "sync" - "time" - - "golang.org/x/crypto/ssh" - - utilnet "k8s.io/apimachinery/pkg/util/net" - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/component-base/metrics" - "k8s.io/component-base/metrics/legacyregistry" - "k8s.io/klog/v2" -) - -/* - * By default, all the following metrics are defined as falling under - * ALPHA stability level https://github.com/kubernetes/enhancements/blob/master/keps/sig-instrumentation/1209-metrics-stability/kubernetes-control-plane-metrics-stability.md#stability-classes) - * - * Promoting the stability level of the metric is a responsibility of the component owner, since it - * involves explicitly acknowledging support for the metric across multiple releases, in accordance with - * the metric stability policy. - */ -var ( - tunnelOpenCounter = metrics.NewCounter( - &metrics.CounterOpts{ - Name: "ssh_tunnel_open_count", - Help: "Counter of ssh tunnel total open attempts", - StabilityLevel: metrics.ALPHA, - }, - ) - tunnelOpenFailCounter = metrics.NewCounter( - &metrics.CounterOpts{ - Name: "ssh_tunnel_open_fail_count", - Help: "Counter of ssh tunnel failed open attempts", - StabilityLevel: metrics.ALPHA, - }, - ) -) - -func init() { - legacyregistry.MustRegister(tunnelOpenCounter) - legacyregistry.MustRegister(tunnelOpenFailCounter) -} - -// TODO: Unit tests for this code, we can spin up a test SSH server with instructions here: -// https://godoc.org/golang.org/x/crypto/ssh#ServerConn -type sshTunnel struct { - Config *ssh.ClientConfig - Host string - SSHPort string - client *ssh.Client -} - -func makeSSHTunnel(user string, signer ssh.Signer, host string) (*sshTunnel, error) { - config := ssh.ClientConfig{ - User: user, - Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)}, - HostKeyCallback: ssh.InsecureIgnoreHostKey(), - } - return &sshTunnel{ - Config: &config, - Host: host, - SSHPort: "22", - }, nil -} - -func (s *sshTunnel) Open() error { - var err error - s.client, err = realTimeoutDialer.Dial("tcp", net.JoinHostPort(s.Host, s.SSHPort), s.Config) - tunnelOpenCounter.Inc() - if err != nil { - tunnelOpenFailCounter.Inc() - } - return err -} - -func (s *sshTunnel) Dial(ctx context.Context, network, address string) (net.Conn, error) { - if s.client == nil { - return nil, errors.New("tunnel is not opened.") - } - // This Dial method does not allow to pass a context unfortunately - return s.client.Dial(network, address) -} - -func (s *sshTunnel) Close() error { - if s.client == nil { - return errors.New("Cannot close tunnel. Tunnel was not opened.") - } - if err := s.client.Close(); err != nil { - return err - } - return nil -} - -// Interface to allow mocking of ssh.Dial, for testing SSH -type sshDialer interface { - Dial(network, addr string, config *ssh.ClientConfig) (*ssh.Client, error) -} - -// Real implementation of sshDialer -type realSSHDialer struct{} - -var _ sshDialer = &realSSHDialer{} - -func (d *realSSHDialer) Dial(network, addr string, config *ssh.ClientConfig) (*ssh.Client, error) { - conn, err := net.DialTimeout(network, addr, config.Timeout) - if err != nil { - return nil, err - } - conn.SetReadDeadline(time.Now().Add(30 * time.Second)) - c, chans, reqs, err := ssh.NewClientConn(conn, addr, config) - if err != nil { - return nil, err - } - conn.SetReadDeadline(time.Time{}) - return ssh.NewClient(c, chans, reqs), nil -} - -// timeoutDialer wraps an sshDialer with a timeout around Dial(). The golang -// ssh library can hang indefinitely inside the Dial() call (see issue #23835). -// Wrapping all Dial() calls with a conservative timeout provides safety against -// getting stuck on that. -type timeoutDialer struct { - dialer sshDialer - timeout time.Duration -} - -// 150 seconds is longer than the underlying default TCP backoff delay (127 -// seconds). This timeout is only intended to catch otherwise uncaught hangs. -const sshDialTimeout = 150 * time.Second - -var realTimeoutDialer sshDialer = &timeoutDialer{&realSSHDialer{}, sshDialTimeout} - -func (d *timeoutDialer) Dial(network, addr string, config *ssh.ClientConfig) (*ssh.Client, error) { - config.Timeout = d.timeout - return d.dialer.Dial(network, addr, config) -} - -// RunSSHCommand returns the stdout, stderr, and exit code from running cmd on -// host as specific user, along with any SSH-level error. -// If user=="", it will default (like SSH) to os.Getenv("USER") -func RunSSHCommand(cmd, user, host string, signer ssh.Signer) (string, string, int, error) { - return runSSHCommand(realTimeoutDialer, cmd, user, host, signer, true) -} - -// Internal implementation of runSSHCommand, for testing -func runSSHCommand(dialer sshDialer, cmd, user, host string, signer ssh.Signer, retry bool) (string, string, int, error) { - if user == "" { - user = os.Getenv("USER") - } - // Setup the config, dial the server, and open a session. - config := &ssh.ClientConfig{ - User: user, - Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)}, - HostKeyCallback: ssh.InsecureIgnoreHostKey(), - } - client, err := dialer.Dial("tcp", host, config) - if err != nil && retry { - err = wait.Poll(5*time.Second, 20*time.Second, func() (bool, error) { - fmt.Printf("error dialing %s@%s: '%v', retrying\n", user, host, err) - if client, err = dialer.Dial("tcp", host, config); err != nil { - return false, err - } - return true, nil - }) - } - if err != nil { - return "", "", 0, fmt.Errorf("error getting SSH client to %s@%s: '%v'", user, host, err) - } - defer client.Close() - session, err := client.NewSession() - if err != nil { - return "", "", 0, fmt.Errorf("error creating session to %s@%s: '%v'", user, host, err) - } - defer session.Close() - - // Run the command. - code := 0 - var bout, berr bytes.Buffer - session.Stdout, session.Stderr = &bout, &berr - if err = session.Run(cmd); err != nil { - // Check whether the command failed to run or didn't complete. - if exiterr, ok := err.(*ssh.ExitError); ok { - // If we got an ExitError and the exit code is nonzero, we'll - // consider the SSH itself successful (just that the command run - // errored on the host). - if code = exiterr.ExitStatus(); code != 0 { - err = nil - } - } else { - // Some other kind of error happened (e.g. an IOError); consider the - // SSH unsuccessful. - err = fmt.Errorf("failed running `%s` on %s@%s: '%v'", cmd, user, host, err) - } - } - return bout.String(), berr.String(), code, err -} - -func MakePrivateKeySignerFromFile(key string) (ssh.Signer, error) { - // Create an actual signer. - buffer, err := ioutil.ReadFile(key) - if err != nil { - return nil, fmt.Errorf("error reading SSH key %s: '%v'", key, err) - } - return MakePrivateKeySignerFromBytes(buffer) -} - -func MakePrivateKeySignerFromBytes(buffer []byte) (ssh.Signer, error) { - signer, err := ssh.ParsePrivateKey(buffer) - if err != nil { - return nil, fmt.Errorf("error parsing SSH key: '%v'", err) - } - return signer, nil -} - -func ParsePublicKeyFromFile(keyFile string) (*rsa.PublicKey, error) { - buffer, err := ioutil.ReadFile(keyFile) - if err != nil { - return nil, fmt.Errorf("error reading SSH key %s: '%v'", keyFile, err) - } - keyBlock, _ := pem.Decode(buffer) - if keyBlock == nil { - return nil, fmt.Errorf("error parsing SSH key %s: 'invalid PEM format'", keyFile) - } - key, err := x509.ParsePKIXPublicKey(keyBlock.Bytes) - if err != nil { - return nil, fmt.Errorf("error parsing SSH key %s: '%v'", keyFile, err) - } - rsaKey, ok := key.(*rsa.PublicKey) - if !ok { - return nil, fmt.Errorf("SSH key could not be parsed as rsa public key") - } - return rsaKey, nil -} - -type tunnel interface { - Open() error - Close() error - Dial(ctx context.Context, network, address string) (net.Conn, error) -} - -type sshTunnelEntry struct { - Address string - Tunnel tunnel -} - -type sshTunnelCreator interface { - newSSHTunnel(user, keyFile, host string) (tunnel, error) -} - -type realTunnelCreator struct{} - -func (*realTunnelCreator) newSSHTunnel(user, keyFile, host string) (tunnel, error) { - signer, err := MakePrivateKeySignerFromFile(keyFile) - if err != nil { - return nil, err - } - return makeSSHTunnel(user, signer, host) -} - -type SSHTunnelList struct { - entries []sshTunnelEntry - adding map[string]bool - tunnelCreator sshTunnelCreator - tunnelsLock sync.Mutex - - user string - keyfile string - healthCheckURL *url.URL -} - -func NewSSHTunnelList(user, keyfile string, healthCheckURL *url.URL, stopChan chan struct{}) *SSHTunnelList { - l := &SSHTunnelList{ - adding: make(map[string]bool), - tunnelCreator: &realTunnelCreator{}, - user: user, - keyfile: keyfile, - healthCheckURL: healthCheckURL, - } - healthCheckPoll := 1 * time.Minute - go wait.Until(func() { - l.tunnelsLock.Lock() - defer l.tunnelsLock.Unlock() - // Healthcheck each tunnel every minute - numTunnels := len(l.entries) - for i, entry := range l.entries { - // Stagger healthchecks evenly across duration of healthCheckPoll. - delay := healthCheckPoll * time.Duration(i) / time.Duration(numTunnels) - l.delayedHealthCheck(entry, delay) - } - }, healthCheckPoll, stopChan) - return l -} - -func (l *SSHTunnelList) delayedHealthCheck(e sshTunnelEntry, delay time.Duration) { - go func() { - defer runtime.HandleCrash() - time.Sleep(delay) - if err := l.healthCheck(e); err != nil { - klog.Errorf("Healthcheck failed for tunnel to %q: %v", e.Address, err) - klog.Infof("Attempting once to re-establish tunnel to %q", e.Address) - l.removeAndReAdd(e) - } - }() -} - -func (l *SSHTunnelList) healthCheck(e sshTunnelEntry) error { - // GET the healthcheck path using the provided tunnel's dial function. - transport := utilnet.SetTransportDefaults(&http.Transport{ - DialContext: e.Tunnel.Dial, - // TODO(cjcullen): Plumb real TLS options through. - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - // We don't reuse the clients, so disable the keep-alive to properly - // close the connection. - DisableKeepAlives: true, - }) - client := &http.Client{Transport: transport} - resp, err := client.Get(l.healthCheckURL.String()) - if err != nil { - return err - } - resp.Body.Close() - return nil -} - -func (l *SSHTunnelList) removeAndReAdd(e sshTunnelEntry) { - // Find the entry to replace. - l.tunnelsLock.Lock() - for i, entry := range l.entries { - if entry.Tunnel == e.Tunnel { - l.entries = append(l.entries[:i], l.entries[i+1:]...) - l.adding[e.Address] = true - break - } - } - l.tunnelsLock.Unlock() - if err := e.Tunnel.Close(); err != nil { - klog.Infof("Failed to close removed tunnel: %v", err) - } - go l.createAndAddTunnel(e.Address) -} - -func (l *SSHTunnelList) Dial(ctx context.Context, net, addr string) (net.Conn, error) { - start := time.Now() - id := mathrand.Int63() // So you can match begins/ends in the log. - klog.Infof("[%x: %v] Dialing...", id, addr) - defer func() { - klog.Infof("[%x: %v] Dialed in %v.", id, addr, time.Since(start)) - }() - tunnel, err := l.pickTunnel(strings.Split(addr, ":")[0]) - if err != nil { - return nil, err - } - return tunnel.Dial(ctx, net, addr) -} - -func (l *SSHTunnelList) pickTunnel(addr string) (tunnel, error) { - l.tunnelsLock.Lock() - defer l.tunnelsLock.Unlock() - if len(l.entries) == 0 { - return nil, fmt.Errorf("No SSH tunnels currently open. Were the targets able to accept an ssh-key for user %q?", l.user) - } - // Prefer same tunnel as kubelet - for _, entry := range l.entries { - if entry.Address == addr { - return entry.Tunnel, nil - } - } - klog.Warningf("SSH tunnel not found for address %q, picking random node", addr) - n := mathrand.Intn(len(l.entries)) - return l.entries[n].Tunnel, nil -} - -// Update reconciles the list's entries with the specified addresses. Existing -// tunnels that are not in addresses are removed from entries and closed in a -// background goroutine. New tunnels specified in addresses are opened in a -// background goroutine and then added to entries. -func (l *SSHTunnelList) Update(addrs []string) { - haveAddrsMap := make(map[string]bool) - wantAddrsMap := make(map[string]bool) - func() { - l.tunnelsLock.Lock() - defer l.tunnelsLock.Unlock() - // Build a map of what we currently have. - for i := range l.entries { - haveAddrsMap[l.entries[i].Address] = true - } - // Determine any necessary additions. - for i := range addrs { - // Add tunnel if it is not in l.entries or l.adding - if _, ok := haveAddrsMap[addrs[i]]; !ok { - if _, ok := l.adding[addrs[i]]; !ok { - l.adding[addrs[i]] = true - addr := addrs[i] - go func() { - defer runtime.HandleCrash() - // Actually adding tunnel to list will block until lock - // is released after deletions. - l.createAndAddTunnel(addr) - }() - } - } - wantAddrsMap[addrs[i]] = true - } - // Determine any necessary deletions. - var newEntries []sshTunnelEntry - for i := range l.entries { - if _, ok := wantAddrsMap[l.entries[i].Address]; !ok { - tunnelEntry := l.entries[i] - klog.Infof("Removing tunnel to deleted node at %q", tunnelEntry.Address) - go func() { - defer runtime.HandleCrash() - if err := tunnelEntry.Tunnel.Close(); err != nil { - klog.Errorf("Failed to close tunnel to %q: %v", tunnelEntry.Address, err) - } - }() - } else { - newEntries = append(newEntries, l.entries[i]) - } - } - l.entries = newEntries - }() -} - -func (l *SSHTunnelList) createAndAddTunnel(addr string) { - klog.Infof("Trying to add tunnel to %q", addr) - tunnel, err := l.tunnelCreator.newSSHTunnel(l.user, l.keyfile, addr) - if err != nil { - klog.Errorf("Failed to create tunnel for %q: %v", addr, err) - return - } - if err := tunnel.Open(); err != nil { - klog.Errorf("Failed to open tunnel to %q: %v", addr, err) - l.tunnelsLock.Lock() - delete(l.adding, addr) - l.tunnelsLock.Unlock() - return - } - l.tunnelsLock.Lock() - l.entries = append(l.entries, sshTunnelEntry{addr, tunnel}) - delete(l.adding, addr) - l.tunnelsLock.Unlock() - klog.Infof("Successfully added tunnel for %q", addr) -} - -func EncodePrivateKey(private *rsa.PrivateKey) []byte { - return pem.EncodeToMemory(&pem.Block{ - Bytes: x509.MarshalPKCS1PrivateKey(private), - Type: "RSA PRIVATE KEY", - }) -} - -func EncodePublicKey(public *rsa.PublicKey) ([]byte, error) { - publicBytes, err := x509.MarshalPKIXPublicKey(public) - if err != nil { - return nil, err - } - return pem.EncodeToMemory(&pem.Block{ - Bytes: publicBytes, - Type: "PUBLIC KEY", - }), nil -} - -func EncodeSSHKey(public *rsa.PublicKey) ([]byte, error) { - publicKey, err := ssh.NewPublicKey(public) - if err != nil { - return nil, err - } - return ssh.MarshalAuthorizedKey(publicKey), nil -} - -func GenerateKey(bits int) (*rsa.PrivateKey, *rsa.PublicKey, error) { - private, err := rsa.GenerateKey(rand.Reader, bits) - if err != nil { - return nil, nil, err - } - return private, &private.PublicKey, nil -} diff --git a/pkg/ssh/ssh_test.go b/pkg/ssh/ssh_test.go deleted file mode 100644 index 589aeb2950d..00000000000 --- a/pkg/ssh/ssh_test.go +++ /dev/null @@ -1,365 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package ssh - -import ( - "context" - "fmt" - "io" - "net" - "os" - "reflect" - "strings" - "testing" - "time" - - "golang.org/x/crypto/ssh" - - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/klog/v2" -) - -type testSSHServer struct { - Host string - Port string - Type string - Data []byte - PrivateKey []byte - PublicKey []byte -} - -func runTestSSHServer(user, password string) (*testSSHServer, error) { - result := &testSSHServer{} - // Largely derived from https://godoc.org/golang.org/x/crypto/ssh#example-NewServerConn - config := &ssh.ServerConfig{ - PasswordCallback: func(c ssh.ConnMetadata, pass []byte) (*ssh.Permissions, error) { - if c.User() == user && string(pass) == password { - return nil, nil - } - return nil, fmt.Errorf("password rejected for %s", c.User()) - }, - PublicKeyCallback: func(c ssh.ConnMetadata, key ssh.PublicKey) (*ssh.Permissions, error) { - result.Type = key.Type() - result.Data = ssh.MarshalAuthorizedKey(key) - return nil, nil - }, - } - - privateKey, publicKey, err := GenerateKey(2048) - if err != nil { - return nil, err - } - privateBytes := EncodePrivateKey(privateKey) - signer, err := ssh.ParsePrivateKey(privateBytes) - if err != nil { - return nil, err - } - config.AddHostKey(signer) - result.PrivateKey = privateBytes - - publicBytes, err := EncodePublicKey(publicKey) - if err != nil { - return nil, err - } - result.PublicKey = publicBytes - - listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - return nil, err - } - - host, port, err := net.SplitHostPort(listener.Addr().String()) - if err != nil { - return nil, err - } - result.Host = host - result.Port = port - go func() { - // TODO: return this port. - defer listener.Close() - - conn, err := listener.Accept() - if err != nil { - klog.Errorf("Failed to accept: %v", err) - } - _, chans, reqs, err := ssh.NewServerConn(conn, config) - if err != nil { - klog.Errorf("Failed handshake: %v", err) - } - go ssh.DiscardRequests(reqs) - for newChannel := range chans { - if newChannel.ChannelType() != "direct-tcpip" { - newChannel.Reject(ssh.UnknownChannelType, fmt.Sprintf("unknown channel type: %s", newChannel.ChannelType())) - continue - } - channel, requests, err := newChannel.Accept() - if err != nil { - klog.Errorf("Failed to accept channel: %v", err) - } - - for req := range requests { - klog.Infof("Got request: %v", req) - } - - channel.Close() - } - }() - return result, nil -} - -func TestSSHTunnel(t *testing.T) { - private, public, err := GenerateKey(2048) - if err != nil { - t.Errorf("unexpected error: %v", err) - t.FailNow() - } - server, err := runTestSSHServer("foo", "bar") - if err != nil { - t.Errorf("unexpected error: %v", err) - t.FailNow() - } - - privateData := EncodePrivateKey(private) - tunnel, err := newSSHTunnelFromBytes("foo", privateData, server.Host) - if err != nil { - t.Errorf("unexpected error: %v", err) - t.FailNow() - } - tunnel.SSHPort = server.Port - - if err := tunnel.Open(); err != nil { - t.Errorf("unexpected error: %v", err) - t.FailNow() - } - - _, err = tunnel.Dial(context.Background(), "tcp", "127.0.0.1:8080") - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if server.Type != "ssh-rsa" { - t.Errorf("expected %s, got %s", "ssh-rsa", server.Type) - } - - publicData, err := EncodeSSHKey(public) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if !reflect.DeepEqual(server.Data, publicData) { - t.Errorf("expected %s, got %s", string(server.Data), string(privateData)) - } - - if err := tunnel.Close(); err != nil { - t.Errorf("unexpected error: %v", err) - } -} - -type fakeTunnel struct{} - -func (*fakeTunnel) Open() error { - return nil -} - -func (*fakeTunnel) Close() error { - return nil -} - -func (*fakeTunnel) Dial(ctx context.Context, network, address string) (net.Conn, error) { - return nil, nil -} - -type fakeTunnelCreator struct{} - -func (*fakeTunnelCreator) newSSHTunnel(string, string, string) (tunnel, error) { - return &fakeTunnel{}, nil -} - -func TestSSHTunnelListUpdate(t *testing.T) { - // Start with an empty tunnel list. - l := &SSHTunnelList{ - adding: make(map[string]bool), - tunnelCreator: &fakeTunnelCreator{}, - } - - // Start with 2 tunnels. - addressStrings := []string{"1.2.3.4", "5.6.7.8"} - l.Update(addressStrings) - checkTunnelsCorrect(t, l, addressStrings) - - // Add another tunnel. - addressStrings = append(addressStrings, "9.10.11.12") - l.Update(addressStrings) - checkTunnelsCorrect(t, l, addressStrings) - - // Go down to a single tunnel. - addressStrings = []string{"1.2.3.4"} - l.Update(addressStrings) - checkTunnelsCorrect(t, l, addressStrings) - - // Replace w/ all new tunnels. - addressStrings = []string{"21.22.23.24", "25.26.27.28"} - l.Update(addressStrings) - checkTunnelsCorrect(t, l, addressStrings) - - // Call update with the same tunnels. - l.Update(addressStrings) - checkTunnelsCorrect(t, l, addressStrings) -} - -func checkTunnelsCorrect(t *testing.T, tunnelList *SSHTunnelList, addresses []string) { - if err := wait.Poll(100*time.Millisecond, 2*time.Second, func() (bool, error) { - return hasCorrectTunnels(tunnelList, addresses), nil - }); err != nil { - t.Errorf("Error waiting for tunnels to reach expected state: %v. Expected %v, had %v", err, addresses, tunnelList) - } -} - -func hasCorrectTunnels(tunnelList *SSHTunnelList, addresses []string) bool { - tunnelList.tunnelsLock.Lock() - defer tunnelList.tunnelsLock.Unlock() - wantMap := make(map[string]bool) - for _, addr := range addresses { - wantMap[addr] = true - } - haveMap := make(map[string]bool) - for _, entry := range tunnelList.entries { - if wantMap[entry.Address] == false { - return false - } - haveMap[entry.Address] = true - } - for _, addr := range addresses { - if haveMap[addr] == false { - return false - } - } - return true -} - -type mockSSHDialer struct { - network string - addr string - config *ssh.ClientConfig -} - -func (d *mockSSHDialer) Dial(network, addr string, config *ssh.ClientConfig) (*ssh.Client, error) { - d.network = network - d.addr = addr - d.config = config - return nil, fmt.Errorf("mock error from Dial") -} - -type mockSigner struct { -} - -func (s *mockSigner) PublicKey() ssh.PublicKey { - panic("mockSigner.PublicKey not implemented") -} - -func (s *mockSigner) Sign(rand io.Reader, data []byte) (*ssh.Signature, error) { - panic("mockSigner.Sign not implemented") -} - -func TestSSHUser(t *testing.T) { - signer := &mockSigner{} - - table := []struct { - title string - user string - host string - signer ssh.Signer - command string - expectUser string - }{ - { - title: "all values provided", - user: "testuser", - host: "testhost", - signer: signer, - command: "uptime", - expectUser: "testuser", - }, - { - title: "empty user defaults to GetEnv(USER)", - user: "", - host: "testhost", - signer: signer, - command: "uptime", - expectUser: os.Getenv("USER"), - }, - } - - for _, item := range table { - dialer := &mockSSHDialer{} - - _, _, _, err := runSSHCommand(dialer, item.command, item.user, item.host, item.signer, false) - if err == nil { - t.Errorf("expected error (as mock returns error); did not get one") - } - errString := err.Error() - if !strings.HasPrefix(errString, fmt.Sprintf("error getting SSH client to %s@%s:", item.expectUser, item.host)) { - t.Errorf("unexpected error: %v", errString) - } - - if dialer.network != "tcp" { - t.Errorf("unexpected network: %v", dialer.network) - } - - if dialer.config.User != item.expectUser { - t.Errorf("unexpected user: %v", dialer.config.User) - } - if len(dialer.config.Auth) != 1 { - t.Errorf("unexpected auth: %v", dialer.config.Auth) - } - // (No way to test Auth - nothing exported?) - - } - -} - -func TestTimeoutDialer(t *testing.T) { - listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Errorf("unexpected error: %v", err) - t.FailNow() - } - - testCases := []struct { - timeout time.Duration - expectedErrString string - }{ - // delay > timeout should cause ssh.Dial to timeout. - {1, "i/o timeout"}, - } - for _, tc := range testCases { - dialer := &timeoutDialer{&realSSHDialer{}, tc.timeout} - _, err := dialer.Dial("tcp", listener.Addr().String(), &ssh.ClientConfig{}) - if len(tc.expectedErrString) == 0 && err != nil || - !strings.Contains(fmt.Sprint(err), tc.expectedErrString) { - t.Errorf("Expected error to contain %q; got %v", tc.expectedErrString, err) - } - } - - listener.Close() -} - -func newSSHTunnelFromBytes(user string, privateKey []byte, host string) (*sshTunnel, error) { - signer, err := MakePrivateKeySignerFromBytes(privateKey) - if err != nil { - return nil, err - } - return makeSSHTunnel(user, signer, host) -} diff --git a/test/integration/framework/test_server.go b/test/integration/framework/test_server.go index 68c0bc67390..a5974189f9d 100644 --- a/test/integration/framework/test_server.go +++ b/test/integration/framework/test_server.go @@ -135,11 +135,7 @@ func StartTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup t.Fatalf("failed to validate ServerRunOptions: %v", utilerrors.NewAggregate(errs)) } - tunneler, proxyTransport, err := app.CreateNodeDialer(completedOptions) - if err != nil { - t.Fatal(err) - } - kubeAPIServerConfig, _, _, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport) + kubeAPIServerConfig, _, _, err := app.CreateKubeAPIServerConfig(completedOptions) if err != nil { t.Fatal(err) }