diff --git a/cluster/gce/configure-vm.sh b/cluster/gce/configure-vm.sh index fb1b46a3ca6..9ac82ace2df 100644 --- a/cluster/gce/configure-vm.sh +++ b/cluster/gce/configure-vm.sh @@ -220,9 +220,12 @@ mount-master-pd() { mkdir -p /mnt/master-pd/srv/kubernetes # Contains the cluster's initial config parameters and auth tokens mkdir -p /mnt/master-pd/srv/salt-overlay + # Directory for kube-apiserver to store SSH key (if necessary) + mkdir -p /mnt/master-pd/srv/sshproxy ln -s -f /mnt/master-pd/var/etcd /var/etcd ln -s -f /mnt/master-pd/srv/kubernetes /srv/kubernetes + ln -s -f /mnt/master-pd/srv/sshproxy /srv/sshproxy ln -s -f /mnt/master-pd/srv/salt-overlay /srv/salt-overlay # This is a bit of a hack to get around the fact that salt has to run after the @@ -487,16 +490,18 @@ grains: cbr-cidr: ${MASTER_IP_RANGE} cloud: gce EOF - if ! [[ -z "${PROJECT_ID:-}" ]] && ! [[ -z "${TOKEN_URL:-}" ]]; then + if ! [[ -z "${PROJECT_ID:-}" ]] && ! [[ -z "${TOKEN_URL:-}" ]] && ! [[ -z "${NODE_NETWORK:-}" ]] ; then cat </etc/gce.conf [global] token-url = ${TOKEN_URL} project-id = ${PROJECT_ID} +network-name = ${NODE_NETWORK} EOF EXTERNAL_IP=$(curl --fail --silent -H 'Metadata-Flavor: Google' "http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip") cat <>/etc/salt/minion.d/grains.conf cloud_config: /etc/gce.conf advertise_address: '${EXTERNAL_IP}' + proxy_ssh_user: '${INSTANCE_PREFIX}' EOF fi } diff --git a/cluster/kubectl.sh b/cluster/kubectl.sh index e62be51c42b..b62280bfb62 100755 --- a/cluster/kubectl.sh +++ b/cluster/kubectl.sh @@ -102,9 +102,6 @@ kubectl="${KUBECTL_PATH:-${kubectl}}" if [[ "$KUBERNETES_PROVIDER" == "gke" ]]; then detect-project &> /dev/null - config=( - "--context=gke_${PROJECT}_${ZONE}_${CLUSTER_NAME}" - ) elif [[ "$KUBERNETES_PROVIDER" == "ubuntu" || "$KUBERNETES_PROVIDER" == "juju" ]]; then detect-master > /dev/null config=( diff --git a/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest b/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest index 7012802e344..069df59a5eb 100644 --- a/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest +++ b/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest @@ -5,21 +5,17 @@ {% set cloud_provider = "" -%} {% set cloud_config = "" -%} +{% set cloud_config_mount = "" -%} +{% set cloud_config_volume = "" -%} {% if grains.cloud is defined -%} -{% set cloud_provider = "--cloud_provider=" + grains.cloud -%} + {% set cloud_provider = "--cloud_provider=" + grains.cloud -%} -{% if grains.cloud == 'gce' -%} - {% if grains.cloud_config is defined -%} + {% if grains.cloud in [ 'aws', 'gce' ] and grains.cloud_config is defined -%} {% set cloud_config = "--cloud_config=" + grains.cloud_config -%} + {% set cloud_config_mount = "{\"name\": \"cloudconfigmount\",\"mountPath\": \"" + grains.cloud_config + "\", \"readOnly\": true}," -%} + {% set cloud_config_volume = "{\"name\": \"cloudconfigmount\",\"hostPath\": {\"path\": \"" + grains.cloud_config + "\"}}," -%} {% endif -%} - -{% elif grains.cloud == 'aws' -%} - {% if grains.cloud_config is defined -%} - {% set cloud_config = "--cloud_config=" + grains.cloud_config -%} - {% endif -%} -{% endif -%} - {% endif -%} {% set advertise_address = "" -%} @@ -27,6 +23,11 @@ {% set advertise_address = "--advertise-address=" + grains.advertise_address -%} {% endif -%} +{% set proxy_ssh_options = "" -%} +{% if grains.proxy_ssh_user is defined -%} + {% set proxy_ssh_options = "--ssh-user=" + grains.proxy_ssh_user + " --ssh-keyfile=/srv/sshproxy/.sshkeyfile" -%} +{% endif -%} + {% set address = "--address=127.0.0.1" -%} {% set cluster_name = "" -%} @@ -85,7 +86,7 @@ {% endif -%} {% set params = address + " " + etcd_servers + " " + cloud_provider + " " + cloud_config + " " + runtime_config + " " + admission_control + " " + service_cluster_ip_range + " " + client_ca_file + " " + basic_auth_file + " " + min_request_timeout -%} -{% set params = params + " " + cluster_name + " " + cert_file + " " + key_file + " --secure_port=" + secure_port + " " + token_auth_file + " " + bind_address + " " + pillar['log_level'] + " " + advertise_address -%} +{% set params = params + " " + cluster_name + " " + cert_file + " " + key_file + " --secure_port=" + secure_port + " " + token_auth_file + " " + bind_address + " " + pillar['log_level'] + " " + advertise_address + " " + proxy_ssh_options -%} { "apiVersion": "v1beta3", @@ -111,6 +112,7 @@ "hostPort": 8080} ], "volumeMounts": [ + {{cloud_config_mount}} { "name": "srvkube", "mountPath": "/srv/kubernetes", "readOnly": true}, @@ -140,11 +142,15 @@ "readOnly": true}, { "name": "etcpkitls", "mountPath": "/etc/pki/tls", - "readOnly": true} + "readOnly": true}, + { "name": "srvsshproxy", + "mountPath": "/srv/sshproxy", + "readOnly": false} ] } ], "volumes":[ + {{cloud_config_volume}} { "name": "srvkube", "hostPath": { "path": "/srv/kubernetes"} @@ -184,6 +190,10 @@ { "name": "etcpkitls", "hostPath": { "path": "/etc/pki/tls"} + }, + { "name": "srvsshproxy", + "hostPath": { + "path": "/srv/sshproxy"} } ] }} diff --git a/cluster/saltbase/salt/kube-controller-manager/kube-controller-manager.manifest b/cluster/saltbase/salt/kube-controller-manager/kube-controller-manager.manifest index a446187bb99..b5e1736d521 100644 --- a/cluster/saltbase/salt/kube-controller-manager/kube-controller-manager.manifest +++ b/cluster/saltbase/salt/kube-controller-manager/kube-controller-manager.manifest @@ -14,6 +14,8 @@ {% set cloud_provider = "" -%} {% set cloud_config = "" -%} +{% set cloud_config_mount = "" -%} +{% set cloud_config_volume = "" -%} {% if grains.cloud is defined -%} {% set cloud_provider = "--cloud_provider=" + grains.cloud -%} @@ -21,6 +23,8 @@ {% if grains.cloud in [ 'aws', 'gce' ] and grains.cloud_config is defined -%} {% set cloud_config = "--cloud_config=" + grains.cloud_config -%} + {% set cloud_config_mount = "{\"name\": \"cloudconfigmount\",\"mountPath\": \"" + grains.cloud_config + "\", \"readOnly\": true}," -%} + {% set cloud_config_volume = "{\"name\": \"cloudconfigmount\",\"hostPath\": {\"path\": \"" + grains.cloud_config + "\"}}," -%} {% endif -%} {% endif -%} @@ -42,6 +46,7 @@ "/usr/local/bin/kube-controller-manager {{params}} 1>>/var/log/kube-controller-manager.log 2>&1" ], "volumeMounts": [ + {{cloud_config_mount}} { "name": "srvkube", "mountPath": "/srv/kubernetes", "readOnly": true}, @@ -76,6 +81,7 @@ } ], "volumes":[ + {{cloud_config_volume}} { "name": "srvkube", "hostPath": { "path": "/srv/kubernetes"} diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 30dc1305920..b8ecab263d0 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -98,6 +98,8 @@ type APIServer struct { MaxRequestsInFlight int MinRequestTimeout int LongRunningRequestRE string + SSHUser string + SSHKeyfile string } // NewAPIServer creates a new APIServer object with default parameters @@ -202,6 +204,8 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) { fs.IntVar(&s.MaxRequestsInFlight, "max-requests-inflight", 400, "The maximum number of requests in flight at a given time. When the server exceeds this, it rejects requests. Zero for no limit.") fs.IntVar(&s.MinRequestTimeout, "min-request-timeout", 1800, "An optional field indicating the minimum number of seconds a handler must keep a request open before timing it out. Currently only honored by the watch request handler, which picks a randomized value above this number as the connection timeout, to spread out load.") fs.StringVar(&s.LongRunningRequestRE, "long-running-request-regexp", "[.*\\/watch$][^\\/proxy.*]", "A regular expression matching long running requests which should be excluded from maximum inflight request handling.") + fs.StringVar(&s.SSHUser, "ssh-user", "", "If non-empty, use secure SSH proxy to the nodes, using this user name") + fs.StringVar(&s.SSHKeyfile, "ssh-keyfile", "", "If non-empty, use secure SSH proxy to the nodes, using this user keyfile") } // TODO: Longer term we should read this from some config store, rather than a flag. @@ -353,7 +357,12 @@ func (s *APIServer) Run(_ []string) error { } } } - + var installSSH master.InstallSSHKey + if cloud != nil { + if instances, supported := cloud.Instances(); supported { + installSSH = instances.AddSSHKeyToAllInstances + } + } config := &master.Config{ EtcdHelper: helper, EventTTL: s.EventTTL, @@ -379,6 +388,9 @@ func (s *APIServer) Run(_ []string) error { ClusterName: s.ClusterName, ExternalHost: s.ExternalHost, MinRequestTimeout: s.MinRequestTimeout, + SSHUser: s.SSHUser, + SSHKeyfile: s.SSHKeyfile, + InstallSSHKey: installSSH, } m := master.New(config) diff --git a/pkg/apiserver/api_installer.go b/pkg/apiserver/api_installer.go index 2fdcaba9073..7c0cc173df5 100644 --- a/pkg/apiserver/api_installer.go +++ b/pkg/apiserver/api_installer.go @@ -18,6 +18,7 @@ package apiserver import ( "fmt" + "net" "net/http" "net/url" gpath "path" @@ -55,14 +56,14 @@ type action struct { var errEmptyName = errors.NewBadRequest("name must be provided") // Installs handlers for API resources. -func (a *APIInstaller) Install() (ws *restful.WebService, errors []error) { +func (a *APIInstaller) Install(proxyDialer func(network, addr string) (net.Conn, error)) (ws *restful.WebService, errors []error) { errors = make([]error, 0) // Create the WebService. ws = a.newWebService() redirectHandler := (&RedirectHandler{a.group.Storage, a.group.Codec, a.group.Context, a.info}) - proxyHandler := (&ProxyHandler{a.prefix + "/proxy/", a.group.Storage, a.group.Codec, a.group.Context, a.info}) + proxyHandler := (&ProxyHandler{a.prefix + "/proxy/", a.group.Storage, a.group.Codec, a.group.Context, a.info, proxyDialer}) // Register the paths in a deterministic (sorted) order to get a deterministic swagger spec. paths := make([]string, len(a.group.Storage)) diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index addf2d447c5..3d7b440b3c3 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "io/ioutil" + "net" "net/http" "path" "strconv" @@ -149,7 +150,7 @@ type RestContainer struct { // InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container. // It is expected that the provided path root prefix will serve all operations. Root MUST NOT end // in a slash. A restful WebService is created for the group and version. -func (g *APIGroupVersion) InstallREST(container *RestContainer) error { +func (g *APIGroupVersion) InstallREST(container *RestContainer, proxyDialer func(network, addr string) (net.Conn, error)) error { info := &APIRequestInfoResolver{util.NewStringSet(strings.TrimPrefix(g.Root, "/")), g.Mapper} prefix := path.Join(g.Root, g.Version) @@ -159,7 +160,7 @@ func (g *APIGroupVersion) InstallREST(container *RestContainer) error { prefix: prefix, minRequestTimeout: container.MinRequestTimeout, } - ws, registrationErrors := installer.Install() + ws, registrationErrors := installer.Install(proxyDialer) container.Add(ws) return errors.NewAggregate(registrationErrors) } diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index fdefac6615d..3bab81bbac8 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -231,7 +231,7 @@ func handleInternal(legacy bool, storage map[string]rest.Storage, admissionContr container := restful.NewContainer() container.Router(restful.CurlyRouter{}) mux := container.ServeMux - if err := group.InstallREST(&RestContainer{container, 0}); err != nil { + if err := group.InstallREST(&RestContainer{container, 0}, nil); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.Version, err)) } ws := new(restful.WebService) @@ -1901,7 +1901,7 @@ func TestParentResourceIsRequired(t *testing.T) { Codec: newCodec, } container := restful.NewContainer() - if err := group.InstallREST(&RestContainer{container, 0}); err == nil { + if err := group.InstallREST(&RestContainer{container, 0}, nil); err == nil { t.Fatal("expected error") } @@ -1929,7 +1929,7 @@ func TestParentResourceIsRequired(t *testing.T) { Codec: newCodec, } container = restful.NewContainer() - if err := group.InstallREST(&RestContainer{container, 0}); err != nil { + if err := group.InstallREST(&RestContainer{container, 0}, nil); err != nil { t.Fatal(err) } diff --git a/pkg/apiserver/proxy.go b/pkg/apiserver/proxy.go index 9dab573dc82..aa1967bceee 100644 --- a/pkg/apiserver/proxy.go +++ b/pkg/apiserver/proxy.go @@ -49,6 +49,8 @@ type ProxyHandler struct { codec runtime.Codec context api.RequestContextMapper apiRequestInfoResolver *APIRequestInfoResolver + + dial func(network, addr string) (net.Conn, error) } func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { @@ -119,6 +121,10 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { httpCode = http.StatusNotFound return } + // If we have a custom dialer, and no pre-existing transport, initialize it to use the dialer. + if transport == nil && r.dial != nil { + transport = &http.Transport{Dial: r.dial} + } // Default to http if location.Scheme == "" { diff --git a/pkg/client/helper.go b/pkg/client/helper.go index 8f58ab722a4..8834526a834 100644 --- a/pkg/client/helper.go +++ b/pkg/client/helper.go @@ -102,6 +102,9 @@ type KubeletConfig struct { // HTTPTimeout is used by the client to timeout http requests to Kubelet. HTTPTimeout time.Duration + + // Dial is a custom dialer used for the client + Dial func(net, addr string) (net.Conn, error) } // TLSClientConfig contains settings to enable transport layer security diff --git a/pkg/client/kubelet.go b/pkg/client/kubelet.go index f456f071127..4f000da923c 100644 --- a/pkg/client/kubelet.go +++ b/pkg/client/kubelet.go @@ -45,14 +45,12 @@ type ConnectionInfoGetter interface { // HTTPKubeletClient is the default implementation of KubeletHealthchecker, accesses the kubelet over HTTP. type HTTPKubeletClient struct { Client *http.Client + Config *KubeletConfig Port uint EnableHttps bool } -// TODO: this structure is questionable, it should be using client.Config and overriding defaults. -func NewKubeletClient(config *KubeletConfig) (KubeletClient, error) { - transport := http.DefaultTransport - +func MakeTransport(config *KubeletConfig) (http.RoundTripper, error) { cfg := &Config{TLSClientConfig: config.TLSClientConfig} if config.EnableHttps { hasCA := len(config.CAFile) > 0 || len(config.CAData) > 0 @@ -64,18 +62,29 @@ func NewKubeletClient(config *KubeletConfig) (KubeletClient, error) { if err != nil { return nil, err } - if tlsConfig != nil { - transport = &http.Transport{ + if config.Dial != nil || tlsConfig != nil { + return &http.Transport{ + Dial: config.Dial, TLSClientConfig: tlsConfig, - } + }, nil + } else { + return http.DefaultTransport, nil } +} +// TODO: this structure is questionable, it should be using client.Config and overriding defaults. +func NewKubeletClient(config *KubeletConfig) (KubeletClient, error) { + transport, err := MakeTransport(config) + if err != nil { + return nil, err + } c := &http.Client{ Transport: transport, Timeout: config.HTTPTimeout, } return &HTTPKubeletClient{ Client: c, + Config: config, Port: config.Port, EnableHttps: config.EnableHttps, }, nil diff --git a/pkg/cloudprovider/aws/aws.go b/pkg/cloudprovider/aws/aws.go index 6262f016a4e..70a77e8cc92 100644 --- a/pkg/cloudprovider/aws/aws.go +++ b/pkg/cloudprovider/aws/aws.go @@ -230,6 +230,10 @@ func newEc2Filter(name string, value string) *ec2.Filter { return filter } +func (self *AWSCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error { + return errors.New("unimplemented") +} + // Implementation of EC2.Instances func (self *awsSdkEC2) DescribeInstances(request *ec2.DescribeInstancesInput) ([]*ec2.Instance, error) { // Instances are paged diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go index 13fdaf4bf30..e5d371ffeb3 100644 --- a/pkg/cloudprovider/cloud.go +++ b/pkg/cloudprovider/cloud.go @@ -108,6 +108,9 @@ type Instances interface { List(filter string) ([]string, error) // GetNodeResources gets the resources for a particular node GetNodeResources(name string) (*api.NodeResources, error) + // AddSSHKeyToAllInstances adds an SSH public key as a legal identity for all instances + // expected format for the key is standard ssh-keygen format: + AddSSHKeyToAllInstances(user string, keyData []byte) error } // Route is a representation of an advanced routing rule. diff --git a/pkg/cloudprovider/fake/fake.go b/pkg/cloudprovider/fake/fake.go index 45ea2468b8a..ed81d523899 100644 --- a/pkg/cloudprovider/fake/fake.go +++ b/pkg/cloudprovider/fake/fake.go @@ -17,6 +17,7 @@ limitations under the License. package fake_cloud import ( + "errors" "fmt" "net" "regexp" @@ -144,6 +145,10 @@ func (f *FakeCloud) EnsureTCPLoadBalancerDeleted(name, region string) error { return f.Err } +func (f *FakeCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error { + return errors.New("unimplemented") +} + // NodeAddresses is a test-spy implementation of Instances.NodeAddresses. // It adds an entry "node-addresses" into the internal method call record. func (f *FakeCloud) NodeAddresses(instance string) ([]api.NodeAddress, error) { diff --git a/pkg/cloudprovider/gce/gce.go b/pkg/cloudprovider/gce/gce.go index 50ac1fbcc2c..96d78daf0e1 100644 --- a/pkg/cloudprovider/gce/gce.go +++ b/pkg/cloudprovider/gce/gce.go @@ -22,6 +22,7 @@ import ( "io/ioutil" "net" "net/http" + "os" "path" "strconv" "strings" @@ -53,9 +54,7 @@ type GCECloud struct { projectID string zone string instanceID string - - // We assume here that nodes and master are in the same network. TODO(cjcullen) Fix it. - networkName string + networkName string // Used for accessing the metadata server metadataAccess func(string) (string, error) @@ -63,8 +62,9 @@ type GCECloud struct { type Config struct { Global struct { - TokenURL string `gcfg:"token-url"` - ProjectID string `gcfg:"project-id"` + TokenURL string `gcfg:"token-url"` + ProjectID string `gcfg:"project-id"` + NetworkName string `gcfg:"network-name"` } } @@ -153,10 +153,16 @@ func newGCECloud(config io.Reader) (*GCECloud, error) { if config != nil { var cfg Config if err := gcfg.ReadInto(&cfg, config); err != nil { + glog.Errorf("Couldn't read config: %v", err) return nil, err } - if cfg.Global.ProjectID != "" && cfg.Global.TokenURL != "" { + if cfg.Global.ProjectID != "" { projectID = cfg.Global.ProjectID + } + if cfg.Global.NetworkName != "" { + networkName = cfg.Global.NetworkName + } + if cfg.Global.TokenURL != "" { tokenSource = newAltTokenSource(cfg.Global.TokenURL) } } @@ -474,6 +480,48 @@ func (gce *GCECloud) getInstanceByName(name string) (*compute.Instance, error) { return res, nil } +func (gce *GCECloud) AddSSHKeyToAllInstances(user string, keyData []byte) error { + project, err := gce.service.Projects.Get(gce.projectID).Do() + if err != nil { + return err + } + hostname, err := os.Hostname() + if err != nil { + return err + } + keyString := fmt.Sprintf("%s:%s %s@%s", user, strings.TrimSpace(string(keyData)), user, hostname) + found := false + for _, item := range project.CommonInstanceMetadata.Items { + if item.Key == "sshKeys" { + item.Value = addKey(item.Value, keyString) + found = true + break + } + } + if !found { + // This is super unlikely, so log. + glog.Infof("Failed to find sshKeys metadata, creating a new item") + project.CommonInstanceMetadata.Items = append(project.CommonInstanceMetadata.Items, + &compute.MetadataItems{ + Key: "sshKeys", + Value: keyString, + }) + } + op, err := gce.service.Projects.SetCommonInstanceMetadata(gce.projectID, project.CommonInstanceMetadata).Do() + if err != nil { + return err + } + return gce.waitForGlobalOp(op) +} + +func addKey(metadataBefore, keyString string) string { + if strings.Contains(metadataBefore, keyString) { + // We've already added this key + return metadataBefore + } + return metadataBefore + "\n" + keyString +} + // NodeAddresses is an implementation of Instances.NodeAddresses. func (gce *GCECloud) NodeAddresses(_ string) ([]api.NodeAddress, error) { externalIP, err := gce.metadataAccess(EXTERNAL_IP_METADATA_URL) diff --git a/pkg/cloudprovider/mesos/mesos.go b/pkg/cloudprovider/mesos/mesos.go index cacc3b40b17..600c9050ac3 100644 --- a/pkg/cloudprovider/mesos/mesos.go +++ b/pkg/cloudprovider/mesos/mesos.go @@ -78,6 +78,10 @@ func newMesosCloud(configReader io.Reader) (*MesosCloud, error) { } } +func (c *MesosCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error { + return errors.New("unimplemented") +} + // Instances returns a copy of the Mesos cloud Instances implementation. // Mesos natively provides minimal cloud-type resources. More robust cloud // support requires a combination of Mesos and cloud-specific knowledge. diff --git a/pkg/cloudprovider/openstack/openstack.go b/pkg/cloudprovider/openstack/openstack.go index 38520b28e7e..c4349ebf09d 100644 --- a/pkg/cloudprovider/openstack/openstack.go +++ b/pkg/cloudprovider/openstack/openstack.go @@ -317,6 +317,10 @@ func getAddressByName(api *gophercloud.ServiceClient, name string) (string, erro return s, nil } +func (i *Instances) AddSSHKeyToAllInstances(user string, keyData []byte) error { + return errors.New("unimplemented") +} + func (i *Instances) NodeAddresses(name string) ([]api.NodeAddress, error) { glog.V(4).Infof("NodeAddresses(%v) called", name) diff --git a/pkg/cloudprovider/ovirt/ovirt.go b/pkg/cloudprovider/ovirt/ovirt.go index 6c78202df1d..abcfff64539 100644 --- a/pkg/cloudprovider/ovirt/ovirt.go +++ b/pkg/cloudprovider/ovirt/ovirt.go @@ -18,6 +18,7 @@ package ovirt_cloud import ( "encoding/xml" + "errors" "fmt" "io" "io/ioutil" @@ -273,3 +274,7 @@ func (v *OVirtCloud) List(filter string) ([]string, error) { func (v *OVirtCloud) GetNodeResources(name string) (*api.NodeResources, error) { return nil, nil } + +func (v *OVirtCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error { + return errors.New("unimplemented") +} diff --git a/pkg/cloudprovider/rackspace/rackspace.go b/pkg/cloudprovider/rackspace/rackspace.go index 8df8230da65..38f2c25ebaf 100644 --- a/pkg/cloudprovider/rackspace/rackspace.go +++ b/pkg/cloudprovider/rackspace/rackspace.go @@ -376,6 +376,10 @@ func (i *Instances) InstanceID(name string) (string, error) { return "", nil } +func (i *Instances) AddSSHKeyToAllInstances(user string, keyData []byte) error { + return errors.New("unimplemented") +} + func (i *Instances) GetNodeResources(name string) (*api.NodeResources, error) { glog.V(2).Infof("GetNodeResources(%v) called", name) diff --git a/pkg/cloudprovider/vagrant/vagrant.go b/pkg/cloudprovider/vagrant/vagrant.go index 2bd18150d17..e4e89ed453e 100644 --- a/pkg/cloudprovider/vagrant/vagrant.go +++ b/pkg/cloudprovider/vagrant/vagrant.go @@ -131,6 +131,10 @@ func (v *VagrantCloud) getInstanceByAddress(address string) (*SaltMinion, error) return nil, fmt.Errorf("unable to find instance for address: %s", address) } +func (v *VagrantCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error { + return errors.New("unimplemented") +} + // NodeAddresses returns the NodeAddresses of a particular machine instance. func (v *VagrantCloud) NodeAddresses(instance string) ([]api.NodeAddress, error) { // Due to vagrant not running with a dedicated DNS setup, we return the IP address of a minion as its hostname at this time diff --git a/pkg/master/master.go b/pkg/master/master.go index 868832ba722..925b7129fc7 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -18,7 +18,9 @@ package master import ( "bytes" + "errors" "fmt" + "io/ioutil" "net" "net/http" "net/http/pprof" @@ -26,6 +28,7 @@ import ( rt "runtime" "strconv" "strings" + "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/admission" @@ -39,6 +42,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/auth/authorizer" "github.com/GoogleCloudPlatform/kubernetes/pkg/auth/handlers" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/componentstatus" controlleretcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller/etcd" @@ -143,8 +148,15 @@ type Config struct { // The range of ports to be assigned to services with type=NodePort or greater ServiceNodePortRange util.PortRange + + // Used for secure proxy. If empty, don't use secure proxy. + SSHUser string + SSHKeyfile string + InstallSSHKey InstallSSHKey } +type InstallSSHKey func(user string, data []byte) error + // Master contains state for a Kubernetes cluster master/api server. type Master struct { // "Inputs", Copied from Config @@ -196,6 +208,11 @@ type Master struct { // "Outputs" Handler http.Handler InsecureHandler http.Handler + + // Used for secure proxy + tunnels util.SSHTunnelList + tunnelsLock sync.Mutex + installSSHKey InstallSSHKey } // NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version @@ -325,6 +342,8 @@ func New(c *Config) *Master { serviceReadWriteIP: serviceReadWriteIP, // TODO: serviceReadWritePort should be passed in as an argument, it may not always be 443 serviceReadWritePort: 443, + + installSSHKey: c.InstallSSHKey, } var handlerContainer *restful.Container @@ -474,15 +493,47 @@ func (m *Master) init(c *Config) { "componentStatuses": componentstatus.NewStorage(func() map[string]apiserver.Server { return m.getServersToValidate(c) }), } + var proxyDialer func(net, addr string) (net.Conn, error) + if len(c.SSHUser) > 0 { + glog.Infof("Setting up proxy: %s %s", c.SSHUser, c.SSHKeyfile) + exists, err := util.FileExists(c.SSHKeyfile) + if err != nil { + glog.Errorf("Error detecting if key exists: %v", err) + } else if !exists { + glog.Infof("Key doesn't exist, attempting to create") + err := m.generateSSHKey(c.SSHUser, c.SSHKeyfile) + if err != nil { + glog.Errorf("Failed to create key pair: %v", err) + } + } + m.setupSecureProxy(c.SSHUser, c.SSHKeyfile) + proxyDialer = m.Dial + + // This is pretty ugly. A better solution would be to pull this all the way up into the + // server.go file. + httpKubeletClient, ok := c.KubeletClient.(*client.HTTPKubeletClient) + if ok { + httpKubeletClient.Config.Dial = m.Dial + transport, err := client.MakeTransport(httpKubeletClient.Config) + if err != nil { + glog.Errorf("Error setting up transport over SSH: %v", err) + } else { + httpKubeletClient.Client.Transport = transport + } + } else { + glog.Errorf("Failed to cast %v to HTTPKubeletClient, skipping SSH tunnel.") + } + } + apiVersions := []string{} if m.v1beta3 { - if err := m.api_v1beta3().InstallREST(m.handlerContainer); err != nil { + if err := m.api_v1beta3().InstallREST(m.handlerContainer, proxyDialer); err != nil { glog.Fatalf("Unable to setup API v1beta3: %v", err) } apiVersions = append(apiVersions, "v1beta3") } if m.v1 { - if err := m.api_v1().InstallREST(m.handlerContainer); err != nil { + if err := m.api_v1().InstallREST(m.handlerContainer, proxyDialer); err != nil { glog.Fatalf("Unable to setup API v1: %v", err) } apiVersions = append(apiVersions, "v1") @@ -703,3 +754,138 @@ func (m *Master) api_v1() *apiserver.APIGroupVersion { version.Codec = v1.Codec return version } + +func findExternalAddress(node *api.Node) (string, error) { + for ix := range node.Status.Addresses { + addr := &node.Status.Addresses[ix] + if addr.Type == api.NodeExternalIP { + return addr.Address, nil + } + } + return "", fmt.Errorf("Couldn't find external address: %v", node) +} + +func (m *Master) Dial(net, addr string) (net.Conn, error) { + m.tunnelsLock.Lock() + defer m.tunnelsLock.Unlock() + return m.tunnels.Dial(net, addr) +} + +func (m *Master) needToReplaceTunnels(addrs []string) bool { + if len(m.tunnels) != len(addrs) { + return true + } + // TODO (cjcullen): This doesn't need to be n^2 + for ix := range addrs { + if !m.tunnels.Has(addrs[ix]) { + return true + } + } + return false +} + +func (m *Master) getNodeAddresses() ([]string, error) { + nodes, err := m.nodeRegistry.ListMinions(api.NewDefaultContext(), labels.Everything(), fields.Everything()) + if err != nil { + return nil, err + } + addrs := []string{} + for ix := range nodes.Items { + node := &nodes.Items[ix] + addr, err := findExternalAddress(node) + if err != nil { + return nil, err + } + addrs = append(addrs, addr) + } + return addrs, nil +} + +func (m *Master) replaceTunnels(user, keyfile string, newAddrs []string) error { + glog.Infof("replacing tunnels. New addrs: %v", newAddrs) + tunnels, err := util.MakeSSHTunnels(user, keyfile, newAddrs) + if err != nil { + return err + } + tunnels.Open() + if m.tunnels != nil { + m.tunnels.Close() + } + m.tunnels = tunnels + return nil +} + +func (m *Master) loadTunnels(user, keyfile string) error { + m.tunnelsLock.Lock() + defer m.tunnelsLock.Unlock() + addrs, err := m.getNodeAddresses() + if err != nil { + return err + } + if !m.needToReplaceTunnels(addrs) { + return nil + } + // TODO: This is going to unnecessarily close connections to unchanged nodes. + // See comment about using Watch above. + glog.Info("found different nodes. Need to replace tunnels") + return m.replaceTunnels(user, keyfile, addrs) +} + +func (m *Master) refreshTunnels(user, keyfile string) error { + m.tunnelsLock.Lock() + defer m.tunnelsLock.Unlock() + addrs, err := m.getNodeAddresses() + if err != nil { + return err + } + return m.replaceTunnels(user, keyfile, addrs) +} + +func (m *Master) setupSecureProxy(user, keyfile string) { + // Sync loop for tunnels + // TODO: switch this to watch. + go func() { + for { + if err := m.loadTunnels(user, keyfile); err != nil { + glog.Errorf("Failed to load SSH Tunnels: %v", err) + } + var sleep time.Duration + if len(m.tunnels) == 0 { + sleep = time.Second + } else { + // tunnels could lag behind current set of nodes + sleep = 10 * time.Second + } + time.Sleep(sleep) + } + }() + // Refresh loop for tunnels + // TODO: could make this more controller-ish + go func() { + for { + time.Sleep(5 * time.Minute) + if err := m.refreshTunnels(user, keyfile); err != nil { + glog.Errorf("Failed to refresh SSH Tunnels: %v", err) + } + } + }() +} + +func (m *Master) generateSSHKey(user, keyfile string) error { + if m.installSSHKey == nil { + return errors.New("ssh install function is null") + } + + private, public, err := util.GenerateKey(2048) + if err != nil { + return err + } + if err := ioutil.WriteFile(keyfile, util.EncodePrivateKey(private), 0600); err != nil { + return err + } + data, err := util.EncodeSSHKey(public) + if err != nil { + return err + } + return m.installSSHKey(user, data) +} diff --git a/pkg/util/ssh.go b/pkg/util/ssh.go index 7760761b339..04f265cf938 100644 --- a/pkg/util/ssh.go +++ b/pkg/util/ssh.go @@ -18,11 +18,17 @@ package util import ( "bytes" + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "encoding/pem" "fmt" "io" "io/ioutil" + mathrand "math/rand" "net" "os" + "time" "github.com/golang/glog" "golang.org/x/crypto/ssh" @@ -31,15 +37,12 @@ import ( // TODO: Unit tests for this code, we can spin up a test SSH server with instructions here: // https://godoc.org/golang.org/x/crypto/ssh#ServerConn type SSHTunnel struct { - Config *ssh.ClientConfig - Host string - SSHPort int - LocalPort int - RemoteHost string - RemotePort int - running bool - sock net.Listener - client *ssh.Client + Config *ssh.ClientConfig + Host string + SSHPort string + running bool + sock net.Listener + client *ssh.Client } func (s *SSHTunnel) copyBytes(out io.Writer, in io.Reader) { @@ -48,7 +51,7 @@ func (s *SSHTunnel) copyBytes(out io.Writer, in io.Reader) { } } -func NewSSHTunnel(user, keyfile, host, remoteHost string, localPort, remotePort int) (*SSHTunnel, error) { +func NewSSHTunnel(user, keyfile, host string) (*SSHTunnel, error) { signer, err := MakePrivateKeySigner(keyfile) if err != nil { return nil, err @@ -58,44 +61,27 @@ func NewSSHTunnel(user, keyfile, host, remoteHost string, localPort, remotePort Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)}, } return &SSHTunnel{ - Config: &config, - Host: host, - SSHPort: 22, - LocalPort: localPort, - RemotePort: remotePort, - RemoteHost: remoteHost, + Config: &config, + Host: host, + SSHPort: "22", }, nil } func (s *SSHTunnel) Open() error { var err error - s.client, err = ssh.Dial("tcp", fmt.Sprintf("%s:%d", s.Host, s.SSHPort), s.Config) + s.client, err = ssh.Dial("tcp", net.JoinHostPort(s.Host, s.SSHPort), s.Config) if err != nil { return err } - s.sock, err = net.Listen("tcp", fmt.Sprintf("localhost:%d", s.LocalPort)) - if err != nil { - return err - } - s.running = true return nil } -func (s *SSHTunnel) Listen() { - for s.running { - conn, err := s.sock.Accept() - if err != nil { - glog.Errorf("Error listening for ssh tunnel to %s (%v)", s.RemoteHost, err) - continue - } - if err := s.tunnel(conn); err != nil { - glog.Errorf("Error starting tunnel: %v", err) - } - } +func (s *SSHTunnel) Dial(network, address string) (net.Conn, error) { + return s.client.Dial(network, address) } -func (s *SSHTunnel) tunnel(conn net.Conn) error { - tunnel, err := s.client.Dial("tcp", fmt.Sprintf("%s:%d", s.RemoteHost, s.RemotePort)) +func (s *SSHTunnel) tunnel(conn net.Conn, remoteHost, remotePort string) error { + tunnel, err := s.client.Dial("tcp", net.JoinHostPort(remoteHost, remotePort)) if err != nil { return err } @@ -105,12 +91,6 @@ func (s *SSHTunnel) tunnel(conn net.Conn) error { } func (s *SSHTunnel) Close() error { - // TODO: try to shutdown copying here? - s.running = false - // TODO: Aggregate errors and keep going? - if err := s.sock.Close(); err != nil { - return err - } if err := s.client.Close(); err != nil { return err } @@ -172,3 +152,98 @@ func MakePrivateKeySigner(key string) (ssh.Signer, error) { } return signer, nil } + +type SSHTunnelEntry struct { + Address string + Tunnel *SSHTunnel +} + +type SSHTunnelList []SSHTunnelEntry + +func MakeSSHTunnels(user, keyfile string, addresses []string) (SSHTunnelList, error) { + tunnels := []SSHTunnelEntry{} + for ix := range addresses { + addr := addresses[ix] + tunnel, err := NewSSHTunnel(user, keyfile, addr) + if err != nil { + return nil, err + } + tunnels = append(tunnels, SSHTunnelEntry{addr, tunnel}) + } + return tunnels, nil +} + +func (l SSHTunnelList) Open() error { + for ix := range l { + if err := l[ix].Tunnel.Open(); err != nil { + return err + } + } + return nil +} + +// Close asynchronously closes all tunnels in the list after waiting for 1 +// minute. Tunnels will still be open upon this function's return, but should +// no longer be used. +func (l SSHTunnelList) Close() { + for ix := range l { + entry := l[ix] + go func() { + time.Sleep(1 * time.Minute) + if err := entry.Tunnel.Close(); err != nil { + glog.Errorf("Failed to close tunnel %v: %v", entry, err) + } + }() + } +} + +func (l SSHTunnelList) Dial(network, addr string) (net.Conn, error) { + if len(l) == 0 { + return nil, fmt.Errorf("Empty tunnel list.") + } + return l[mathrand.Int()%len(l)].Tunnel.Dial(network, addr) +} + +func (l SSHTunnelList) Has(addr string) bool { + for ix := range l { + if l[ix].Address == addr { + return true + } + } + return false +} + +func EncodePrivateKey(private *rsa.PrivateKey) []byte { + return pem.EncodeToMemory(&pem.Block{ + Bytes: x509.MarshalPKCS1PrivateKey(private), + Type: "RSA PRIVATE KEY", + }) +} + +func EncodePublicKey(public *rsa.PublicKey) ([]byte, error) { + publicBytes, err := x509.MarshalPKIXPublicKey(public) + if err != nil { + return nil, err + } + + return pem.EncodeToMemory(&pem.Block{ + Bytes: publicBytes, + Type: "PUBLIC KEY", + }), nil +} + +func EncodeSSHKey(public *rsa.PublicKey) ([]byte, error) { + publicKey, err := ssh.NewPublicKey(public) + if err != nil { + return nil, err + } + return ssh.MarshalAuthorizedKey(publicKey), nil +} + +func GenerateKey(bits int) (*rsa.PrivateKey, *rsa.PublicKey, error) { + private, err := rsa.GenerateKey(rand.Reader, bits) + if err != nil { + return nil, nil, err + } + return private, &private.PublicKey, nil +} diff --git a/pkg/util/util.go b/pkg/util/util.go index 7ba9c78f3dd..17f56187ea4 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -516,3 +516,12 @@ func ShortenString(str string, n int) string { return str[:n] } } + +func FileExists(filename string) (bool, error) { + if _, err := os.Stat(filename); os.IsNotExist(err) { + return false, nil + } else if err != nil { + return false, err + } + return true, nil +}