From 04eb90a5d488de787ebe99fea4d394f1a53c6450 Mon Sep 17 00:00:00 2001 From: CJ Cullen Date: Fri, 4 Dec 2015 18:01:29 -0800 Subject: [PATCH] Make tunneler hold tunnels open and healthcheck vs. reopening every 5 minutes. Also add a test for the Update() logic. Reordered tunnels vs. storage initialization (prevent a nil ptr panic) --- cmd/kube-apiserver/app/server.go | 14 +- cmd/kubelet/app/options/options.go | 4 +- cmd/kubelet/app/server.go | 4 +- docs/admin/kubelet.md | 4 +- pkg/master/master.go | 28 ++-- pkg/master/tunneler.go | 146 +++++------------ pkg/master/tunneler_test.go | 20 +-- pkg/util/ssh.go | 247 ++++++++++++++++++++--------- pkg/util/ssh_test.go | 85 ++++++++++ 9 files changed, 331 insertions(+), 221 deletions(-) diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 8e72b113af2..f041af03e59 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -23,6 +23,7 @@ import ( "crypto/tls" "fmt" "net" + "net/url" "os" "strconv" "strings" @@ -211,9 +212,18 @@ func Run(s *options.APIServer) error { installSSH = instances.AddSSHKeyToAllInstances } } - + if s.KubeletConfig.Port == 0 { + glog.Fatalf("Must enable kubelet port if proxy ssh-tunneling is specified.") + } // Set up the tunneler - tunneler = master.NewSSHTunneler(s.SSHUser, s.SSHKeyfile, installSSH) + // TODO(cjcullen): If we want this to handle per-kubelet ports or other + // kubelet listen-addresses, we need to plumb through options. + healthCheckPath := &url.URL{ + Scheme: "https", + Host: net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.KubeletConfig.Port), 10)), + Path: "healthz", + } + tunneler = master.NewSSHTunneler(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSH) // Use the tunneler's dialer to connect to the kubelet s.KubeletConfig.Dial = tunneler.Dial diff --git a/cmd/kubelet/app/options/options.go b/cmd/kubelet/app/options/options.go index 58e4729e280..5c76945c36c 100644 --- a/cmd/kubelet/app/options/options.go +++ b/cmd/kubelet/app/options/options.go @@ -114,7 +114,7 @@ func NewKubeletServer() *KubeletServer { RktStage1Image: "", RootDirectory: defaultRootDir, SerializeImagePulls: true, - StreamingConnectionIdleTimeout: unversioned.Duration{5 * time.Minute}, + StreamingConnectionIdleTimeout: unversioned.Duration{4 * time.Hour}, SyncFrequency: unversioned.Duration{1 * time.Minute}, SystemContainer: "", ReconcileCIDR: true, @@ -174,7 +174,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.ClusterDomain, "cluster-domain", s.ClusterDomain, "Domain for this cluster. If set, kubelet will configure all containers to search this domain in addition to the host's search domains") fs.StringVar(&s.MasterServiceNamespace, "master-service-namespace", s.MasterServiceNamespace, "The namespace from which the kubernetes master services should be injected into pods") fs.StringVar(&s.ClusterDNS, "cluster-dns", s.ClusterDNS, "IP address for a cluster DNS server. If set, kubelet will configure all containers to use this for DNS resolution in addition to the host's DNS servers") - fs.DurationVar(&s.StreamingConnectionIdleTimeout.Duration, "streaming-connection-idle-timeout", s.StreamingConnectionIdleTimeout.Duration, "Maximum time a streaming connection can be idle before the connection is automatically closed. Example: '5m'") + fs.DurationVar(&s.StreamingConnectionIdleTimeout.Duration, "streaming-connection-idle-timeout", s.StreamingConnectionIdleTimeout.Duration, "Maximum time a streaming connection can be idle before the connection is automatically closed. 0 indicates no timeout. Example: '5m'") fs.DurationVar(&s.NodeStatusUpdateFrequency.Duration, "node-status-update-frequency", s.NodeStatusUpdateFrequency.Duration, "Specifies how often kubelet posts node status to master. Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod in nodecontroller. Default: 10s") bindableNodeLabels := util.ConfigurationMap(s.NodeLabels) fs.Var(&bindableNodeLabels, "node-labels", " Labels to add when registering the node in the cluster. Labels must are key=value pairs seperated by ','.") diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 523cc315919..13ab9f75a7d 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -596,10 +596,10 @@ func RunKubelet(kcfg *KubeletConfig) error { if _, err := k.RunOnce(podCfg.Updates()); err != nil { return fmt.Errorf("runonce failed: %v", err) } - glog.Infof("Started kubelet as runonce") + glog.Info("Started kubelet as runonce") } else { startKubelet(k, podCfg, kcfg) - glog.Infof("Started kubelet") + glog.Info("Started kubelet") } return nil } diff --git a/docs/admin/kubelet.md b/docs/admin/kubelet.md index 9d51184eac7..d4b0f509eb0 100644 --- a/docs/admin/kubelet.md +++ b/docs/admin/kubelet.md @@ -137,7 +137,7 @@ kubelet --root-dir="/var/lib/kubelet": Directory path for managing kubelet files (volume mounts,etc). --runonce[=false]: If true, exit after spawning pods from local manifests or remote urls. Exclusive with --api-servers, and --enable-server --serialize-image-pulls[=true]: Pull images one at a time. We recommend *not* changing the default value on nodes that run docker daemon with version < 1.9 or an Aufs storage backend. Issue #10959 has more details. [default=true] - --streaming-connection-idle-timeout=5m0s: Maximum time a streaming connection can be idle before the connection is automatically closed. Example: '5m' + --streaming-connection-idle-timeout=4h0m0s: Maximum time a streaming connection can be idle before the connection is automatically closed. 0 indicates no timeout. Example: '5m' --sync-frequency=1m0s: Max period between synchronizing running containers and config --system-container="": Optional resource-only container in which to place all non-kernel processes that are not already in a container. Empty for no container. Rolling back the flag requires a reboot. (Default: ""). --system-reserved=: A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=150G) pairs that describe resources reserved for non-kubernetes components. Currently only cpu and memory are supported. See http://releases.k8s.io/HEAD/docs/user-guide/compute-resources.html for more detail. [default=none] @@ -146,7 +146,7 @@ kubelet --volume-plugin-dir="/usr/libexec/kubernetes/kubelet-plugins/volume/exec/": The full path of the directory in which to search for additional third party volume plugins ``` -###### Auto generated by spf13/cobra on 21-Jan-2016 +###### Auto generated by spf13/cobra on 29-Jan-2016 diff --git a/pkg/master/master.go b/pkg/master/master.go index df7db628f53..a0acc6944ae 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -161,20 +161,6 @@ func New(c *Config) *Master { func (m *Master) InstallAPIs(c *Config) { apiGroupsInfo := []genericapiserver.APIGroupInfo{} - // Run the tunnel. - healthzChecks := []healthz.HealthzChecker{} - if m.tunneler != nil { - m.tunneler.Run(m.getNodeAddresses) - healthzChecks = append(healthzChecks, healthz.NamedCheck("SSH Tunnel Check", m.IsTunnelSyncHealthy)) - prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "apiserver_proxy_tunnel_sync_latency_secs", - Help: "The time since the last successful synchronization of the SSH tunnels for proxy requests.", - }, func() float64 { return float64(m.tunneler.SecondsSinceSync()) }) - } - - // TODO(nikhiljindal): Refactor generic parts of support services (like /versions) to genericapiserver. - apiserver.InstallSupport(m.MuxHelper, m.RootWebService, c.EnableProfiling, healthzChecks...) - // Install v1 unless disabled. if !m.ApiGroupVersionOverrides["api/v1"].Disable { // Install v1 API. @@ -192,6 +178,20 @@ func (m *Master) InstallAPIs(c *Config) { apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo) } + // Run the tunneler. + healthzChecks := []healthz.HealthzChecker{} + if m.tunneler != nil { + m.tunneler.Run(m.getNodeAddresses) + healthzChecks = append(healthzChecks, healthz.NamedCheck("SSH Tunnel Check", m.IsTunnelSyncHealthy)) + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "apiserver_proxy_tunnel_sync_latency_secs", + Help: "The time since the last successful synchronization of the SSH tunnels for proxy requests.", + }, func() float64 { return float64(m.tunneler.SecondsSinceSync()) }) + } + + // TODO(nikhiljindal): Refactor generic parts of support services (like /versions) to genericapiserver. + apiserver.InstallSupport(m.MuxHelper, m.RootWebService, c.EnableProfiling, healthzChecks...) + // Install root web services m.HandlerContainer.Add(m.RootWebService) diff --git a/pkg/master/tunneler.go b/pkg/master/tunneler.go index 2cef89d2093..9cb04b34c20 100644 --- a/pkg/master/tunneler.go +++ b/pkg/master/tunneler.go @@ -18,10 +18,9 @@ package master import ( "io/ioutil" - "math/rand" "net" + "net/url" "os" - "sync" "sync/atomic" "time" @@ -43,12 +42,12 @@ type Tunneler interface { } type SSHTunneler struct { - SSHUser string - SSHKeyfile string - InstallSSHKey InstallSSHKey + SSHUser string + SSHKeyfile string + InstallSSHKey InstallSSHKey + HealthCheckURL *url.URL tunnels *util.SSHTunnelList - tunnelsLock sync.Mutex lastSync int64 // Seconds since Epoch lastSyncMetric prometheus.GaugeFunc clock util.Clock @@ -57,13 +56,13 @@ type SSHTunneler struct { stopChan chan struct{} } -func NewSSHTunneler(sshUser string, sshKeyfile string, installSSHKey InstallSSHKey) Tunneler { +func NewSSHTunneler(sshUser, sshKeyfile string, healthCheckURL *url.URL, installSSHKey InstallSSHKey) Tunneler { return &SSHTunneler{ - SSHUser: sshUser, - SSHKeyfile: sshKeyfile, - InstallSSHKey: installSSHKey, - - clock: util.RealClock{}, + SSHUser: sshUser, + SSHKeyfile: sshKeyfile, + InstallSSHKey: installSSHKey, + HealthCheckURL: healthCheckURL, + clock: util.RealClock{}, } } @@ -93,14 +92,17 @@ func (c *SSHTunneler) Run(getAddresses AddressFunc) { glog.Errorf("Error detecting if key exists: %v", err) } else if !exists { glog.Infof("Key doesn't exist, attempting to create") - err := c.generateSSHKey(c.SSHUser, c.SSHKeyfile, publicKeyFile) - if err != nil { + if err := generateSSHKey(c.SSHKeyfile, publicKeyFile); err != nil { glog.Errorf("Failed to create key pair: %v", err) } } - c.tunnels = &util.SSHTunnelList{} - c.setupSecureProxy(c.SSHUser, c.SSHKeyfile, publicKeyFile) + + c.tunnels = util.NewSSHTunnelList(c.SSHUser, c.SSHKeyfile, c.HealthCheckURL, c.stopChan) + // Sync loop to ensure that the SSH key has been installed. + c.installSSHKeySyncLoop(c.SSHUser, publicKeyFile) + // Sync tunnelList w/ nodes. c.lastSync = c.clock.Now().Unix() + c.nodesSyncLoop() } // Stop gracefully shuts down the tunneler @@ -112,23 +114,7 @@ func (c *SSHTunneler) Stop() { } func (c *SSHTunneler) Dial(net, addr string) (net.Conn, error) { - // Only lock while picking a tunnel. - tunnel, err := func() (util.SSHTunnelEntry, error) { - c.tunnelsLock.Lock() - defer c.tunnelsLock.Unlock() - return c.tunnels.PickRandomTunnel() - }() - if err != nil { - return nil, err - } - - start := time.Now() - id := rand.Int63() // So you can match begins/ends in the log. - glog.V(3).Infof("[%x: %v] Dialing...", id, tunnel.Address) - defer func() { - glog.V(3).Infof("[%x: %v] Dialed in %v.", id, tunnel.Address, time.Now().Sub(start)) - }() - return tunnel.Tunnel.Dial(net, addr) + return c.tunnels.Dial(net, addr) } func (c *SSHTunneler) SecondsSinceSync() int64 { @@ -137,61 +123,7 @@ func (c *SSHTunneler) SecondsSinceSync() int64 { return now - then } -func (c *SSHTunneler) needToReplaceTunnels(addrs []string) bool { - c.tunnelsLock.Lock() - defer c.tunnelsLock.Unlock() - if c.tunnels == nil || c.tunnels.Len() != len(addrs) { - return true - } - // TODO (cjcullen): This doesn't need to be n^2 - for ix := range addrs { - if !c.tunnels.Has(addrs[ix]) { - return true - } - } - return false -} - -func (c *SSHTunneler) replaceTunnels(user, keyfile string, newAddrs []string) error { - glog.Infof("replacing tunnels. New addrs: %v", newAddrs) - tunnels := util.MakeSSHTunnels(user, keyfile, newAddrs) - if err := tunnels.Open(); err != nil { - return err - } - c.tunnelsLock.Lock() - defer c.tunnelsLock.Unlock() - if c.tunnels != nil { - c.tunnels.Close() - } - c.tunnels = tunnels - atomic.StoreInt64(&c.lastSync, c.clock.Now().Unix()) - return nil -} - -func (c *SSHTunneler) loadTunnels(user, keyfile string) error { - addrs, err := c.getAddresses() - if err != nil { - return err - } - if !c.needToReplaceTunnels(addrs) { - return nil - } - // TODO: This is going to unnecessarily close connections to unchanged nodes. - // See comment about using Watch above. - glog.Info("found different nodes. Need to replace tunnels") - return c.replaceTunnels(user, keyfile, addrs) -} - -func (c *SSHTunneler) refreshTunnels(user, keyfile string) error { - addrs, err := c.getAddresses() - if err != nil { - return err - } - return c.replaceTunnels(user, keyfile, addrs) -} - -func (c *SSHTunneler) setupSecureProxy(user, privateKeyfile, publicKeyfile string) { - // Sync loop to ensure that the SSH key has been installed. +func (c *SSHTunneler) installSSHKeySyncLoop(user, publicKeyfile string) { go util.Until(func() { if c.InstallSSHKey == nil { glog.Error("Won't attempt to install ssh key: InstallSSHKey function is nil") @@ -211,30 +143,24 @@ func (c *SSHTunneler) setupSecureProxy(user, privateKeyfile, publicKeyfile strin glog.Errorf("Failed to install ssh key: %v", err) } }, 5*time.Minute, c.stopChan) - // Sync loop for tunnels - // TODO: switch this to watch. - go util.Until(func() { - if err := c.loadTunnels(user, privateKeyfile); err != nil { - glog.Errorf("Failed to load SSH Tunnels: %v", err) - } - if c.tunnels != nil && c.tunnels.Len() != 0 { - // Sleep for 10 seconds if we have some tunnels. - // TODO (cjcullen): tunnels can lag behind actually existing nodes. - time.Sleep(9 * time.Second) - } - }, 1*time.Second, c.stopChan) - // Refresh loop for tunnels - // TODO: could make this more controller-ish - go util.Until(func() { - time.Sleep(5 * time.Minute) - if err := c.refreshTunnels(user, privateKeyfile); err != nil { - glog.Errorf("Failed to refresh SSH Tunnels: %v", err) - } - }, 0*time.Second, c.stopChan) } -func (c *SSHTunneler) generateSSHKey(user, privateKeyfile, publicKeyfile string) error { - // TODO: user is not used. Consider removing it as an input to the function. +// nodesSyncLoop lists nodes ever 15 seconds, calling Update() on the TunnelList +// each time (Update() is a noop if no changes are necessary). +func (c *SSHTunneler) nodesSyncLoop() { + // TODO (cjcullen) make this watch. + go util.Until(func() { + addrs, err := c.getAddresses() + glog.Infof("Calling update w/ addrs: %v", addrs) + if err != nil { + glog.Errorf("Failed to getAddresses: %v", err) + } + c.tunnels.Update(addrs) + atomic.StoreInt64(&c.lastSync, c.clock.Now().Unix()) + }, 15*time.Second, c.stopChan) +} + +func generateSSHKey(privateKeyfile, publicKeyfile string) error { private, public, err := util.GenerateKey(2048) if err != nil { return err diff --git a/pkg/master/tunneler_test.go b/pkg/master/tunneler_test.go index 24822f2e06e..b08a1df343b 100644 --- a/pkg/master/tunneler_test.go +++ b/pkg/master/tunneler_test.go @@ -66,19 +66,6 @@ func TestSecondsSinceSync(t *testing.T) { assert.Equal(int64(-2678400), tunneler.SecondsSinceSync()) } -// TestRefreshTunnels verifies that the function errors when no addresses -// are associated with nodes -func TestRefreshTunnels(t *testing.T) { - tunneler := &SSHTunneler{} - tunneler.getAddresses = func() ([]string, error) { return []string{}, nil } - assert := assert.New(t) - - // Fail case (no addresses associated with nodes) - assert.Error(tunneler.refreshTunnels("test", "/somepath/undefined")) - - // TODO: pass case without needing actual connections? -} - // TestIsTunnelSyncHealthy verifies that the 600 second lag test // is honored. func TestIsTunnelSyncHealthy(t *testing.T) { @@ -108,7 +95,6 @@ func generateTempFilePath(prefix string) string { // TestGenerateSSHKey verifies that SSH key generation does indeed // generate keys even with keys already exist. func TestGenerateSSHKey(t *testing.T) { - tunneler := &SSHTunneler{} assert := assert.New(t) privateKey := generateTempFilePath("private") @@ -119,17 +105,17 @@ func TestGenerateSSHKey(t *testing.T) { os.Remove(publicKey) // Pass case: Sunny day case - err := tunneler.generateSSHKey("unused", privateKey, publicKey) + err := generateSSHKey(privateKey, publicKey) assert.NoError(err, "generateSSHKey should not have retuend an error: %s", err) // Pass case: PrivateKey exists test case os.Remove(publicKey) - err = tunneler.generateSSHKey("unused", privateKey, publicKey) + err = generateSSHKey(privateKey, publicKey) assert.NoError(err, "generateSSHKey should not have retuend an error: %s", err) // Pass case: PublicKey exists test case os.Remove(privateKey) - err = tunneler.generateSSHKey("unused", privateKey, publicKey) + err = generateSSHKey(privateKey, publicKey) assert.NoError(err, "generateSSHKey should not have retuend an error: %s", err) // Make sure we have no test keys laying around diff --git a/pkg/util/ssh.go b/pkg/util/ssh.go index 64ced17b674..04a06afb3e7 100644 --- a/pkg/util/ssh.go +++ b/pkg/util/ssh.go @@ -20,6 +20,7 @@ import ( "bytes" "crypto/rand" "crypto/rsa" + "crypto/tls" "crypto/x509" "encoding/pem" "errors" @@ -28,12 +29,17 @@ import ( "io/ioutil" mathrand "math/rand" "net" + "net/http" + "net/url" "os" + "sync" "time" "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" "golang.org/x/crypto/ssh" + + utilnet "k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/util/runtime" ) @@ -108,9 +114,8 @@ func (s *SSHTunnel) Open() error { tunnelOpenCounter.Inc() if err != nil { tunnelOpenFailCounter.Inc() - return err } - return nil + return err } func (s *SSHTunnel) Dial(network, address string) (net.Conn, error) { @@ -240,95 +245,193 @@ func ParsePublicKeyFromFile(keyFile string) (*rsa.PublicKey, error) { return rsaKey, nil } -// Should be thread safe. -type SSHTunnelEntry struct { +type tunnel interface { + Open() error + Close() error + Dial(network, address string) (net.Conn, error) +} + +type sshTunnelEntry struct { Address string - Tunnel *SSHTunnel + Tunnel tunnel +} + +type sshTunnelCreator interface { + NewSSHTunnel(user, keyFile, healthCheckURL string) (tunnel, error) +} + +type realTunnelCreator struct{} + +func (*realTunnelCreator) NewSSHTunnel(user, keyFile, healthCheckURL string) (tunnel, error) { + return NewSSHTunnel(user, keyFile, healthCheckURL) } -// Not thread safe! type SSHTunnelList struct { - entries []SSHTunnelEntry + entries []sshTunnelEntry + adding map[string]bool + tunnelCreator sshTunnelCreator + tunnelsLock sync.Mutex + + user string + keyfile string + healthCheckURL *url.URL } -func MakeSSHTunnels(user, keyfile string, addresses []string) *SSHTunnelList { - tunnels := []SSHTunnelEntry{} - for ix := range addresses { - addr := addresses[ix] - tunnel, err := NewSSHTunnel(user, keyfile, addr) - if err != nil { - glog.Errorf("Failed to create tunnel for %q: %v", addr, err) - continue - } - tunnels = append(tunnels, SSHTunnelEntry{addr, tunnel}) +func NewSSHTunnelList(user, keyfile string, healthCheckURL *url.URL, stopChan chan struct{}) *SSHTunnelList { + l := &SSHTunnelList{ + adding: make(map[string]bool), + tunnelCreator: &realTunnelCreator{}, + user: user, + keyfile: keyfile, + healthCheckURL: healthCheckURL, } - return &SSHTunnelList{tunnels} + healthCheckPoll := 1 * time.Minute + go Until(func() { + l.tunnelsLock.Lock() + defer l.tunnelsLock.Unlock() + // Healthcheck each tunnel every minute + numTunnels := len(l.entries) + for i, entry := range l.entries { + // Stagger healthchecks evenly across duration of healthCheckPoll. + delay := healthCheckPoll * time.Duration(i) / time.Duration(numTunnels) + l.delayedHealthCheck(entry, delay) + } + }, healthCheckPoll, stopChan) + return l } -// Open attempts to open all tunnels in the list, and removes any tunnels that -// failed to open. -func (l *SSHTunnelList) Open() error { - var openTunnels []SSHTunnelEntry - for ix := range l.entries { - if err := l.entries[ix].Tunnel.Open(); err != nil { - glog.Errorf("Failed to open tunnel %v: %v", l.entries[ix], err) - } else { - openTunnels = append(openTunnels, l.entries[ix]) +func (l *SSHTunnelList) delayedHealthCheck(e sshTunnelEntry, delay time.Duration) { + go func() { + defer runtime.HandleCrash() + time.Sleep(delay) + if err := l.healthCheck(e); err != nil { + glog.Errorf("Healthcheck failed for tunnel to %q: %v", e.Address, err) + glog.Infof("Attempting once to re-establish tunnel to %q", e.Address) + l.removeAndReAdd(e) + } + }() +} + +func (l *SSHTunnelList) healthCheck(e sshTunnelEntry) error { + // GET the healthcheck path using the provided tunnel's dial function. + transport := utilnet.SetTransportDefaults(&http.Transport{ + Dial: e.Tunnel.Dial, + // TODO(cjcullen): Plumb real TLS options through. + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }) + client := &http.Client{Transport: transport} + _, err := client.Get(l.healthCheckURL.String()) + return err +} + +func (l *SSHTunnelList) removeAndReAdd(e sshTunnelEntry) { + // Find the entry to replace. + l.tunnelsLock.Lock() + defer l.tunnelsLock.Unlock() + for i, entry := range l.entries { + if entry.Tunnel == e.Tunnel { + l.entries = append(l.entries[:i], l.entries[i+1:]...) + l.adding[e.Address] = true + go l.createAndAddTunnel(e.Address) + return } } - l.entries = openTunnels +} + +func (l *SSHTunnelList) Dial(net, addr string) (net.Conn, error) { + start := time.Now() + id := mathrand.Int63() // So you can match begins/ends in the log. + glog.Infof("[%x: %v] Dialing...", id, addr) + defer func() { + glog.Infof("[%x: %v] Dialed in %v.", id, addr, time.Now().Sub(start)) + }() + tunnel, err := l.pickRandomTunnel() + if err != nil { + return nil, err + } + return tunnel.Dial(net, addr) +} + +func (l *SSHTunnelList) pickRandomTunnel() (tunnel, error) { + l.tunnelsLock.Lock() + defer l.tunnelsLock.Unlock() if len(l.entries) == 0 { - return errors.New("Failed to open any tunnels.") + return nil, fmt.Errorf("No SSH tunnels currently open. Were the targets able to accept an ssh-key for user %q?", l.user) } - return nil + n := mathrand.Intn(len(l.entries)) + return l.entries[n].Tunnel, nil } -// Close asynchronously closes all tunnels in the list after waiting for 1 -// minute. Tunnels will still be open upon this function's return, but should -// no longer be used. -func (l *SSHTunnelList) Close() { - for ix := range l.entries { - entry := l.entries[ix] - go func() { - defer runtime.HandleCrash() - time.Sleep(1 * time.Minute) - if err := entry.Tunnel.Close(); err != nil { - glog.Errorf("Failed to close tunnel %v: %v", entry, err) +// Update reconciles the list's entries with the specified addresses. Existing +// tunnels that are not in addresses are removed from entries and closed in a +// background goroutine. New tunnels specified in addresses are opened in a +// background goroutine and then added to entries. +func (l *SSHTunnelList) Update(addrs []string) { + haveAddrsMap := make(map[string]bool) + wantAddrsMap := make(map[string]bool) + func() { + l.tunnelsLock.Lock() + defer l.tunnelsLock.Unlock() + // Build a map of what we currently have. + for i := range l.entries { + haveAddrsMap[l.entries[i].Address] = true + } + // Determine any necessary additions. + for i := range addrs { + // Add tunnel if it is not in l.entries or l.adding + if _, ok := haveAddrsMap[addrs[i]]; !ok { + if _, ok := l.adding[addrs[i]]; !ok { + l.adding[addrs[i]] = true + addr := addrs[i] + go func() { + defer runtime.HandleCrash() + // Actually adding tunnel to list will block until lock + // is released after deletions. + l.createAndAddTunnel(addr) + }() + } } - }() - } -} - -/* this will make sense if we move the lock into SSHTunnelList. -func (l *SSHTunnelList) Dial(network, addr string) (net.Conn, error) { - if len(l.entries) == 0 { - return nil, fmt.Errorf("empty tunnel list.") - } - n := mathrand.Intn(len(l.entries)) - return l.entries[n].Tunnel.Dial(network, addr) -} -*/ - -// Returns a random tunnel, xor an error if there are none. -func (l *SSHTunnelList) PickRandomTunnel() (SSHTunnelEntry, error) { - if len(l.entries) == 0 { - return SSHTunnelEntry{}, fmt.Errorf("empty tunnel list.") - } - n := mathrand.Intn(len(l.entries)) - return l.entries[n], nil -} - -func (l *SSHTunnelList) Has(addr string) bool { - for ix := range l.entries { - if l.entries[ix].Address == addr { - return true + wantAddrsMap[addrs[i]] = true } - } - return false + // Determine any necessary deletions. + var newEntries []sshTunnelEntry + for i := range l.entries { + if _, ok := wantAddrsMap[l.entries[i].Address]; !ok { + tunnelEntry := l.entries[i] + glog.Infof("Removing tunnel to deleted node at %q", tunnelEntry.Address) + go func() { + defer runtime.HandleCrash() + if err := tunnelEntry.Tunnel.Close(); err != nil { + glog.Errorf("Failed to close tunnel to %q: %v", tunnelEntry.Address, err) + } + }() + } else { + newEntries = append(newEntries, l.entries[i]) + } + } + l.entries = newEntries + }() } -func (l *SSHTunnelList) Len() int { - return len(l.entries) +func (l *SSHTunnelList) createAndAddTunnel(addr string) { + glog.Infof("Trying to add tunnel to %q", addr) + tunnel, err := l.tunnelCreator.NewSSHTunnel(l.user, l.keyfile, addr) + if err != nil { + glog.Errorf("Failed to create tunnel for %q: %v", addr, err) + return + } + if err := tunnel.Open(); err != nil { + glog.Errorf("Failed to open tunnel to %q: %v", addr, err) + l.tunnelsLock.Lock() + delete(l.adding, addr) + l.tunnelsLock.Unlock() + return + } + l.tunnelsLock.Lock() + l.entries = append(l.entries, sshTunnelEntry{addr, tunnel}) + delete(l.adding, addr) + l.tunnelsLock.Unlock() + glog.Infof("Successfully added tunnel for %q", addr) } func EncodePrivateKey(private *rsa.PrivateKey) []byte { diff --git a/pkg/util/ssh_test.go b/pkg/util/ssh_test.go index 74dc8a82b6d..072cfff6f1a 100644 --- a/pkg/util/ssh_test.go +++ b/pkg/util/ssh_test.go @@ -24,6 +24,9 @@ import ( "reflect" "strings" "testing" + "time" + + "k8s.io/kubernetes/pkg/util/wait" "github.com/golang/glog" "golang.org/x/crypto/ssh" @@ -163,6 +166,88 @@ func TestSSHTunnel(t *testing.T) { } } +type fakeTunnel struct{} + +func (*fakeTunnel) Open() error { + return nil +} + +func (*fakeTunnel) Close() error { + return nil +} + +func (*fakeTunnel) Dial(network, address string) (net.Conn, error) { + return nil, nil +} + +type fakeTunnelCreator struct{} + +func (*fakeTunnelCreator) NewSSHTunnel(string, string, string) (tunnel, error) { + return &fakeTunnel{}, nil +} + +func TestSSHTunnelListUpdate(t *testing.T) { + // Start with an empty tunnel list. + l := &SSHTunnelList{ + adding: make(map[string]bool), + tunnelCreator: &fakeTunnelCreator{}, + } + + // Start with 2 tunnels. + addressStrings := []string{"1.2.3.4", "5.6.7.8"} + l.Update(addressStrings) + checkTunnelsCorrect(t, l, addressStrings) + + // Add another tunnel. + addressStrings = append(addressStrings, "9.10.11.12") + l.Update(addressStrings) + checkTunnelsCorrect(t, l, addressStrings) + + // Go down to a single tunnel. + addressStrings = []string{"1.2.3.4"} + l.Update(addressStrings) + checkTunnelsCorrect(t, l, addressStrings) + + // Replace w/ all new tunnels. + addressStrings = []string{"21.22.23.24", "25.26.27.28"} + l.Update(addressStrings) + checkTunnelsCorrect(t, l, addressStrings) + + // Call update with the same tunnels. + l.Update(addressStrings) + checkTunnelsCorrect(t, l, addressStrings) +} + +func checkTunnelsCorrect(t *testing.T, tunnelList *SSHTunnelList, addresses []string) { + if err := wait.Poll(100*time.Millisecond, 2*time.Second, func() (bool, error) { + return hasCorrectTunnels(tunnelList, addresses), nil + }); err != nil { + t.Errorf("Error waiting for tunnels to reach expected state: %v. Expected %v, had %v", err, addresses, tunnelList) + } +} + +func hasCorrectTunnels(tunnelList *SSHTunnelList, addresses []string) bool { + tunnelList.tunnelsLock.Lock() + defer tunnelList.tunnelsLock.Unlock() + wantMap := make(map[string]bool) + for _, addr := range addresses { + wantMap[addr] = true + } + haveMap := make(map[string]bool) + for _, entry := range tunnelList.entries { + if wantMap[entry.Address] == false { + return false + } + haveMap[entry.Address] = true + } + for _, addr := range addresses { + if haveMap[addr] == false { + return false + } + } + return true +} + type mockSSHDialer struct { network string addr string