From 82afaaf31d83bb180508915a0f002ada292905c0 Mon Sep 17 00:00:00 2001 From: CJ Cullen Date: Mon, 18 May 2015 10:34:50 -0700 Subject: [PATCH 01/10] Mount cloud-config files for cloudproviders in kube-apiserver & kube-controllermanager. --- .../kube-apiserver/kube-apiserver.manifest | 18 ++++++++---------- .../kube-controller-manager.manifest | 6 ++++++ 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest b/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest index 7012802e344..479c6a10b4c 100644 --- a/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest +++ b/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest @@ -5,21 +5,17 @@ {% set cloud_provider = "" -%} {% set cloud_config = "" -%} +{% set cloud_config_mount = "" -%} +{% set cloud_config_volume = "" -%} {% if grains.cloud is defined -%} -{% set cloud_provider = "--cloud_provider=" + grains.cloud -%} + {% set cloud_provider = "--cloud_provider=" + grains.cloud -%} -{% if grains.cloud == 'gce' -%} - {% if grains.cloud_config is defined -%} + {% if grains.cloud in [ 'aws', 'gce' ] and grains.cloud_config is defined -%} {% set cloud_config = "--cloud_config=" + grains.cloud_config -%} + {% set cloud_config_mount = "{\"name\": \"cloudconfigmount\",\"mountPath\": \"" + grains.cloud_config + "\", \"readOnly\": true}," -%} + {% set cloud_config_volume = "{\"name\": \"cloudconfigmount\",\"hostPath\": {\"path\": \"" + grains.cloud_config + "\"}}," -%} {% endif -%} - -{% elif grains.cloud == 'aws' -%} - {% if grains.cloud_config is defined -%} - {% set cloud_config = "--cloud_config=" + grains.cloud_config -%} - {% endif -%} -{% endif -%} - {% endif -%} {% set advertise_address = "" -%} @@ -111,6 +107,7 @@ "hostPort": 8080} ], "volumeMounts": [ + {{cloud_config_mount}} { "name": "srvkube", "mountPath": "/srv/kubernetes", "readOnly": true}, @@ -145,6 +142,7 @@ } ], "volumes":[ + {{cloud_config_volume}} { "name": "srvkube", "hostPath": { "path": "/srv/kubernetes"} diff --git a/cluster/saltbase/salt/kube-controller-manager/kube-controller-manager.manifest b/cluster/saltbase/salt/kube-controller-manager/kube-controller-manager.manifest index a446187bb99..b5e1736d521 100644 --- a/cluster/saltbase/salt/kube-controller-manager/kube-controller-manager.manifest +++ b/cluster/saltbase/salt/kube-controller-manager/kube-controller-manager.manifest @@ -14,6 +14,8 @@ {% set cloud_provider = "" -%} {% set cloud_config = "" -%} +{% set cloud_config_mount = "" -%} +{% set cloud_config_volume = "" -%} {% if grains.cloud is defined -%} {% set cloud_provider = "--cloud_provider=" + grains.cloud -%} @@ -21,6 +23,8 @@ {% if grains.cloud in [ 'aws', 'gce' ] and grains.cloud_config is defined -%} {% set cloud_config = "--cloud_config=" + grains.cloud_config -%} + {% set cloud_config_mount = "{\"name\": \"cloudconfigmount\",\"mountPath\": \"" + grains.cloud_config + "\", \"readOnly\": true}," -%} + {% set cloud_config_volume = "{\"name\": \"cloudconfigmount\",\"hostPath\": {\"path\": \"" + grains.cloud_config + "\"}}," -%} {% endif -%} {% endif -%} @@ -42,6 +46,7 @@ "/usr/local/bin/kube-controller-manager {{params}} 1>>/var/log/kube-controller-manager.log 2>&1" ], "volumeMounts": [ + {{cloud_config_mount}} { "name": "srvkube", "mountPath": "/srv/kubernetes", "readOnly": true}, @@ -76,6 +81,7 @@ } ], "volumes":[ + {{cloud_config_volume}} { "name": "srvkube", "hostPath": { "path": "/srv/kubernetes"} From 631cf34d40c78f8082d62d8e22b36e2369f48953 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Wed, 27 May 2015 19:34:31 -0700 Subject: [PATCH 02/10] Add tunnelling to the proxy API handler. --- pkg/apiserver/proxy.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/apiserver/proxy.go b/pkg/apiserver/proxy.go index 9dab573dc82..8609da4e442 100644 --- a/pkg/apiserver/proxy.go +++ b/pkg/apiserver/proxy.go @@ -119,6 +119,9 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { httpCode = http.StatusNotFound return } + // TODO: make this dynamic + location.Host = "localhost" + location.Scheme = "http" // Default to http if location.Scheme == "" { From 30a89968a49c01f84bf6b61167c1a5401989731f Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Wed, 27 May 2015 21:38:21 -0700 Subject: [PATCH 03/10] Initial proxy tunnelling. --- cmd/kube-apiserver/app/server.go | 6 ++ pkg/apiserver/api_installer.go | 5 +- pkg/apiserver/apiserver.go | 5 +- pkg/apiserver/proxy.go | 8 +- pkg/master/master.go | 102 +++++++++++++++++++- pkg/util/ssh.go | 155 +++++++++++++++++++++++++------ 6 files changed, 244 insertions(+), 37 deletions(-) diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 4a6e7322830..dc4b4f6859b 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -97,6 +97,8 @@ type APIServer struct { MaxRequestsInFlight int MinRequestTimeout int LongRunningRequestRE string + SSHUser string + SSHKeyfile string } // NewAPIServer creates a new APIServer object with default parameters @@ -201,6 +203,8 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) { fs.IntVar(&s.MaxRequestsInFlight, "max-requests-inflight", 400, "The maximum number of requests in flight at a given time. When the server exceeds this, it rejects requests. Zero for no limit.") fs.IntVar(&s.MinRequestTimeout, "min-request-timeout", 1800, "An optional field indicating the minimum number of seconds a handler must keep a request open before timing it out. Currently only honored by the watch request handler, which picks a randomized value above this number as the connection timeout, to spread out load.") fs.StringVar(&s.LongRunningRequestRE, "long-running-request-regexp", "[.*\\/watch$][^\\/proxy.*]", "A regular expression matching long running requests which should be excluded from maximum inflight request handling.") + fs.StringVar(&s.SSHUser, "ssh-user", "", "If non-empty, use secure SSH proxy to the nodes, using this user name") + fs.StringVar(&s.SSHKeyfile, "ssh-keyfile", "", "If non-empty, use secure SSH proxy to the nodes, using this user keyfile") } // TODO: Longer term we should read this from some config store, rather than a flag. @@ -378,6 +382,8 @@ func (s *APIServer) Run(_ []string) error { ClusterName: s.ClusterName, ExternalHost: s.ExternalHost, MinRequestTimeout: s.MinRequestTimeout, + SSHUser: s.SSHUser, + SSHKeyfile: s.SSHKeyfile, } m := master.New(config) diff --git a/pkg/apiserver/api_installer.go b/pkg/apiserver/api_installer.go index 2fdcaba9073..7c0cc173df5 100644 --- a/pkg/apiserver/api_installer.go +++ b/pkg/apiserver/api_installer.go @@ -18,6 +18,7 @@ package apiserver import ( "fmt" + "net" "net/http" "net/url" gpath "path" @@ -55,14 +56,14 @@ type action struct { var errEmptyName = errors.NewBadRequest("name must be provided") // Installs handlers for API resources. -func (a *APIInstaller) Install() (ws *restful.WebService, errors []error) { +func (a *APIInstaller) Install(proxyDialer func(network, addr string) (net.Conn, error)) (ws *restful.WebService, errors []error) { errors = make([]error, 0) // Create the WebService. ws = a.newWebService() redirectHandler := (&RedirectHandler{a.group.Storage, a.group.Codec, a.group.Context, a.info}) - proxyHandler := (&ProxyHandler{a.prefix + "/proxy/", a.group.Storage, a.group.Codec, a.group.Context, a.info}) + proxyHandler := (&ProxyHandler{a.prefix + "/proxy/", a.group.Storage, a.group.Codec, a.group.Context, a.info, proxyDialer}) // Register the paths in a deterministic (sorted) order to get a deterministic swagger spec. paths := make([]string, len(a.group.Storage)) diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index addf2d447c5..3d7b440b3c3 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "io/ioutil" + "net" "net/http" "path" "strconv" @@ -149,7 +150,7 @@ type RestContainer struct { // InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container. // It is expected that the provided path root prefix will serve all operations. Root MUST NOT end // in a slash. A restful WebService is created for the group and version. -func (g *APIGroupVersion) InstallREST(container *RestContainer) error { +func (g *APIGroupVersion) InstallREST(container *RestContainer, proxyDialer func(network, addr string) (net.Conn, error)) error { info := &APIRequestInfoResolver{util.NewStringSet(strings.TrimPrefix(g.Root, "/")), g.Mapper} prefix := path.Join(g.Root, g.Version) @@ -159,7 +160,7 @@ func (g *APIGroupVersion) InstallREST(container *RestContainer) error { prefix: prefix, minRequestTimeout: container.MinRequestTimeout, } - ws, registrationErrors := installer.Install() + ws, registrationErrors := installer.Install(proxyDialer) container.Add(ws) return errors.NewAggregate(registrationErrors) } diff --git a/pkg/apiserver/proxy.go b/pkg/apiserver/proxy.go index 8609da4e442..3f5bd826ac0 100644 --- a/pkg/apiserver/proxy.go +++ b/pkg/apiserver/proxy.go @@ -49,6 +49,8 @@ type ProxyHandler struct { codec runtime.Codec context api.RequestContextMapper apiRequestInfoResolver *APIRequestInfoResolver + + dial func(network, addr string) (net.Conn, error) } func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { @@ -119,9 +121,9 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { httpCode = http.StatusNotFound return } - // TODO: make this dynamic - location.Host = "localhost" - location.Scheme = "http" + if r.dial != nil { + transport = &http.Transport{Dial: r.dial} + } // Default to http if location.Scheme == "" { diff --git a/pkg/master/master.go b/pkg/master/master.go index 868832ba722..94835933052 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -39,6 +39,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/auth/authorizer" "github.com/GoogleCloudPlatform/kubernetes/pkg/auth/handlers" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/componentstatus" controlleretcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller/etcd" @@ -143,6 +145,10 @@ type Config struct { // The range of ports to be assigned to services with type=NodePort or greater ServiceNodePortRange util.PortRange + + // Used for secure proxy. If empty, don't use secure proxy. + SSHUser string + SSHKeyfile string } // Master contains state for a Kubernetes cluster master/api server. @@ -196,6 +202,9 @@ type Master struct { // "Outputs" Handler http.Handler InsecureHandler http.Handler + + // Used for secure proxy + tunnels util.SSHTunnelList } // NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version @@ -474,15 +483,22 @@ func (m *Master) init(c *Config) { "componentStatuses": componentstatus.NewStorage(func() map[string]apiserver.Server { return m.getServersToValidate(c) }), } + var proxyDialer func(net, addr string) (net.Conn, error) + if len(c.SSHUser) > 0 { + glog.Infof("Setting up proxy: %s %s", c.SSHUser, c.SSHKeyfile) + m.setupSecureProxy(c.SSHUser, c.SSHKeyfile) + proxyDialer = m.Dial + } + apiVersions := []string{} if m.v1beta3 { - if err := m.api_v1beta3().InstallREST(m.handlerContainer); err != nil { + if err := m.api_v1beta3().InstallREST(m.handlerContainer, proxyDialer); err != nil { glog.Fatalf("Unable to setup API v1beta3: %v", err) } apiVersions = append(apiVersions, "v1beta3") } if m.v1 { - if err := m.api_v1().InstallREST(m.handlerContainer); err != nil { + if err := m.api_v1().InstallREST(m.handlerContainer, proxyDialer); err != nil { glog.Fatalf("Unable to setup API v1: %v", err) } apiVersions = append(apiVersions, "v1") @@ -703,3 +719,85 @@ func (m *Master) api_v1() *apiserver.APIGroupVersion { version.Codec = v1.Codec return version } + +func findExternalAddress(node *api.Node) (string, error) { + for ix := range node.Status.Addresses { + addr := &node.Status.Addresses[ix] + if addr.Type == api.NodeExternalIP { + return addr.Address, nil + } + } + return "", fmt.Errorf("Couldn't find external address: %v", node) +} + +func (m *Master) Dial(net, addr string) (net.Conn, error) { + return m.tunnels.Dial(net, addr) +} + +func (m *Master) detectTunnelChanges(addrs []string) bool { + if len(m.tunnels) != len(addrs) { + return true + } + for ix := range addrs { + if !m.tunnels.Has(addrs[ix]) { + return true + } + } + return false +} + +func (m *Master) loadTunnels(user, keyfile string) error { + nodes, err := m.nodeRegistry.ListMinions(api.NewDefaultContext(), labels.Everything(), fields.Everything()) + if err != nil { + return err + } + result := []string{} + for ix := range nodes.Items { + node := &nodes.Items[ix] + addr, err := findExternalAddress(node) + if err != nil { + return err + } + result = append(result, addr) + } + changesExist := m.detectTunnelChanges(result) + if !changesExist { + return nil + } + + // TODO: This is going to drop connections in the middle. See comment about using Watch above. + tunnels, err := util.MakeSSHTunnels(user, keyfile, result) + if err != nil { + return err + } + tunnels.Open() + if m.tunnels != nil { + m.tunnels.Close() + } + m.tunnels = tunnels + return nil +} + +func (m *Master) setupSecureProxy(user, keyfile string) { + loadTunnelsPrintError := func() { + if err := m.loadTunnels(user, keyfile); err != nil { + glog.Errorf("Failed to load SSH Tunnels: %v", err) + } + } + + // Sync loop for tunnels + // TODO: switch this to watch. + go func() { + for { + loadTunnelsPrintError() + + var sleep time.Duration + if len(m.tunnels) == 0 { + sleep = time.Second + } else { + sleep = time.Second * 120 + } + time.Sleep(sleep) + } + }() +} diff --git a/pkg/util/ssh.go b/pkg/util/ssh.go index 7760761b339..09b82c2d200 100644 --- a/pkg/util/ssh.go +++ b/pkg/util/ssh.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "io/ioutil" + "math/rand" "net" "os" @@ -31,15 +32,12 @@ import ( // TODO: Unit tests for this code, we can spin up a test SSH server with instructions here: // https://godoc.org/golang.org/x/crypto/ssh#ServerConn type SSHTunnel struct { - Config *ssh.ClientConfig - Host string - SSHPort int - LocalPort int - RemoteHost string - RemotePort int - running bool - sock net.Listener - client *ssh.Client + Config *ssh.ClientConfig + Host string + SSHPort string + running bool + sock net.Listener + client *ssh.Client } func (s *SSHTunnel) copyBytes(out io.Writer, in io.Reader) { @@ -48,7 +46,7 @@ func (s *SSHTunnel) copyBytes(out io.Writer, in io.Reader) { } } -func NewSSHTunnel(user, keyfile, host, remoteHost string, localPort, remotePort int) (*SSHTunnel, error) { +func NewSSHTunnel(user, keyfile, host string) (*SSHTunnel, error) { signer, err := MakePrivateKeySigner(keyfile) if err != nil { return nil, err @@ -58,44 +56,51 @@ func NewSSHTunnel(user, keyfile, host, remoteHost string, localPort, remotePort Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)}, } return &SSHTunnel{ - Config: &config, - Host: host, - SSHPort: 22, - LocalPort: localPort, - RemotePort: remotePort, - RemoteHost: remoteHost, + Config: &config, + Host: host, + SSHPort: "22", }, nil } func (s *SSHTunnel) Open() error { var err error - s.client, err = ssh.Dial("tcp", fmt.Sprintf("%s:%d", s.Host, s.SSHPort), s.Config) + s.client, err = ssh.Dial("tcp", net.JoinHostPort(s.Host, s.SSHPort), s.Config) if err != nil { return err } - s.sock, err = net.Listen("tcp", fmt.Sprintf("localhost:%d", s.LocalPort)) + return nil +} + +func (s *SSHTunnel) Dial(network, address string) (net.Conn, error) { + return s.client.Dial(network, address) +} + +func (s *SSHTunnel) Listen(remoteHost, localPort, remotePort string) error { + var err error + s.sock, err = net.Listen("tcp", net.JoinHostPort("localhost", localPort)) if err != nil { return err } s.running = true - return nil -} - -func (s *SSHTunnel) Listen() { for s.running { conn, err := s.sock.Accept() if err != nil { - glog.Errorf("Error listening for ssh tunnel to %s (%v)", s.RemoteHost, err) + if s.running { + glog.Errorf("Error listening for ssh tunnel to %s (%v)", remoteHost, err) + } else { + glog.V(4).Infof("Error listening for ssh tunnel to %s (%v), this is likely due to the tunnel shutting down.", remoteHost, err) + } continue } - if err := s.tunnel(conn); err != nil { + if err := s.tunnel(conn, remoteHost, remotePort); err != nil { glog.Errorf("Error starting tunnel: %v", err) } } + return nil } -func (s *SSHTunnel) tunnel(conn net.Conn) error { - tunnel, err := s.client.Dial("tcp", fmt.Sprintf("%s:%d", s.RemoteHost, s.RemotePort)) +func (s *SSHTunnel) tunnel(conn net.Conn, remoteHost, remotePort string) error { + tunnel, err := s.client.Dial("tcp", net.JoinHostPort(remoteHost, remotePort)) if err != nil { return err } @@ -104,13 +109,16 @@ func (s *SSHTunnel) tunnel(conn net.Conn) error { return nil } -func (s *SSHTunnel) Close() error { +func (s *SSHTunnel) StopListening() error { // TODO: try to shutdown copying here? s.running = false - // TODO: Aggregate errors and keep going? if err := s.sock.Close(); err != nil { return err } + return nil +} + +func (s *SSHTunnel) Close() error { if err := s.client.Close(); err != nil { return err } @@ -172,3 +180,94 @@ func MakePrivateKeySigner(key string) (ssh.Signer, error) { } return signer, nil } + +/* +if len(r.tunnels) == 0 { + list, err := listNodes() + if err != nil { + glog.Errorf("unexpected error making tunnels: %v", err) + return + } + tunnels, err := MakeNodeSSHTunnels(list) + if err != nil { + status := errToAPIStatus(err) + writeJSON(status.Code, r.codec, status, w) + httpCode = status.Code + return + } + r.tunnels = tunnels + } + // TODO: round robin here + tunnel := r.tunnels[0] + if err != nil { + status := errToAPIStatus(err) + writeJSON(status.Code, r.codec, status, w) + httpCode = status.Code + return + } + defer func() { + if err := tunnel.Close(); err != nil { + glog.Errorf("Error closing ssh tunnel: %v", err) + } + }() + if err := tunnel.Open(); err != nil { + status := errToAPIStatus(err) + writeJSON(status.Code, r.codec, status, w) + httpCode = status.Code + return + } +*/ + +type SSHTunnelEntry struct { + Address string + Tunnel *SSHTunnel +} + +type SSHTunnelList []SSHTunnelEntry + +func MakeSSHTunnels(user, keyfile string, addresses []string) (SSHTunnelList, error) { + tunnels := []SSHTunnelEntry{} + for ix := range addresses { + addr := addresses[ix] + tunnel, err := NewSSHTunnel(user, keyfile, addr) + if err != nil { + return nil, err + } + tunnels = append(tunnels, SSHTunnelEntry{addr, tunnel}) + } + return tunnels, nil +} + +func (l SSHTunnelList) Open() error { + for ix := range l { + if err := l[ix].Tunnel.Open(); err != nil { + return err + } + } + return nil +} + +func (l SSHTunnelList) Close() error { + for ix := range l { + if err := l[ix].Tunnel.Close(); err != nil { + return err + } + } + return nil +} + +func (l SSHTunnelList) Dial(network, addr string) (net.Conn, error) { + if len(l) == 0 { + return nil, fmt.Errorf("Empty tunnel list.") + } + return l[rand.Int()%len(l)].Tunnel.Dial(network, addr) +} + +func (l SSHTunnelList) Has(addr string) bool { + for ix := range l { + if l[ix].Address == addr { + return true + } + } + return false +} From 5115fd5703c3d1f6ac5c5e2ab3deeec1f206f637 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Thu, 28 May 2015 11:45:08 -0700 Subject: [PATCH 04/10] Add key generation. --- cmd/kube-apiserver/app/server.go | 6 ++++ pkg/cloudprovider/aws/aws.go | 4 +++ pkg/cloudprovider/cloud.go | 3 ++ pkg/cloudprovider/fake/fake.go | 4 +++ pkg/cloudprovider/gce/gce.go | 34 +++++++++++++++++++ pkg/cloudprovider/mesos/mesos.go | 4 +++ pkg/cloudprovider/openstack/openstack.go | 4 +++ pkg/cloudprovider/ovirt/ovirt.go | 5 +++ pkg/cloudprovider/rackspace/rackspace.go | 4 +++ pkg/cloudprovider/vagrant/vagrant.go | 4 +++ pkg/master/master.go | 40 ++++++++++++++++++++-- pkg/util/ssh.go | 43 ++++++++++++++++++++++-- pkg/util/util.go | 12 +++++++ 13 files changed, 162 insertions(+), 5 deletions(-) diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index dc4b4f6859b..111c6c6cb77 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -357,6 +357,11 @@ func (s *APIServer) Run(_ []string) error { } } + var installSSH master.InstallSSHKey + instances, supported := cloud.Instances() + if supported { + installSSH = instances.AddSSHKeyToAllInstances + } config := &master.Config{ EtcdHelper: helper, EventTTL: s.EventTTL, @@ -384,6 +389,7 @@ func (s *APIServer) Run(_ []string) error { MinRequestTimeout: s.MinRequestTimeout, SSHUser: s.SSHUser, SSHKeyfile: s.SSHKeyfile, + InstallSSHKey: installSSH, } m := master.New(config) diff --git a/pkg/cloudprovider/aws/aws.go b/pkg/cloudprovider/aws/aws.go index 6262f016a4e..70a77e8cc92 100644 --- a/pkg/cloudprovider/aws/aws.go +++ b/pkg/cloudprovider/aws/aws.go @@ -230,6 +230,10 @@ func newEc2Filter(name string, value string) *ec2.Filter { return filter } +func (self *AWSCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error { + return errors.New("unimplemented") +} + // Implementation of EC2.Instances func (self *awsSdkEC2) DescribeInstances(request *ec2.DescribeInstancesInput) ([]*ec2.Instance, error) { // Instances are paged diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go index 13fdaf4bf30..e5d371ffeb3 100644 --- a/pkg/cloudprovider/cloud.go +++ b/pkg/cloudprovider/cloud.go @@ -108,6 +108,9 @@ type Instances interface { List(filter string) ([]string, error) // GetNodeResources gets the resources for a particular node GetNodeResources(name string) (*api.NodeResources, error) + // AddSSHKeyToAllInstances adds an SSH public key as a legal identity for all instances + // expected format for the key is standard ssh-keygen format: + AddSSHKeyToAllInstances(user string, keyData []byte) error } // Route is a representation of an advanced routing rule. diff --git a/pkg/cloudprovider/fake/fake.go b/pkg/cloudprovider/fake/fake.go index 45ea2468b8a..b3a235f543d 100644 --- a/pkg/cloudprovider/fake/fake.go +++ b/pkg/cloudprovider/fake/fake.go @@ -144,6 +144,10 @@ func (f *FakeCloud) EnsureTCPLoadBalancerDeleted(name, region string) error { return f.Err } +func (f *FakeCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error { + return errors.New("unimplemented") +} + // NodeAddresses is a test-spy implementation of Instances.NodeAddresses. // It adds an entry "node-addresses" into the internal method call record. func (f *FakeCloud) NodeAddresses(instance string) ([]api.NodeAddress, error) { diff --git a/pkg/cloudprovider/gce/gce.go b/pkg/cloudprovider/gce/gce.go index 50ac1fbcc2c..7e9ae02f24e 100644 --- a/pkg/cloudprovider/gce/gce.go +++ b/pkg/cloudprovider/gce/gce.go @@ -22,6 +22,7 @@ import ( "io/ioutil" "net" "net/http" + "os" "path" "strconv" "strings" @@ -474,6 +475,39 @@ func (gce *GCECloud) getInstanceByName(name string) (*compute.Instance, error) { return res, nil } +func (gce *GCECloud) AddSSHKeyToAllInstances(user string, keyData []byte) error { + project, err := gce.service.Projects.Get(gce.projectID).Do() + if err != nil { + return err + } + hostname, err := os.Hostname() + if err != nil { + return err + } + found := false + for _, item := range project.CommonInstanceMetadata.Items { + if item.Key == "sshKeys" { + item.Value = fmt.Sprintf("%s\n%s:%s %s@%s", item.Value, user, string(keyData), user, hostname) + found = true + break + } + } + if !found { + // This is super unlikely, so log. + glog.Infof("Failed to find sshKeys metadata, creating a new item") + project.CommonInstanceMetadata.Items = append(project.CommonInstanceMetadata.Items, + &compute.MetadataItems{ + Key: "sshKeys", + Value: fmt.Sprint("%s:%s %s@%s", user, string(keyData), user, hostname), + }) + } + op, err := gce.service.Projects.SetCommonInstanceMetadata(gce.projectID, project.CommonInstanceMetadata).Do() + if err != nil { + return err + } + return gce.waitForGlobalOp(op) +} + // NodeAddresses is an implementation of Instances.NodeAddresses. func (gce *GCECloud) NodeAddresses(_ string) ([]api.NodeAddress, error) { externalIP, err := gce.metadataAccess(EXTERNAL_IP_METADATA_URL) diff --git a/pkg/cloudprovider/mesos/mesos.go b/pkg/cloudprovider/mesos/mesos.go index cacc3b40b17..600c9050ac3 100644 --- a/pkg/cloudprovider/mesos/mesos.go +++ b/pkg/cloudprovider/mesos/mesos.go @@ -78,6 +78,10 @@ func newMesosCloud(configReader io.Reader) (*MesosCloud, error) { } } +func (c *MesosCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error { + return errors.New("unimplemented") +} + // Instances returns a copy of the Mesos cloud Instances implementation. // Mesos natively provides minimal cloud-type resources. More robust cloud // support requires a combination of Mesos and cloud-specific knowledge. diff --git a/pkg/cloudprovider/openstack/openstack.go b/pkg/cloudprovider/openstack/openstack.go index 38520b28e7e..c4349ebf09d 100644 --- a/pkg/cloudprovider/openstack/openstack.go +++ b/pkg/cloudprovider/openstack/openstack.go @@ -317,6 +317,10 @@ func getAddressByName(api *gophercloud.ServiceClient, name string) (string, erro return s, nil } +func (i *Instances) AddSSHKeyToAllInstances(user string, keyData []byte) error { + return errors.New("unimplemented") +} + func (i *Instances) NodeAddresses(name string) ([]api.NodeAddress, error) { glog.V(4).Infof("NodeAddresses(%v) called", name) diff --git a/pkg/cloudprovider/ovirt/ovirt.go b/pkg/cloudprovider/ovirt/ovirt.go index 6c78202df1d..abcfff64539 100644 --- a/pkg/cloudprovider/ovirt/ovirt.go +++ b/pkg/cloudprovider/ovirt/ovirt.go @@ -18,6 +18,7 @@ package ovirt_cloud import ( "encoding/xml" + "errors" "fmt" "io" "io/ioutil" @@ -273,3 +274,7 @@ func (v *OVirtCloud) List(filter string) ([]string, error) { func (v *OVirtCloud) GetNodeResources(name string) (*api.NodeResources, error) { return nil, nil } + +func (v *OVirtCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error { + return errors.New("unimplemented") +} diff --git a/pkg/cloudprovider/rackspace/rackspace.go b/pkg/cloudprovider/rackspace/rackspace.go index 8df8230da65..38f2c25ebaf 100644 --- a/pkg/cloudprovider/rackspace/rackspace.go +++ b/pkg/cloudprovider/rackspace/rackspace.go @@ -376,6 +376,10 @@ func (i *Instances) InstanceID(name string) (string, error) { return "", nil } +func (i *Instances) AddSSHKeyToAllInstances(user string, keyData []byte) error { + return errors.New("unimplemented") +} + func (i *Instances) GetNodeResources(name string) (*api.NodeResources, error) { glog.V(2).Infof("GetNodeResources(%v) called", name) diff --git a/pkg/cloudprovider/vagrant/vagrant.go b/pkg/cloudprovider/vagrant/vagrant.go index 2bd18150d17..e4e89ed453e 100644 --- a/pkg/cloudprovider/vagrant/vagrant.go +++ b/pkg/cloudprovider/vagrant/vagrant.go @@ -131,6 +131,10 @@ func (v *VagrantCloud) getInstanceByAddress(address string) (*SaltMinion, error) return nil, fmt.Errorf("unable to find instance for address: %s", address) } +func (v *VagrantCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error { + return errors.New("unimplemented") +} + // NodeAddresses returns the NodeAddresses of a particular machine instance. func (v *VagrantCloud) NodeAddresses(instance string) ([]api.NodeAddress, error) { // Due to vagrant not running with a dedicated DNS setup, we return the IP address of a minion as its hostname at this time diff --git a/pkg/master/master.go b/pkg/master/master.go index 94835933052..a36b620634a 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -18,7 +18,9 @@ package master import ( "bytes" + "errors" "fmt" + "io/ioutil" "net" "net/http" "net/http/pprof" @@ -147,10 +149,13 @@ type Config struct { ServiceNodePortRange util.PortRange // Used for secure proxy. If empty, don't use secure proxy. - SSHUser string - SSHKeyfile string + SSHUser string + SSHKeyfile string + InstallSSHKey InstallSSHKey } +type InstallSSHKey func(user string, data []byte) error + // Master contains state for a Kubernetes cluster master/api server. type Master struct { // "Inputs", Copied from Config @@ -204,7 +209,8 @@ type Master struct { InsecureHandler http.Handler // Used for secure proxy - tunnels util.SSHTunnelList + tunnels util.SSHTunnelList + installSSHKey InstallSSHKey } // NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version @@ -486,6 +492,16 @@ func (m *Master) init(c *Config) { var proxyDialer func(net, addr string) (net.Conn, error) if len(c.SSHUser) > 0 { glog.Infof("Setting up proxy: %s %s", c.SSHUser, c.SSHKeyfile) + exists, err := util.FileExists(c.SSHKeyfile) + if err != nil { + glog.Errorf("Error detecting if key exists: %v", err) + } else if !exists { + glog.Infof("Key doesn't exist, attempting to create") + err := m.generateSSHKey(c.SSHUser, c.SSHKeyfile) + if err != nil { + glog.Errorf("Failed to create key pair: %v", err) + } + } m.setupSecureProxy(c.SSHUser, c.SSHKeyfile) proxyDialer = m.Dial } @@ -801,3 +817,21 @@ func (m *Master) setupSecureProxy(user, keyfile string) { } }() } + +func (m *Master) generateSSHKey(user, keyfile string) error { + if m.installSSHKey == nil { + return errors.New("ssh install function is null") + } + + private, public, err := util.GenerateKey(2048) + if err != nil { + return err + } + ioutil.WriteFile(keyfile, util.EncodePrivateKey(private), 0600) + data, err := util.EncodeSSHKey(public) + if err != nil { + return err + } + fmt.Printf("FOO: %s", data) + return m.installSSHKey(user, data) +} diff --git a/pkg/util/ssh.go b/pkg/util/ssh.go index 09b82c2d200..2ed99e74bc6 100644 --- a/pkg/util/ssh.go +++ b/pkg/util/ssh.go @@ -18,10 +18,14 @@ package util import ( "bytes" + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "encoding/pem" "fmt" "io" "io/ioutil" - "math/rand" + mathrand "math/rand" "net" "os" @@ -260,7 +264,7 @@ func (l SSHTunnelList) Dial(network, addr string) (net.Conn, error) { if len(l) == 0 { return nil, fmt.Errorf("Empty tunnel list.") } - return l[rand.Int()%len(l)].Tunnel.Dial(network, addr) + return l[mathrand.Int()%len(l)].Tunnel.Dial(network, addr) } func (l SSHTunnelList) Has(addr string) bool { @@ -271,3 +275,38 @@ func (l SSHTunnelList) Has(addr string) bool { } return false } + +func EncodePrivateKey(private *rsa.PrivateKey) []byte { + return pem.EncodeToMemory(&pem.Block{ + Bytes: x509.MarshalPKCS1PrivateKey(private), + Type: "RSA PRIVATE KEY", + }) +} + +func EncodePublicKey(public *rsa.PublicKey) ([]byte, error) { + publicBytes, err := x509.MarshalPKIXPublicKey(public) + if err != nil { + return nil, err + } + + return pem.EncodeToMemory(&pem.Block{ + Bytes: publicBytes, + Type: "PUBLIC KEY", + }), nil +} + +func EncodeSSHKey(public *rsa.PublicKey) ([]byte, error) { + publicKey, err := ssh.NewPublicKey(public) + if err != nil { + return nil, err + } + return ssh.MarshalAuthorizedKey(publicKey), nil +} + +func GenerateKey(bits int) (*rsa.PrivateKey, *rsa.PublicKey, error) { + private, err := rsa.GenerateKey(rand.Reader, bits) + if err != nil { + return nil, nil, err + } + return private, &private.PublicKey, nil +} diff --git a/pkg/util/util.go b/pkg/util/util.go index 7ba9c78f3dd..a6e33774caf 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -515,4 +515,16 @@ func ShortenString(str string, n int) string { } else { return str[:n] } + +func FileExists(filename string) (bool, error) { + file, err := os.Open(filename) + defer file.Close() + if err != nil { + if os.IsNotExist(err) { + return false, nil + } else { + return false, err + } + } + return true, nil } From de9a5f43bc66a8cf695e69bdd436acde4078a665 Mon Sep 17 00:00:00 2001 From: CJ Cullen Date: Fri, 29 May 2015 14:29:17 -0700 Subject: [PATCH 05/10] Specify sshUser, sshKeyfile in kube-apiserver manifest. Trim space on ssh key so GCE doesn't treat it as 2 lines. A couple other minor fixes. --- .../saltbase/salt/kube-apiserver/kube-apiserver.manifest | 2 +- pkg/cloudprovider/gce/gce.go | 2 ++ pkg/master/master.go | 6 +++++- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest b/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest index 479c6a10b4c..fe7ffb2c3d7 100644 --- a/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest +++ b/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest @@ -81,7 +81,7 @@ {% endif -%} {% set params = address + " " + etcd_servers + " " + cloud_provider + " " + cloud_config + " " + runtime_config + " " + admission_control + " " + service_cluster_ip_range + " " + client_ca_file + " " + basic_auth_file + " " + min_request_timeout -%} -{% set params = params + " " + cluster_name + " " + cert_file + " " + key_file + " --secure_port=" + secure_port + " " + token_auth_file + " " + bind_address + " " + pillar['log_level'] + " " + advertise_address -%} +{% set params = params + " " + cluster_name + " " + cert_file + " " + key_file + " --secure_port=" + secure_port + " " + token_auth_file + " " + bind_address + " " + pillar['log_level'] + " " + advertise_address + " --ssh-user=root --ssh-keyfile=/.sshkeyfile"-%} { "apiVersion": "v1beta3", diff --git a/pkg/cloudprovider/gce/gce.go b/pkg/cloudprovider/gce/gce.go index 7e9ae02f24e..4f951406402 100644 --- a/pkg/cloudprovider/gce/gce.go +++ b/pkg/cloudprovider/gce/gce.go @@ -17,6 +17,7 @@ limitations under the License. package gce_cloud import ( + "bytes" "fmt" "io" "io/ioutil" @@ -484,6 +485,7 @@ func (gce *GCECloud) AddSSHKeyToAllInstances(user string, keyData []byte) error if err != nil { return err } + keyData = bytes.TrimSpace(keyData) found := false for _, item := range project.CommonInstanceMetadata.Items { if item.Key == "sshKeys" { diff --git a/pkg/master/master.go b/pkg/master/master.go index a36b620634a..9619255defa 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -340,6 +340,8 @@ func New(c *Config) *Master { serviceReadWriteIP: serviceReadWriteIP, // TODO: serviceReadWritePort should be passed in as an argument, it may not always be 443 serviceReadWritePort: 443, + + installSSHKey: c.InstallSSHKey, } var handlerContainer *restful.Container @@ -827,7 +829,9 @@ func (m *Master) generateSSHKey(user, keyfile string) error { if err != nil { return err } - ioutil.WriteFile(keyfile, util.EncodePrivateKey(private), 0600) + if err := ioutil.WriteFile(keyfile, util.EncodePrivateKey(private), 0600); err != nil { + return err + } data, err := util.EncodeSSHKey(public) if err != nil { return err From 7ea533d871dc0b80b3b84f2e35d14a0401918a21 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Fri, 29 May 2015 15:33:22 -0700 Subject: [PATCH 06/10] Add the SSHTunnel transport to the kubelet client. --- pkg/apiserver/apiserver_test.go | 6 +++--- pkg/apiserver/proxy.go | 3 ++- pkg/client/helper.go | 3 +++ pkg/client/kubelet.go | 23 +++++++++++++++++++---- pkg/cloudprovider/fake/fake.go | 1 + pkg/master/master.go | 15 +++++++++++++++ 6 files changed, 43 insertions(+), 8 deletions(-) diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index fdefac6615d..3bab81bbac8 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -231,7 +231,7 @@ func handleInternal(legacy bool, storage map[string]rest.Storage, admissionContr container := restful.NewContainer() container.Router(restful.CurlyRouter{}) mux := container.ServeMux - if err := group.InstallREST(&RestContainer{container, 0}); err != nil { + if err := group.InstallREST(&RestContainer{container, 0}, nil); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.Version, err)) } ws := new(restful.WebService) @@ -1901,7 +1901,7 @@ func TestParentResourceIsRequired(t *testing.T) { Codec: newCodec, } container := restful.NewContainer() - if err := group.InstallREST(&RestContainer{container, 0}); err == nil { + if err := group.InstallREST(&RestContainer{container, 0}, nil); err == nil { t.Fatal("expected error") } @@ -1929,7 +1929,7 @@ func TestParentResourceIsRequired(t *testing.T) { Codec: newCodec, } container = restful.NewContainer() - if err := group.InstallREST(&RestContainer{container, 0}); err != nil { + if err := group.InstallREST(&RestContainer{container, 0}, nil); err != nil { t.Fatal(err) } diff --git a/pkg/apiserver/proxy.go b/pkg/apiserver/proxy.go index 3f5bd826ac0..aa1967bceee 100644 --- a/pkg/apiserver/proxy.go +++ b/pkg/apiserver/proxy.go @@ -121,7 +121,8 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { httpCode = http.StatusNotFound return } - if r.dial != nil { + // If we have a custom dialer, and no pre-existing transport, initialize it to use the dialer. + if transport == nil && r.dial != nil { transport = &http.Transport{Dial: r.dial} } diff --git a/pkg/client/helper.go b/pkg/client/helper.go index 8f58ab722a4..8834526a834 100644 --- a/pkg/client/helper.go +++ b/pkg/client/helper.go @@ -102,6 +102,9 @@ type KubeletConfig struct { // HTTPTimeout is used by the client to timeout http requests to Kubelet. HTTPTimeout time.Duration + + // Dial is a custom dialer used for the client + Dial func(net, addr string) (net.Conn, error) } // TLSClientConfig contains settings to enable transport layer security diff --git a/pkg/client/kubelet.go b/pkg/client/kubelet.go index f456f071127..fbdeff1677f 100644 --- a/pkg/client/kubelet.go +++ b/pkg/client/kubelet.go @@ -45,14 +45,20 @@ type ConnectionInfoGetter interface { // HTTPKubeletClient is the default implementation of KubeletHealthchecker, accesses the kubelet over HTTP. type HTTPKubeletClient struct { Client *http.Client + Config *KubeletConfig Port uint EnableHttps bool } -// TODO: this structure is questionable, it should be using client.Config and overriding defaults. -func NewKubeletClient(config *KubeletConfig) (KubeletClient, error) { - transport := http.DefaultTransport - +func MakeTransport(config *KubeletConfig) (http.RoundTripper, error) { + var transport http.RoundTripper + if config.Dial == nil { + transport = http.DefaultTransport + } else { + transport = &http.Transport{ + Dial: config.Dial, + } + } cfg := &Config{TLSClientConfig: config.TLSClientConfig} if config.EnableHttps { hasCA := len(config.CAFile) > 0 || len(config.CAData) > 0 @@ -69,13 +75,22 @@ func NewKubeletClient(config *KubeletConfig) (KubeletClient, error) { TLSClientConfig: tlsConfig, } } + return transport, nil +} +// TODO: this structure is questionable, it should be using client.Config and overriding defaults. +func NewKubeletClient(config *KubeletConfig) (KubeletClient, error) { + transport, err := MakeTransport(config) + if err != nil { + return nil, err + } c := &http.Client{ Transport: transport, Timeout: config.HTTPTimeout, } return &HTTPKubeletClient{ Client: c, + Config: config, Port: config.Port, EnableHttps: config.EnableHttps, }, nil diff --git a/pkg/cloudprovider/fake/fake.go b/pkg/cloudprovider/fake/fake.go index b3a235f543d..ed81d523899 100644 --- a/pkg/cloudprovider/fake/fake.go +++ b/pkg/cloudprovider/fake/fake.go @@ -17,6 +17,7 @@ limitations under the License. package fake_cloud import ( + "errors" "fmt" "net" "regexp" diff --git a/pkg/master/master.go b/pkg/master/master.go index 9619255defa..c95af6769da 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -506,6 +506,21 @@ func (m *Master) init(c *Config) { } m.setupSecureProxy(c.SSHUser, c.SSHKeyfile) proxyDialer = m.Dial + + // This is pretty ugly. A better solution would be to pull this all the way up into the + // server.go file. + httpKubeletClient, ok := c.KubeletClient.(*client.HTTPKubeletClient) + if ok { + httpKubeletClient.Config.Dial = m.Dial + transport, err := client.MakeTransport(httpKubeletClient.Config) + if err != nil { + glog.Errorf("Error setting up transport over SSH: %v", err) + } else { + httpKubeletClient.Client.Transport = transport + } + } else { + glog.Errorf("Failed to cast %v to HTTPKubeletClient, skipping SSH tunnel.") + } } apiVersions := []string{} From 1ae8801387d6bc7bc1fce9a90a873d3ff72939cf Mon Sep 17 00:00:00 2001 From: CJ Cullen Date: Tue, 2 Jun 2015 09:52:35 -0700 Subject: [PATCH 07/10] Fix transport creation logic. Refactor loadTunnels to allow one path for load, another for refresh. Make SSHTunnelList.Close sleep for a minute before actually closing each tunnel. --- cluster/kubectl.sh | 3 -- pkg/client/kubelet.go | 16 +++++------ pkg/master/master.go | 64 +++++++++++++++++++++++++++++-------------- pkg/util/ssh.go | 51 ++++++---------------------------- 4 files changed, 60 insertions(+), 74 deletions(-) diff --git a/cluster/kubectl.sh b/cluster/kubectl.sh index e62be51c42b..b62280bfb62 100755 --- a/cluster/kubectl.sh +++ b/cluster/kubectl.sh @@ -102,9 +102,6 @@ kubectl="${KUBECTL_PATH:-${kubectl}}" if [[ "$KUBERNETES_PROVIDER" == "gke" ]]; then detect-project &> /dev/null - config=( - "--context=gke_${PROJECT}_${ZONE}_${CLUSTER_NAME}" - ) elif [[ "$KUBERNETES_PROVIDER" == "ubuntu" || "$KUBERNETES_PROVIDER" == "juju" ]]; then detect-master > /dev/null config=( diff --git a/pkg/client/kubelet.go b/pkg/client/kubelet.go index fbdeff1677f..468a25961bc 100644 --- a/pkg/client/kubelet.go +++ b/pkg/client/kubelet.go @@ -51,14 +51,7 @@ type HTTPKubeletClient struct { } func MakeTransport(config *KubeletConfig) (http.RoundTripper, error) { - var transport http.RoundTripper - if config.Dial == nil { - transport = http.DefaultTransport - } else { - transport = &http.Transport{ - Dial: config.Dial, - } - } + cfg := &Config{TLSClientConfig: config.TLSClientConfig} if config.EnableHttps { hasCA := len(config.CAFile) > 0 || len(config.CAData) > 0 @@ -70,10 +63,15 @@ func MakeTransport(config *KubeletConfig) (http.RoundTripper, error) { if err != nil { return nil, err } - if tlsConfig != nil { + + var transport http.RoundTripper + if config.Dial != nil || tlsConfig != nil { transport = &http.Transport{ + Dial: config.Dial, TLSClientConfig: tlsConfig, } + } else { + transport = http.DefaultTransport } return transport, nil } diff --git a/pkg/master/master.go b/pkg/master/master.go index c95af6769da..2929c05232b 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -767,7 +767,7 @@ func (m *Master) Dial(net, addr string) (net.Conn, error) { return m.tunnels.Dial(net, addr) } -func (m *Master) detectTunnelChanges(addrs []string) bool { +func (m *Master) needToReplaceTunnels(addrs []string) bool { if len(m.tunnels) != len(addrs) { return true } @@ -779,27 +779,25 @@ func (m *Master) detectTunnelChanges(addrs []string) bool { return false } -func (m *Master) loadTunnels(user, keyfile string) error { +func (m *Master) getNodeAddresses() ([]string, error) { nodes, err := m.nodeRegistry.ListMinions(api.NewDefaultContext(), labels.Everything(), fields.Everything()) if err != nil { - return err + return nil, err } - result := []string{} + addrs := []string{} for ix := range nodes.Items { node := &nodes.Items[ix] addr, err := findExternalAddress(node) if err != nil { - return err + return nil, err } - result = append(result, addr) - } - changesExist := m.detectTunnelChanges(result) - if !changesExist { - return nil + addrs = append(addrs, addr) } + return addrs, nil +} - // TODO: This is going to drop connections in the middle. See comment about using Watch above. - tunnels, err := util.MakeSSHTunnels(user, keyfile, result) +func (m *Master) replaceTunnels(user, keyfile string, newAddrs []string) error { + tunnels, err := util.MakeSSHTunnels(user, keyfile, newAddrs) if err != nil { return err } @@ -811,28 +809,54 @@ func (m *Master) loadTunnels(user, keyfile string) error { return nil } -func (m *Master) setupSecureProxy(user, keyfile string) { - loadTunnelsPrintError := func() { - if err := m.loadTunnels(user, keyfile); err != nil { - glog.Errorf("Failed to load SSH Tunnels: %v", err) - } +func (m *Master) loadTunnels(user, keyfile string) error { + addrs, err := m.getNodeAddresses() + if err != nil { + return err } + if !m.needToReplaceTunnels(addrs) { + return nil + } + // TODO: This is going to unnecessarily close connections to unchanged nodes. + // See comment about using Watch above. + return m.replaceTunnels(user, keyfile, addrs) +} +func (m *Master) refreshTunnels(user, keyfile string) error { + addrs, err := m.getNodeAddresses() + if err != nil { + return err + } + return m.replaceTunnels(user, keyfile, addrs) +} + +func (m *Master) setupSecureProxy(user, keyfile string) { // Sync loop for tunnels // TODO: switch this to watch. go func() { for { - loadTunnelsPrintError() - + if err := m.loadTunnels(user, keyfile); err != nil { + glog.Errorf("Failed to load SSH Tunnels: %v", err) + } var sleep time.Duration if len(m.tunnels) == 0 { sleep = time.Second } else { - sleep = time.Second * 120 + sleep = time.Minute } time.Sleep(sleep) } }() + // Refresh loop for tunnels + // TODO: could make this more controller-ish + go func() { + for { + if err := m.refreshTunnels(user, keyfile); err != nil { + glog.Errorf("Failed to refresh SSH Tunnels: %v", err) + } + time.Sleep(5 * time.Minute) + } + }() } func (m *Master) generateSSHKey(user, keyfile string) error { diff --git a/pkg/util/ssh.go b/pkg/util/ssh.go index 2ed99e74bc6..28ba337fc5c 100644 --- a/pkg/util/ssh.go +++ b/pkg/util/ssh.go @@ -28,6 +28,7 @@ import ( mathrand "math/rand" "net" "os" + "time" "github.com/golang/glog" "golang.org/x/crypto/ssh" @@ -185,43 +186,6 @@ func MakePrivateKeySigner(key string) (ssh.Signer, error) { return signer, nil } -/* -if len(r.tunnels) == 0 { - list, err := listNodes() - if err != nil { - glog.Errorf("unexpected error making tunnels: %v", err) - return - } - tunnels, err := MakeNodeSSHTunnels(list) - if err != nil { - status := errToAPIStatus(err) - writeJSON(status.Code, r.codec, status, w) - httpCode = status.Code - return - } - r.tunnels = tunnels - } - // TODO: round robin here - tunnel := r.tunnels[0] - if err != nil { - status := errToAPIStatus(err) - writeJSON(status.Code, r.codec, status, w) - httpCode = status.Code - return - } - defer func() { - if err := tunnel.Close(); err != nil { - glog.Errorf("Error closing ssh tunnel: %v", err) - } - }() - if err := tunnel.Open(); err != nil { - status := errToAPIStatus(err) - writeJSON(status.Code, r.codec, status, w) - httpCode = status.Code - return - } -*/ - type SSHTunnelEntry struct { Address string Tunnel *SSHTunnel @@ -251,13 +215,16 @@ func (l SSHTunnelList) Open() error { return nil } -func (l SSHTunnelList) Close() error { +func (l SSHTunnelList) Close() { for ix := range l { - if err := l[ix].Tunnel.Close(); err != nil { - return err - } + entry := l[ix] + go func() { + time.Sleep(1 * time.Minute) + if err := entry.Tunnel.Close(); err != nil { + glog.Errorf("Failed to close tunnel %v: %v", entry, err) + } + }() } - return nil } func (l SSHTunnelList) Dial(network, addr string) (net.Conn, error) { From cb317604abe31c2d85baab26340e3e9d2a2ee828 Mon Sep 17 00:00:00 2001 From: CJ Cullen Date: Thu, 4 Jun 2015 11:58:38 -0700 Subject: [PATCH 08/10] Some refactoring. Only selectively use ssh proxy. Add NetworkName to gce.Config. Add locking to uses of master.tunnels. --- cluster/gce/configure-vm.sh | 4 ++- .../kube-apiserver/kube-apiserver.manifest | 15 ++++++-- cmd/kube-apiserver/app/server.go | 8 ++--- pkg/client/kubelet.go | 10 ++---- pkg/cloudprovider/gce/gce.go | 32 +++++++++++------ pkg/master/master.go | 17 ++++++++-- pkg/util/ssh.go | 34 +------------------ 7 files changed, 60 insertions(+), 60 deletions(-) diff --git a/cluster/gce/configure-vm.sh b/cluster/gce/configure-vm.sh index fb1b46a3ca6..4a6ab822883 100644 --- a/cluster/gce/configure-vm.sh +++ b/cluster/gce/configure-vm.sh @@ -487,16 +487,18 @@ grains: cbr-cidr: ${MASTER_IP_RANGE} cloud: gce EOF - if ! [[ -z "${PROJECT_ID:-}" ]] && ! [[ -z "${TOKEN_URL:-}" ]]; then + if ! [[ -z "${PROJECT_ID:-}" ]] && ! [[ -z "${TOKEN_URL:-}" ]] && ! [[ -z "${NODE_NETWORK:-}" ]] ; then cat </etc/gce.conf [global] token-url = ${TOKEN_URL} project-id = ${PROJECT_ID} +network-name = ${NODE_NETWORK} EOF EXTERNAL_IP=$(curl --fail --silent -H 'Metadata-Flavor: Google' "http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip") cat <>/etc/salt/minion.d/grains.conf cloud_config: /etc/gce.conf advertise_address: '${EXTERNAL_IP}' + proxy_ssh_user: '${INSTANCE_PREFIX}' EOF fi } diff --git a/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest b/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest index fe7ffb2c3d7..f8a903a1452 100644 --- a/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest +++ b/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest @@ -23,6 +23,11 @@ {% set advertise_address = "--advertise-address=" + grains.advertise_address -%} {% endif -%} +{% set proxy_ssh_options = "" -%} +{% if grains.proxy_ssh_user is defined -%} + {% set proxy_ssh_options = "--ssh-user=" + grains.proxy_ssh_user + " --ssh-keyfile=/sshproxy/.sshkeyfile" -%} +{% endif -%} + {% set address = "--address=127.0.0.1" -%} {% set cluster_name = "" -%} @@ -81,7 +86,7 @@ {% endif -%} {% set params = address + " " + etcd_servers + " " + cloud_provider + " " + cloud_config + " " + runtime_config + " " + admission_control + " " + service_cluster_ip_range + " " + client_ca_file + " " + basic_auth_file + " " + min_request_timeout -%} -{% set params = params + " " + cluster_name + " " + cert_file + " " + key_file + " --secure_port=" + secure_port + " " + token_auth_file + " " + bind_address + " " + pillar['log_level'] + " " + advertise_address + " --ssh-user=root --ssh-keyfile=/.sshkeyfile"-%} +{% set params = params + " " + cluster_name + " " + cert_file + " " + key_file + " --secure_port=" + secure_port + " " + token_auth_file + " " + bind_address + " " + pillar['log_level'] + " " + advertise_address + " " + proxy_ssh_options -%} { "apiVersion": "v1beta3", @@ -137,7 +142,10 @@ "readOnly": true}, { "name": "etcpkitls", "mountPath": "/etc/pki/tls", - "readOnly": true} + "readOnly": true}, + { "name": "sshproxy", + "mountPath": "/sshproxy", + "readOnly": false} ] } ], @@ -182,6 +190,9 @@ { "name": "etcpkitls", "hostPath": { "path": "/etc/pki/tls"} + }, + { "name": "sshproxy", + "emptyDir": {} } ] }} diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 111c6c6cb77..702db16ce0b 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -356,11 +356,11 @@ func (s *APIServer) Run(_ []string) error { } } } - var installSSH master.InstallSSHKey - instances, supported := cloud.Instances() - if supported { - installSSH = instances.AddSSHKeyToAllInstances + if cloud != nil { + if instances, supported := cloud.Instances(); supported { + installSSH = instances.AddSSHKeyToAllInstances + } } config := &master.Config{ EtcdHelper: helper, diff --git a/pkg/client/kubelet.go b/pkg/client/kubelet.go index 468a25961bc..4f000da923c 100644 --- a/pkg/client/kubelet.go +++ b/pkg/client/kubelet.go @@ -51,7 +51,6 @@ type HTTPKubeletClient struct { } func MakeTransport(config *KubeletConfig) (http.RoundTripper, error) { - cfg := &Config{TLSClientConfig: config.TLSClientConfig} if config.EnableHttps { hasCA := len(config.CAFile) > 0 || len(config.CAData) > 0 @@ -63,17 +62,14 @@ func MakeTransport(config *KubeletConfig) (http.RoundTripper, error) { if err != nil { return nil, err } - - var transport http.RoundTripper if config.Dial != nil || tlsConfig != nil { - transport = &http.Transport{ + return &http.Transport{ Dial: config.Dial, TLSClientConfig: tlsConfig, - } + }, nil } else { - transport = http.DefaultTransport + return http.DefaultTransport, nil } - return transport, nil } // TODO: this structure is questionable, it should be using client.Config and overriding defaults. diff --git a/pkg/cloudprovider/gce/gce.go b/pkg/cloudprovider/gce/gce.go index 4f951406402..96d78daf0e1 100644 --- a/pkg/cloudprovider/gce/gce.go +++ b/pkg/cloudprovider/gce/gce.go @@ -17,7 +17,6 @@ limitations under the License. package gce_cloud import ( - "bytes" "fmt" "io" "io/ioutil" @@ -55,9 +54,7 @@ type GCECloud struct { projectID string zone string instanceID string - - // We assume here that nodes and master are in the same network. TODO(cjcullen) Fix it. - networkName string + networkName string // Used for accessing the metadata server metadataAccess func(string) (string, error) @@ -65,8 +62,9 @@ type GCECloud struct { type Config struct { Global struct { - TokenURL string `gcfg:"token-url"` - ProjectID string `gcfg:"project-id"` + TokenURL string `gcfg:"token-url"` + ProjectID string `gcfg:"project-id"` + NetworkName string `gcfg:"network-name"` } } @@ -155,10 +153,16 @@ func newGCECloud(config io.Reader) (*GCECloud, error) { if config != nil { var cfg Config if err := gcfg.ReadInto(&cfg, config); err != nil { + glog.Errorf("Couldn't read config: %v", err) return nil, err } - if cfg.Global.ProjectID != "" && cfg.Global.TokenURL != "" { + if cfg.Global.ProjectID != "" { projectID = cfg.Global.ProjectID + } + if cfg.Global.NetworkName != "" { + networkName = cfg.Global.NetworkName + } + if cfg.Global.TokenURL != "" { tokenSource = newAltTokenSource(cfg.Global.TokenURL) } } @@ -485,11 +489,11 @@ func (gce *GCECloud) AddSSHKeyToAllInstances(user string, keyData []byte) error if err != nil { return err } - keyData = bytes.TrimSpace(keyData) + keyString := fmt.Sprintf("%s:%s %s@%s", user, strings.TrimSpace(string(keyData)), user, hostname) found := false for _, item := range project.CommonInstanceMetadata.Items { if item.Key == "sshKeys" { - item.Value = fmt.Sprintf("%s\n%s:%s %s@%s", item.Value, user, string(keyData), user, hostname) + item.Value = addKey(item.Value, keyString) found = true break } @@ -500,7 +504,7 @@ func (gce *GCECloud) AddSSHKeyToAllInstances(user string, keyData []byte) error project.CommonInstanceMetadata.Items = append(project.CommonInstanceMetadata.Items, &compute.MetadataItems{ Key: "sshKeys", - Value: fmt.Sprint("%s:%s %s@%s", user, string(keyData), user, hostname), + Value: keyString, }) } op, err := gce.service.Projects.SetCommonInstanceMetadata(gce.projectID, project.CommonInstanceMetadata).Do() @@ -510,6 +514,14 @@ func (gce *GCECloud) AddSSHKeyToAllInstances(user string, keyData []byte) error return gce.waitForGlobalOp(op) } +func addKey(metadataBefore, keyString string) string { + if strings.Contains(metadataBefore, keyString) { + // We've already added this key + return metadataBefore + } + return metadataBefore + "\n" + keyString +} + // NodeAddresses is an implementation of Instances.NodeAddresses. func (gce *GCECloud) NodeAddresses(_ string) ([]api.NodeAddress, error) { externalIP, err := gce.metadataAccess(EXTERNAL_IP_METADATA_URL) diff --git a/pkg/master/master.go b/pkg/master/master.go index 2929c05232b..dc9c282e717 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -28,6 +28,7 @@ import ( rt "runtime" "strconv" "strings" + "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/admission" @@ -210,6 +211,7 @@ type Master struct { // Used for secure proxy tunnels util.SSHTunnelList + tunnelsLock sync.Mutex installSSHKey InstallSSHKey } @@ -340,7 +342,7 @@ func New(c *Config) *Master { serviceReadWriteIP: serviceReadWriteIP, // TODO: serviceReadWritePort should be passed in as an argument, it may not always be 443 serviceReadWritePort: 443, - + installSSHKey: c.InstallSSHKey, } @@ -764,6 +766,8 @@ func findExternalAddress(node *api.Node) (string, error) { } func (m *Master) Dial(net, addr string) (net.Conn, error) { + m.tunnelsLock.Lock() + defer m.tunnelsLock.Unlock() return m.tunnels.Dial(net, addr) } @@ -771,6 +775,7 @@ func (m *Master) needToReplaceTunnels(addrs []string) bool { if len(m.tunnels) != len(addrs) { return true } + // TODO (cjcullen): This doesn't need to be n^2 for ix := range addrs { if !m.tunnels.Has(addrs[ix]) { return true @@ -797,6 +802,7 @@ func (m *Master) getNodeAddresses() ([]string, error) { } func (m *Master) replaceTunnels(user, keyfile string, newAddrs []string) error { + glog.Infof("replacing tunnels. New addrs: %v", newAddrs) tunnels, err := util.MakeSSHTunnels(user, keyfile, newAddrs) if err != nil { return err @@ -810,6 +816,8 @@ func (m *Master) replaceTunnels(user, keyfile string, newAddrs []string) error { } func (m *Master) loadTunnels(user, keyfile string) error { + m.tunnelsLock.Lock() + defer m.tunnelsLock.Unlock() addrs, err := m.getNodeAddresses() if err != nil { return err @@ -819,10 +827,13 @@ func (m *Master) loadTunnels(user, keyfile string) error { } // TODO: This is going to unnecessarily close connections to unchanged nodes. // See comment about using Watch above. + glog.Info("found different nodes. Need to replace tunnels") return m.replaceTunnels(user, keyfile, addrs) } func (m *Master) refreshTunnels(user, keyfile string) error { + m.tunnelsLock.Lock() + defer m.tunnelsLock.Unlock() addrs, err := m.getNodeAddresses() if err != nil { return err @@ -842,6 +853,7 @@ func (m *Master) setupSecureProxy(user, keyfile string) { if len(m.tunnels) == 0 { sleep = time.Second } else { + // tunnels could lag behind current set of nodes sleep = time.Minute } time.Sleep(sleep) @@ -851,10 +863,10 @@ func (m *Master) setupSecureProxy(user, keyfile string) { // TODO: could make this more controller-ish go func() { for { + time.Sleep(5 * time.Minute) if err := m.refreshTunnels(user, keyfile); err != nil { glog.Errorf("Failed to refresh SSH Tunnels: %v", err) } - time.Sleep(5 * time.Minute) } }() } @@ -875,6 +887,5 @@ func (m *Master) generateSSHKey(user, keyfile string) error { if err != nil { return err } - fmt.Printf("FOO: %s", data) return m.installSSHKey(user, data) } diff --git a/pkg/util/ssh.go b/pkg/util/ssh.go index 28ba337fc5c..82285fd74ac 100644 --- a/pkg/util/ssh.go +++ b/pkg/util/ssh.go @@ -80,30 +80,6 @@ func (s *SSHTunnel) Dial(network, address string) (net.Conn, error) { return s.client.Dial(network, address) } -func (s *SSHTunnel) Listen(remoteHost, localPort, remotePort string) error { - var err error - s.sock, err = net.Listen("tcp", net.JoinHostPort("localhost", localPort)) - if err != nil { - return err - } - s.running = true - for s.running { - conn, err := s.sock.Accept() - if err != nil { - if s.running { - glog.Errorf("Error listening for ssh tunnel to %s (%v)", remoteHost, err) - } else { - glog.V(4).Infof("Error listening for ssh tunnel to %s (%v), this is likely due to the tunnel shutting down.", remoteHost, err) - } - continue - } - if err := s.tunnel(conn, remoteHost, remotePort); err != nil { - glog.Errorf("Error starting tunnel: %v", err) - } - } - return nil -} - func (s *SSHTunnel) tunnel(conn net.Conn, remoteHost, remotePort string) error { tunnel, err := s.client.Dial("tcp", net.JoinHostPort(remoteHost, remotePort)) if err != nil { @@ -114,16 +90,8 @@ func (s *SSHTunnel) tunnel(conn net.Conn, remoteHost, remotePort string) error { return nil } -func (s *SSHTunnel) StopListening() error { - // TODO: try to shutdown copying here? - s.running = false - if err := s.sock.Close(); err != nil { - return err - } - return nil -} - func (s *SSHTunnel) Close() error { + glog.Infof("Closing tunnel for host: %q", s.Host) if err := s.client.Close(); err != nil { return err } From 04cd9b3c752e17e3cf26f89eefe1179b77c19d73 Mon Sep 17 00:00:00 2001 From: CJ Cullen Date: Fri, 5 Jun 2015 14:49:26 -0700 Subject: [PATCH 09/10] Make sshproxy use a hostmount on master PD (don't spam sshKeys on upgrade/reboot). Add comment describing what SSHTunnelList.Close() does. Simplify util.FileExists. --- cluster/gce/configure-vm.sh | 3 +++ .../salt/kube-apiserver/kube-apiserver.manifest | 11 ++++++----- pkg/util/ssh.go | 4 +++- pkg/util/util.go | 13 +++++-------- 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/cluster/gce/configure-vm.sh b/cluster/gce/configure-vm.sh index 4a6ab822883..9ac82ace2df 100644 --- a/cluster/gce/configure-vm.sh +++ b/cluster/gce/configure-vm.sh @@ -220,9 +220,12 @@ mount-master-pd() { mkdir -p /mnt/master-pd/srv/kubernetes # Contains the cluster's initial config parameters and auth tokens mkdir -p /mnt/master-pd/srv/salt-overlay + # Directory for kube-apiserver to store SSH key (if necessary) + mkdir -p /mnt/master-pd/srv/sshproxy ln -s -f /mnt/master-pd/var/etcd /var/etcd ln -s -f /mnt/master-pd/srv/kubernetes /srv/kubernetes + ln -s -f /mnt/master-pd/srv/sshproxy /srv/sshproxy ln -s -f /mnt/master-pd/srv/salt-overlay /srv/salt-overlay # This is a bit of a hack to get around the fact that salt has to run after the diff --git a/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest b/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest index f8a903a1452..069df59a5eb 100644 --- a/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest +++ b/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest @@ -25,7 +25,7 @@ {% set proxy_ssh_options = "" -%} {% if grains.proxy_ssh_user is defined -%} - {% set proxy_ssh_options = "--ssh-user=" + grains.proxy_ssh_user + " --ssh-keyfile=/sshproxy/.sshkeyfile" -%} + {% set proxy_ssh_options = "--ssh-user=" + grains.proxy_ssh_user + " --ssh-keyfile=/srv/sshproxy/.sshkeyfile" -%} {% endif -%} {% set address = "--address=127.0.0.1" -%} @@ -143,8 +143,8 @@ { "name": "etcpkitls", "mountPath": "/etc/pki/tls", "readOnly": true}, - { "name": "sshproxy", - "mountPath": "/sshproxy", + { "name": "srvsshproxy", + "mountPath": "/srv/sshproxy", "readOnly": false} ] } @@ -191,8 +191,9 @@ "hostPath": { "path": "/etc/pki/tls"} }, - { "name": "sshproxy", - "emptyDir": {} + { "name": "srvsshproxy", + "hostPath": { + "path": "/srv/sshproxy"} } ] }} diff --git a/pkg/util/ssh.go b/pkg/util/ssh.go index 82285fd74ac..04f265cf938 100644 --- a/pkg/util/ssh.go +++ b/pkg/util/ssh.go @@ -91,7 +91,6 @@ func (s *SSHTunnel) tunnel(conn net.Conn, remoteHost, remotePort string) error { } func (s *SSHTunnel) Close() error { - glog.Infof("Closing tunnel for host: %q", s.Host) if err := s.client.Close(); err != nil { return err } @@ -183,6 +182,9 @@ func (l SSHTunnelList) Open() error { return nil } +// Close asynchronously closes all tunnels in the list after waiting for 1 +// minute. Tunnels will still be open upon this function's return, but should +// no longer be used. func (l SSHTunnelList) Close() { for ix := range l { entry := l[ix] diff --git a/pkg/util/util.go b/pkg/util/util.go index a6e33774caf..17f56187ea4 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -515,16 +515,13 @@ func ShortenString(str string, n int) string { } else { return str[:n] } +} func FileExists(filename string) (bool, error) { - file, err := os.Open(filename) - defer file.Close() - if err != nil { - if os.IsNotExist(err) { - return false, nil - } else { - return false, err - } + if _, err := os.Stat(filename); os.IsNotExist(err) { + return false, nil + } else if err != nil { + return false, err } return true, nil } From 9ab329827a1424482358d72b1a83424ee19fb83b Mon Sep 17 00:00:00 2001 From: CJ Cullen Date: Fri, 5 Jun 2015 15:24:17 -0700 Subject: [PATCH 10/10] Change sshproxy to poll registry for nodes every 10 seconds (reduces window where closed tunnels from scaling down may exist). --- pkg/master/master.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/master/master.go b/pkg/master/master.go index dc9c282e717..925b7129fc7 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -854,7 +854,7 @@ func (m *Master) setupSecureProxy(user, keyfile string) { sleep = time.Second } else { // tunnels could lag behind current set of nodes - sleep = time.Minute + sleep = 10 * time.Second } time.Sleep(sleep) }