Merge pull request #19314 from cjcullen/tunnels

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2016-02-02 19:46:06 -08:00
commit 8d8de2efb4
9 changed files with 331 additions and 221 deletions

View File

@ -23,6 +23,7 @@ import (
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"net" "net"
"net/url"
"os" "os"
"strconv" "strconv"
"strings" "strings"
@ -211,9 +212,18 @@ func Run(s *options.APIServer) error {
installSSH = instances.AddSSHKeyToAllInstances installSSH = instances.AddSSHKeyToAllInstances
} }
} }
if s.KubeletConfig.Port == 0 {
glog.Fatalf("Must enable kubelet port if proxy ssh-tunneling is specified.")
}
// Set up the tunneler // Set up the tunneler
tunneler = master.NewSSHTunneler(s.SSHUser, s.SSHKeyfile, installSSH) // TODO(cjcullen): If we want this to handle per-kubelet ports or other
// kubelet listen-addresses, we need to plumb through options.
healthCheckPath := &url.URL{
Scheme: "https",
Host: net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.KubeletConfig.Port), 10)),
Path: "healthz",
}
tunneler = master.NewSSHTunneler(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSH)
// Use the tunneler's dialer to connect to the kubelet // Use the tunneler's dialer to connect to the kubelet
s.KubeletConfig.Dial = tunneler.Dial s.KubeletConfig.Dial = tunneler.Dial

View File

@ -114,7 +114,7 @@ func NewKubeletServer() *KubeletServer {
RktStage1Image: "", RktStage1Image: "",
RootDirectory: defaultRootDir, RootDirectory: defaultRootDir,
SerializeImagePulls: true, SerializeImagePulls: true,
StreamingConnectionIdleTimeout: unversioned.Duration{5 * time.Minute}, StreamingConnectionIdleTimeout: unversioned.Duration{4 * time.Hour},
SyncFrequency: unversioned.Duration{1 * time.Minute}, SyncFrequency: unversioned.Duration{1 * time.Minute},
SystemContainer: "", SystemContainer: "",
ReconcileCIDR: true, ReconcileCIDR: true,
@ -174,7 +174,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.ClusterDomain, "cluster-domain", s.ClusterDomain, "Domain for this cluster. If set, kubelet will configure all containers to search this domain in addition to the host's search domains") fs.StringVar(&s.ClusterDomain, "cluster-domain", s.ClusterDomain, "Domain for this cluster. If set, kubelet will configure all containers to search this domain in addition to the host's search domains")
fs.StringVar(&s.MasterServiceNamespace, "master-service-namespace", s.MasterServiceNamespace, "The namespace from which the kubernetes master services should be injected into pods") fs.StringVar(&s.MasterServiceNamespace, "master-service-namespace", s.MasterServiceNamespace, "The namespace from which the kubernetes master services should be injected into pods")
fs.StringVar(&s.ClusterDNS, "cluster-dns", s.ClusterDNS, "IP address for a cluster DNS server. If set, kubelet will configure all containers to use this for DNS resolution in addition to the host's DNS servers") fs.StringVar(&s.ClusterDNS, "cluster-dns", s.ClusterDNS, "IP address for a cluster DNS server. If set, kubelet will configure all containers to use this for DNS resolution in addition to the host's DNS servers")
fs.DurationVar(&s.StreamingConnectionIdleTimeout.Duration, "streaming-connection-idle-timeout", s.StreamingConnectionIdleTimeout.Duration, "Maximum time a streaming connection can be idle before the connection is automatically closed. Example: '5m'") fs.DurationVar(&s.StreamingConnectionIdleTimeout.Duration, "streaming-connection-idle-timeout", s.StreamingConnectionIdleTimeout.Duration, "Maximum time a streaming connection can be idle before the connection is automatically closed. 0 indicates no timeout. Example: '5m'")
fs.DurationVar(&s.NodeStatusUpdateFrequency.Duration, "node-status-update-frequency", s.NodeStatusUpdateFrequency.Duration, "Specifies how often kubelet posts node status to master. Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod in nodecontroller. Default: 10s") fs.DurationVar(&s.NodeStatusUpdateFrequency.Duration, "node-status-update-frequency", s.NodeStatusUpdateFrequency.Duration, "Specifies how often kubelet posts node status to master. Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod in nodecontroller. Default: 10s")
bindableNodeLabels := util.ConfigurationMap(s.NodeLabels) bindableNodeLabels := util.ConfigurationMap(s.NodeLabels)
fs.Var(&bindableNodeLabels, "node-labels", "<Warning: Alpha feature> Labels to add when registering the node in the cluster. Labels must are key=value pairs seperated by ','.") fs.Var(&bindableNodeLabels, "node-labels", "<Warning: Alpha feature> Labels to add when registering the node in the cluster. Labels must are key=value pairs seperated by ','.")

View File

@ -596,10 +596,10 @@ func RunKubelet(kcfg *KubeletConfig) error {
if _, err := k.RunOnce(podCfg.Updates()); err != nil { if _, err := k.RunOnce(podCfg.Updates()); err != nil {
return fmt.Errorf("runonce failed: %v", err) return fmt.Errorf("runonce failed: %v", err)
} }
glog.Infof("Started kubelet as runonce") glog.Info("Started kubelet as runonce")
} else { } else {
startKubelet(k, podCfg, kcfg) startKubelet(k, podCfg, kcfg)
glog.Infof("Started kubelet") glog.Info("Started kubelet")
} }
return nil return nil
} }

View File

@ -137,7 +137,7 @@ kubelet
--root-dir="/var/lib/kubelet": Directory path for managing kubelet files (volume mounts,etc). --root-dir="/var/lib/kubelet": Directory path for managing kubelet files (volume mounts,etc).
--runonce[=false]: If true, exit after spawning pods from local manifests or remote urls. Exclusive with --api-servers, and --enable-server --runonce[=false]: If true, exit after spawning pods from local manifests or remote urls. Exclusive with --api-servers, and --enable-server
--serialize-image-pulls[=true]: Pull images one at a time. We recommend *not* changing the default value on nodes that run docker daemon with version < 1.9 or an Aufs storage backend. Issue #10959 has more details. [default=true] --serialize-image-pulls[=true]: Pull images one at a time. We recommend *not* changing the default value on nodes that run docker daemon with version < 1.9 or an Aufs storage backend. Issue #10959 has more details. [default=true]
--streaming-connection-idle-timeout=5m0s: Maximum time a streaming connection can be idle before the connection is automatically closed. Example: '5m' --streaming-connection-idle-timeout=4h0m0s: Maximum time a streaming connection can be idle before the connection is automatically closed. 0 indicates no timeout. Example: '5m'
--sync-frequency=1m0s: Max period between synchronizing running containers and config --sync-frequency=1m0s: Max period between synchronizing running containers and config
--system-container="": Optional resource-only container in which to place all non-kernel processes that are not already in a container. Empty for no container. Rolling back the flag requires a reboot. (Default: ""). --system-container="": Optional resource-only container in which to place all non-kernel processes that are not already in a container. Empty for no container. Rolling back the flag requires a reboot. (Default: "").
--system-reserved=: A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=150G) pairs that describe resources reserved for non-kubernetes components. Currently only cpu and memory are supported. See http://releases.k8s.io/HEAD/docs/user-guide/compute-resources.html for more detail. [default=none] --system-reserved=: A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=150G) pairs that describe resources reserved for non-kubernetes components. Currently only cpu and memory are supported. See http://releases.k8s.io/HEAD/docs/user-guide/compute-resources.html for more detail. [default=none]
@ -146,7 +146,7 @@ kubelet
--volume-plugin-dir="/usr/libexec/kubernetes/kubelet-plugins/volume/exec/": <Warning: Alpha feature> The full path of the directory in which to search for additional third party volume plugins --volume-plugin-dir="/usr/libexec/kubernetes/kubelet-plugins/volume/exec/": <Warning: Alpha feature> The full path of the directory in which to search for additional third party volume plugins
``` ```
###### Auto generated by spf13/cobra on 21-Jan-2016 ###### Auto generated by spf13/cobra on 29-Jan-2016
<!-- BEGIN MUNGE: GENERATED_ANALYTICS --> <!-- BEGIN MUNGE: GENERATED_ANALYTICS -->

View File

@ -161,20 +161,6 @@ func New(c *Config) *Master {
func (m *Master) InstallAPIs(c *Config) { func (m *Master) InstallAPIs(c *Config) {
apiGroupsInfo := []genericapiserver.APIGroupInfo{} apiGroupsInfo := []genericapiserver.APIGroupInfo{}
// Run the tunnel.
healthzChecks := []healthz.HealthzChecker{}
if m.tunneler != nil {
m.tunneler.Run(m.getNodeAddresses)
healthzChecks = append(healthzChecks, healthz.NamedCheck("SSH Tunnel Check", m.IsTunnelSyncHealthy))
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "apiserver_proxy_tunnel_sync_latency_secs",
Help: "The time since the last successful synchronization of the SSH tunnels for proxy requests.",
}, func() float64 { return float64(m.tunneler.SecondsSinceSync()) })
}
// TODO(nikhiljindal): Refactor generic parts of support services (like /versions) to genericapiserver.
apiserver.InstallSupport(m.MuxHelper, m.RootWebService, c.EnableProfiling, healthzChecks...)
// Install v1 unless disabled. // Install v1 unless disabled.
if !m.ApiGroupVersionOverrides["api/v1"].Disable { if !m.ApiGroupVersionOverrides["api/v1"].Disable {
// Install v1 API. // Install v1 API.
@ -192,6 +178,20 @@ func (m *Master) InstallAPIs(c *Config) {
apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo) apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
} }
// Run the tunneler.
healthzChecks := []healthz.HealthzChecker{}
if m.tunneler != nil {
m.tunneler.Run(m.getNodeAddresses)
healthzChecks = append(healthzChecks, healthz.NamedCheck("SSH Tunnel Check", m.IsTunnelSyncHealthy))
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "apiserver_proxy_tunnel_sync_latency_secs",
Help: "The time since the last successful synchronization of the SSH tunnels for proxy requests.",
}, func() float64 { return float64(m.tunneler.SecondsSinceSync()) })
}
// TODO(nikhiljindal): Refactor generic parts of support services (like /versions) to genericapiserver.
apiserver.InstallSupport(m.MuxHelper, m.RootWebService, c.EnableProfiling, healthzChecks...)
// Install root web services // Install root web services
m.HandlerContainer.Add(m.RootWebService) m.HandlerContainer.Add(m.RootWebService)

View File

@ -18,10 +18,9 @@ package master
import ( import (
"io/ioutil" "io/ioutil"
"math/rand"
"net" "net"
"net/url"
"os" "os"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -43,12 +42,12 @@ type Tunneler interface {
} }
type SSHTunneler struct { type SSHTunneler struct {
SSHUser string SSHUser string
SSHKeyfile string SSHKeyfile string
InstallSSHKey InstallSSHKey InstallSSHKey InstallSSHKey
HealthCheckURL *url.URL
tunnels *util.SSHTunnelList tunnels *util.SSHTunnelList
tunnelsLock sync.Mutex
lastSync int64 // Seconds since Epoch lastSync int64 // Seconds since Epoch
lastSyncMetric prometheus.GaugeFunc lastSyncMetric prometheus.GaugeFunc
clock util.Clock clock util.Clock
@ -57,13 +56,13 @@ type SSHTunneler struct {
stopChan chan struct{} stopChan chan struct{}
} }
func NewSSHTunneler(sshUser string, sshKeyfile string, installSSHKey InstallSSHKey) Tunneler { func NewSSHTunneler(sshUser, sshKeyfile string, healthCheckURL *url.URL, installSSHKey InstallSSHKey) Tunneler {
return &SSHTunneler{ return &SSHTunneler{
SSHUser: sshUser, SSHUser: sshUser,
SSHKeyfile: sshKeyfile, SSHKeyfile: sshKeyfile,
InstallSSHKey: installSSHKey, InstallSSHKey: installSSHKey,
HealthCheckURL: healthCheckURL,
clock: util.RealClock{}, clock: util.RealClock{},
} }
} }
@ -93,14 +92,17 @@ func (c *SSHTunneler) Run(getAddresses AddressFunc) {
glog.Errorf("Error detecting if key exists: %v", err) glog.Errorf("Error detecting if key exists: %v", err)
} else if !exists { } else if !exists {
glog.Infof("Key doesn't exist, attempting to create") glog.Infof("Key doesn't exist, attempting to create")
err := c.generateSSHKey(c.SSHUser, c.SSHKeyfile, publicKeyFile) if err := generateSSHKey(c.SSHKeyfile, publicKeyFile); err != nil {
if err != nil {
glog.Errorf("Failed to create key pair: %v", err) glog.Errorf("Failed to create key pair: %v", err)
} }
} }
c.tunnels = &util.SSHTunnelList{}
c.setupSecureProxy(c.SSHUser, c.SSHKeyfile, publicKeyFile) c.tunnels = util.NewSSHTunnelList(c.SSHUser, c.SSHKeyfile, c.HealthCheckURL, c.stopChan)
// Sync loop to ensure that the SSH key has been installed.
c.installSSHKeySyncLoop(c.SSHUser, publicKeyFile)
// Sync tunnelList w/ nodes.
c.lastSync = c.clock.Now().Unix() c.lastSync = c.clock.Now().Unix()
c.nodesSyncLoop()
} }
// Stop gracefully shuts down the tunneler // Stop gracefully shuts down the tunneler
@ -112,23 +114,7 @@ func (c *SSHTunneler) Stop() {
} }
func (c *SSHTunneler) Dial(net, addr string) (net.Conn, error) { func (c *SSHTunneler) Dial(net, addr string) (net.Conn, error) {
// Only lock while picking a tunnel. return c.tunnels.Dial(net, addr)
tunnel, err := func() (util.SSHTunnelEntry, error) {
c.tunnelsLock.Lock()
defer c.tunnelsLock.Unlock()
return c.tunnels.PickRandomTunnel()
}()
if err != nil {
return nil, err
}
start := time.Now()
id := rand.Int63() // So you can match begins/ends in the log.
glog.V(3).Infof("[%x: %v] Dialing...", id, tunnel.Address)
defer func() {
glog.V(3).Infof("[%x: %v] Dialed in %v.", id, tunnel.Address, time.Now().Sub(start))
}()
return tunnel.Tunnel.Dial(net, addr)
} }
func (c *SSHTunneler) SecondsSinceSync() int64 { func (c *SSHTunneler) SecondsSinceSync() int64 {
@ -137,61 +123,7 @@ func (c *SSHTunneler) SecondsSinceSync() int64 {
return now - then return now - then
} }
func (c *SSHTunneler) needToReplaceTunnels(addrs []string) bool { func (c *SSHTunneler) installSSHKeySyncLoop(user, publicKeyfile string) {
c.tunnelsLock.Lock()
defer c.tunnelsLock.Unlock()
if c.tunnels == nil || c.tunnels.Len() != len(addrs) {
return true
}
// TODO (cjcullen): This doesn't need to be n^2
for ix := range addrs {
if !c.tunnels.Has(addrs[ix]) {
return true
}
}
return false
}
func (c *SSHTunneler) replaceTunnels(user, keyfile string, newAddrs []string) error {
glog.Infof("replacing tunnels. New addrs: %v", newAddrs)
tunnels := util.MakeSSHTunnels(user, keyfile, newAddrs)
if err := tunnels.Open(); err != nil {
return err
}
c.tunnelsLock.Lock()
defer c.tunnelsLock.Unlock()
if c.tunnels != nil {
c.tunnels.Close()
}
c.tunnels = tunnels
atomic.StoreInt64(&c.lastSync, c.clock.Now().Unix())
return nil
}
func (c *SSHTunneler) loadTunnels(user, keyfile string) error {
addrs, err := c.getAddresses()
if err != nil {
return err
}
if !c.needToReplaceTunnels(addrs) {
return nil
}
// TODO: This is going to unnecessarily close connections to unchanged nodes.
// See comment about using Watch above.
glog.Info("found different nodes. Need to replace tunnels")
return c.replaceTunnels(user, keyfile, addrs)
}
func (c *SSHTunneler) refreshTunnels(user, keyfile string) error {
addrs, err := c.getAddresses()
if err != nil {
return err
}
return c.replaceTunnels(user, keyfile, addrs)
}
func (c *SSHTunneler) setupSecureProxy(user, privateKeyfile, publicKeyfile string) {
// Sync loop to ensure that the SSH key has been installed.
go util.Until(func() { go util.Until(func() {
if c.InstallSSHKey == nil { if c.InstallSSHKey == nil {
glog.Error("Won't attempt to install ssh key: InstallSSHKey function is nil") glog.Error("Won't attempt to install ssh key: InstallSSHKey function is nil")
@ -211,30 +143,24 @@ func (c *SSHTunneler) setupSecureProxy(user, privateKeyfile, publicKeyfile strin
glog.Errorf("Failed to install ssh key: %v", err) glog.Errorf("Failed to install ssh key: %v", err)
} }
}, 5*time.Minute, c.stopChan) }, 5*time.Minute, c.stopChan)
// Sync loop for tunnels
// TODO: switch this to watch.
go util.Until(func() {
if err := c.loadTunnels(user, privateKeyfile); err != nil {
glog.Errorf("Failed to load SSH Tunnels: %v", err)
}
if c.tunnels != nil && c.tunnels.Len() != 0 {
// Sleep for 10 seconds if we have some tunnels.
// TODO (cjcullen): tunnels can lag behind actually existing nodes.
time.Sleep(9 * time.Second)
}
}, 1*time.Second, c.stopChan)
// Refresh loop for tunnels
// TODO: could make this more controller-ish
go util.Until(func() {
time.Sleep(5 * time.Minute)
if err := c.refreshTunnels(user, privateKeyfile); err != nil {
glog.Errorf("Failed to refresh SSH Tunnels: %v", err)
}
}, 0*time.Second, c.stopChan)
} }
func (c *SSHTunneler) generateSSHKey(user, privateKeyfile, publicKeyfile string) error { // nodesSyncLoop lists nodes ever 15 seconds, calling Update() on the TunnelList
// TODO: user is not used. Consider removing it as an input to the function. // each time (Update() is a noop if no changes are necessary).
func (c *SSHTunneler) nodesSyncLoop() {
// TODO (cjcullen) make this watch.
go util.Until(func() {
addrs, err := c.getAddresses()
glog.Infof("Calling update w/ addrs: %v", addrs)
if err != nil {
glog.Errorf("Failed to getAddresses: %v", err)
}
c.tunnels.Update(addrs)
atomic.StoreInt64(&c.lastSync, c.clock.Now().Unix())
}, 15*time.Second, c.stopChan)
}
func generateSSHKey(privateKeyfile, publicKeyfile string) error {
private, public, err := util.GenerateKey(2048) private, public, err := util.GenerateKey(2048)
if err != nil { if err != nil {
return err return err

View File

@ -66,19 +66,6 @@ func TestSecondsSinceSync(t *testing.T) {
assert.Equal(int64(-2678400), tunneler.SecondsSinceSync()) assert.Equal(int64(-2678400), tunneler.SecondsSinceSync())
} }
// TestRefreshTunnels verifies that the function errors when no addresses
// are associated with nodes
func TestRefreshTunnels(t *testing.T) {
tunneler := &SSHTunneler{}
tunneler.getAddresses = func() ([]string, error) { return []string{}, nil }
assert := assert.New(t)
// Fail case (no addresses associated with nodes)
assert.Error(tunneler.refreshTunnels("test", "/somepath/undefined"))
// TODO: pass case without needing actual connections?
}
// TestIsTunnelSyncHealthy verifies that the 600 second lag test // TestIsTunnelSyncHealthy verifies that the 600 second lag test
// is honored. // is honored.
func TestIsTunnelSyncHealthy(t *testing.T) { func TestIsTunnelSyncHealthy(t *testing.T) {
@ -108,7 +95,6 @@ func generateTempFilePath(prefix string) string {
// TestGenerateSSHKey verifies that SSH key generation does indeed // TestGenerateSSHKey verifies that SSH key generation does indeed
// generate keys even with keys already exist. // generate keys even with keys already exist.
func TestGenerateSSHKey(t *testing.T) { func TestGenerateSSHKey(t *testing.T) {
tunneler := &SSHTunneler{}
assert := assert.New(t) assert := assert.New(t)
privateKey := generateTempFilePath("private") privateKey := generateTempFilePath("private")
@ -119,17 +105,17 @@ func TestGenerateSSHKey(t *testing.T) {
os.Remove(publicKey) os.Remove(publicKey)
// Pass case: Sunny day case // Pass case: Sunny day case
err := tunneler.generateSSHKey("unused", privateKey, publicKey) err := generateSSHKey(privateKey, publicKey)
assert.NoError(err, "generateSSHKey should not have retuend an error: %s", err) assert.NoError(err, "generateSSHKey should not have retuend an error: %s", err)
// Pass case: PrivateKey exists test case // Pass case: PrivateKey exists test case
os.Remove(publicKey) os.Remove(publicKey)
err = tunneler.generateSSHKey("unused", privateKey, publicKey) err = generateSSHKey(privateKey, publicKey)
assert.NoError(err, "generateSSHKey should not have retuend an error: %s", err) assert.NoError(err, "generateSSHKey should not have retuend an error: %s", err)
// Pass case: PublicKey exists test case // Pass case: PublicKey exists test case
os.Remove(privateKey) os.Remove(privateKey)
err = tunneler.generateSSHKey("unused", privateKey, publicKey) err = generateSSHKey(privateKey, publicKey)
assert.NoError(err, "generateSSHKey should not have retuend an error: %s", err) assert.NoError(err, "generateSSHKey should not have retuend an error: %s", err)
// Make sure we have no test keys laying around // Make sure we have no test keys laying around

View File

@ -20,6 +20,7 @@ import (
"bytes" "bytes"
"crypto/rand" "crypto/rand"
"crypto/rsa" "crypto/rsa"
"crypto/tls"
"crypto/x509" "crypto/x509"
"encoding/pem" "encoding/pem"
"errors" "errors"
@ -28,12 +29,17 @@ import (
"io/ioutil" "io/ioutil"
mathrand "math/rand" mathrand "math/rand"
"net" "net"
"net/http"
"net/url"
"os" "os"
"sync"
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/runtime"
) )
@ -108,9 +114,8 @@ func (s *SSHTunnel) Open() error {
tunnelOpenCounter.Inc() tunnelOpenCounter.Inc()
if err != nil { if err != nil {
tunnelOpenFailCounter.Inc() tunnelOpenFailCounter.Inc()
return err
} }
return nil return err
} }
func (s *SSHTunnel) Dial(network, address string) (net.Conn, error) { func (s *SSHTunnel) Dial(network, address string) (net.Conn, error) {
@ -240,95 +245,193 @@ func ParsePublicKeyFromFile(keyFile string) (*rsa.PublicKey, error) {
return rsaKey, nil return rsaKey, nil
} }
// Should be thread safe. type tunnel interface {
type SSHTunnelEntry struct { Open() error
Close() error
Dial(network, address string) (net.Conn, error)
}
type sshTunnelEntry struct {
Address string Address string
Tunnel *SSHTunnel Tunnel tunnel
}
type sshTunnelCreator interface {
NewSSHTunnel(user, keyFile, healthCheckURL string) (tunnel, error)
}
type realTunnelCreator struct{}
func (*realTunnelCreator) NewSSHTunnel(user, keyFile, healthCheckURL string) (tunnel, error) {
return NewSSHTunnel(user, keyFile, healthCheckURL)
} }
// Not thread safe!
type SSHTunnelList struct { type SSHTunnelList struct {
entries []SSHTunnelEntry entries []sshTunnelEntry
adding map[string]bool
tunnelCreator sshTunnelCreator
tunnelsLock sync.Mutex
user string
keyfile string
healthCheckURL *url.URL
} }
func MakeSSHTunnels(user, keyfile string, addresses []string) *SSHTunnelList { func NewSSHTunnelList(user, keyfile string, healthCheckURL *url.URL, stopChan chan struct{}) *SSHTunnelList {
tunnels := []SSHTunnelEntry{} l := &SSHTunnelList{
for ix := range addresses { adding: make(map[string]bool),
addr := addresses[ix] tunnelCreator: &realTunnelCreator{},
tunnel, err := NewSSHTunnel(user, keyfile, addr) user: user,
if err != nil { keyfile: keyfile,
glog.Errorf("Failed to create tunnel for %q: %v", addr, err) healthCheckURL: healthCheckURL,
continue
}
tunnels = append(tunnels, SSHTunnelEntry{addr, tunnel})
} }
return &SSHTunnelList{tunnels} healthCheckPoll := 1 * time.Minute
go Until(func() {
l.tunnelsLock.Lock()
defer l.tunnelsLock.Unlock()
// Healthcheck each tunnel every minute
numTunnels := len(l.entries)
for i, entry := range l.entries {
// Stagger healthchecks evenly across duration of healthCheckPoll.
delay := healthCheckPoll * time.Duration(i) / time.Duration(numTunnels)
l.delayedHealthCheck(entry, delay)
}
}, healthCheckPoll, stopChan)
return l
} }
// Open attempts to open all tunnels in the list, and removes any tunnels that func (l *SSHTunnelList) delayedHealthCheck(e sshTunnelEntry, delay time.Duration) {
// failed to open. go func() {
func (l *SSHTunnelList) Open() error { defer runtime.HandleCrash()
var openTunnels []SSHTunnelEntry time.Sleep(delay)
for ix := range l.entries { if err := l.healthCheck(e); err != nil {
if err := l.entries[ix].Tunnel.Open(); err != nil { glog.Errorf("Healthcheck failed for tunnel to %q: %v", e.Address, err)
glog.Errorf("Failed to open tunnel %v: %v", l.entries[ix], err) glog.Infof("Attempting once to re-establish tunnel to %q", e.Address)
} else { l.removeAndReAdd(e)
openTunnels = append(openTunnels, l.entries[ix]) }
}()
}
func (l *SSHTunnelList) healthCheck(e sshTunnelEntry) error {
// GET the healthcheck path using the provided tunnel's dial function.
transport := utilnet.SetTransportDefaults(&http.Transport{
Dial: e.Tunnel.Dial,
// TODO(cjcullen): Plumb real TLS options through.
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
})
client := &http.Client{Transport: transport}
_, err := client.Get(l.healthCheckURL.String())
return err
}
func (l *SSHTunnelList) removeAndReAdd(e sshTunnelEntry) {
// Find the entry to replace.
l.tunnelsLock.Lock()
defer l.tunnelsLock.Unlock()
for i, entry := range l.entries {
if entry.Tunnel == e.Tunnel {
l.entries = append(l.entries[:i], l.entries[i+1:]...)
l.adding[e.Address] = true
go l.createAndAddTunnel(e.Address)
return
} }
} }
l.entries = openTunnels }
func (l *SSHTunnelList) Dial(net, addr string) (net.Conn, error) {
start := time.Now()
id := mathrand.Int63() // So you can match begins/ends in the log.
glog.Infof("[%x: %v] Dialing...", id, addr)
defer func() {
glog.Infof("[%x: %v] Dialed in %v.", id, addr, time.Now().Sub(start))
}()
tunnel, err := l.pickRandomTunnel()
if err != nil {
return nil, err
}
return tunnel.Dial(net, addr)
}
func (l *SSHTunnelList) pickRandomTunnel() (tunnel, error) {
l.tunnelsLock.Lock()
defer l.tunnelsLock.Unlock()
if len(l.entries) == 0 { if len(l.entries) == 0 {
return errors.New("Failed to open any tunnels.") return nil, fmt.Errorf("No SSH tunnels currently open. Were the targets able to accept an ssh-key for user %q?", l.user)
} }
return nil n := mathrand.Intn(len(l.entries))
return l.entries[n].Tunnel, nil
} }
// Close asynchronously closes all tunnels in the list after waiting for 1 // Update reconciles the list's entries with the specified addresses. Existing
// minute. Tunnels will still be open upon this function's return, but should // tunnels that are not in addresses are removed from entries and closed in a
// no longer be used. // background goroutine. New tunnels specified in addresses are opened in a
func (l *SSHTunnelList) Close() { // background goroutine and then added to entries.
for ix := range l.entries { func (l *SSHTunnelList) Update(addrs []string) {
entry := l.entries[ix] haveAddrsMap := make(map[string]bool)
go func() { wantAddrsMap := make(map[string]bool)
defer runtime.HandleCrash() func() {
time.Sleep(1 * time.Minute) l.tunnelsLock.Lock()
if err := entry.Tunnel.Close(); err != nil { defer l.tunnelsLock.Unlock()
glog.Errorf("Failed to close tunnel %v: %v", entry, err) // Build a map of what we currently have.
for i := range l.entries {
haveAddrsMap[l.entries[i].Address] = true
}
// Determine any necessary additions.
for i := range addrs {
// Add tunnel if it is not in l.entries or l.adding
if _, ok := haveAddrsMap[addrs[i]]; !ok {
if _, ok := l.adding[addrs[i]]; !ok {
l.adding[addrs[i]] = true
addr := addrs[i]
go func() {
defer runtime.HandleCrash()
// Actually adding tunnel to list will block until lock
// is released after deletions.
l.createAndAddTunnel(addr)
}()
}
} }
}() wantAddrsMap[addrs[i]] = true
}
}
/* this will make sense if we move the lock into SSHTunnelList.
func (l *SSHTunnelList) Dial(network, addr string) (net.Conn, error) {
if len(l.entries) == 0 {
return nil, fmt.Errorf("empty tunnel list.")
}
n := mathrand.Intn(len(l.entries))
return l.entries[n].Tunnel.Dial(network, addr)
}
*/
// Returns a random tunnel, xor an error if there are none.
func (l *SSHTunnelList) PickRandomTunnel() (SSHTunnelEntry, error) {
if len(l.entries) == 0 {
return SSHTunnelEntry{}, fmt.Errorf("empty tunnel list.")
}
n := mathrand.Intn(len(l.entries))
return l.entries[n], nil
}
func (l *SSHTunnelList) Has(addr string) bool {
for ix := range l.entries {
if l.entries[ix].Address == addr {
return true
} }
} // Determine any necessary deletions.
return false var newEntries []sshTunnelEntry
for i := range l.entries {
if _, ok := wantAddrsMap[l.entries[i].Address]; !ok {
tunnelEntry := l.entries[i]
glog.Infof("Removing tunnel to deleted node at %q", tunnelEntry.Address)
go func() {
defer runtime.HandleCrash()
if err := tunnelEntry.Tunnel.Close(); err != nil {
glog.Errorf("Failed to close tunnel to %q: %v", tunnelEntry.Address, err)
}
}()
} else {
newEntries = append(newEntries, l.entries[i])
}
}
l.entries = newEntries
}()
} }
func (l *SSHTunnelList) Len() int { func (l *SSHTunnelList) createAndAddTunnel(addr string) {
return len(l.entries) glog.Infof("Trying to add tunnel to %q", addr)
tunnel, err := l.tunnelCreator.NewSSHTunnel(l.user, l.keyfile, addr)
if err != nil {
glog.Errorf("Failed to create tunnel for %q: %v", addr, err)
return
}
if err := tunnel.Open(); err != nil {
glog.Errorf("Failed to open tunnel to %q: %v", addr, err)
l.tunnelsLock.Lock()
delete(l.adding, addr)
l.tunnelsLock.Unlock()
return
}
l.tunnelsLock.Lock()
l.entries = append(l.entries, sshTunnelEntry{addr, tunnel})
delete(l.adding, addr)
l.tunnelsLock.Unlock()
glog.Infof("Successfully added tunnel for %q", addr)
} }
func EncodePrivateKey(private *rsa.PrivateKey) []byte { func EncodePrivateKey(private *rsa.PrivateKey) []byte {

View File

@ -24,6 +24,9 @@ import (
"reflect" "reflect"
"strings" "strings"
"testing" "testing"
"time"
"k8s.io/kubernetes/pkg/util/wait"
"github.com/golang/glog" "github.com/golang/glog"
"golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh"
@ -163,6 +166,88 @@ func TestSSHTunnel(t *testing.T) {
} }
} }
type fakeTunnel struct{}
func (*fakeTunnel) Open() error {
return nil
}
func (*fakeTunnel) Close() error {
return nil
}
func (*fakeTunnel) Dial(network, address string) (net.Conn, error) {
return nil, nil
}
type fakeTunnelCreator struct{}
func (*fakeTunnelCreator) NewSSHTunnel(string, string, string) (tunnel, error) {
return &fakeTunnel{}, nil
}
func TestSSHTunnelListUpdate(t *testing.T) {
// Start with an empty tunnel list.
l := &SSHTunnelList{
adding: make(map[string]bool),
tunnelCreator: &fakeTunnelCreator{},
}
// Start with 2 tunnels.
addressStrings := []string{"1.2.3.4", "5.6.7.8"}
l.Update(addressStrings)
checkTunnelsCorrect(t, l, addressStrings)
// Add another tunnel.
addressStrings = append(addressStrings, "9.10.11.12")
l.Update(addressStrings)
checkTunnelsCorrect(t, l, addressStrings)
// Go down to a single tunnel.
addressStrings = []string{"1.2.3.4"}
l.Update(addressStrings)
checkTunnelsCorrect(t, l, addressStrings)
// Replace w/ all new tunnels.
addressStrings = []string{"21.22.23.24", "25.26.27.28"}
l.Update(addressStrings)
checkTunnelsCorrect(t, l, addressStrings)
// Call update with the same tunnels.
l.Update(addressStrings)
checkTunnelsCorrect(t, l, addressStrings)
}
func checkTunnelsCorrect(t *testing.T, tunnelList *SSHTunnelList, addresses []string) {
if err := wait.Poll(100*time.Millisecond, 2*time.Second, func() (bool, error) {
return hasCorrectTunnels(tunnelList, addresses), nil
}); err != nil {
t.Errorf("Error waiting for tunnels to reach expected state: %v. Expected %v, had %v", err, addresses, tunnelList)
}
}
func hasCorrectTunnels(tunnelList *SSHTunnelList, addresses []string) bool {
tunnelList.tunnelsLock.Lock()
defer tunnelList.tunnelsLock.Unlock()
wantMap := make(map[string]bool)
for _, addr := range addresses {
wantMap[addr] = true
}
haveMap := make(map[string]bool)
for _, entry := range tunnelList.entries {
if wantMap[entry.Address] == false {
return false
}
haveMap[entry.Address] = true
}
for _, addr := range addresses {
if haveMap[addr] == false {
return false
}
}
return true
}
type mockSSHDialer struct { type mockSSHDialer struct {
network string network string
addr string addr string