diff --git a/cluster/gce/configure-vm.sh b/cluster/gce/configure-vm.sh index fb1b46a3ca6..4a6ab822883 100644 --- a/cluster/gce/configure-vm.sh +++ b/cluster/gce/configure-vm.sh @@ -487,16 +487,18 @@ grains: cbr-cidr: ${MASTER_IP_RANGE} cloud: gce EOF - if ! [[ -z "${PROJECT_ID:-}" ]] && ! [[ -z "${TOKEN_URL:-}" ]]; then + if ! [[ -z "${PROJECT_ID:-}" ]] && ! [[ -z "${TOKEN_URL:-}" ]] && ! [[ -z "${NODE_NETWORK:-}" ]] ; then cat </etc/gce.conf [global] token-url = ${TOKEN_URL} project-id = ${PROJECT_ID} +network-name = ${NODE_NETWORK} EOF EXTERNAL_IP=$(curl --fail --silent -H 'Metadata-Flavor: Google' "http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip") cat <>/etc/salt/minion.d/grains.conf cloud_config: /etc/gce.conf advertise_address: '${EXTERNAL_IP}' + proxy_ssh_user: '${INSTANCE_PREFIX}' EOF fi } diff --git a/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest b/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest index fe7ffb2c3d7..f8a903a1452 100644 --- a/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest +++ b/cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest @@ -23,6 +23,11 @@ {% set advertise_address = "--advertise-address=" + grains.advertise_address -%} {% endif -%} +{% set proxy_ssh_options = "" -%} +{% if grains.proxy_ssh_user is defined -%} + {% set proxy_ssh_options = "--ssh-user=" + grains.proxy_ssh_user + " --ssh-keyfile=/sshproxy/.sshkeyfile" -%} +{% endif -%} + {% set address = "--address=127.0.0.1" -%} {% set cluster_name = "" -%} @@ -81,7 +86,7 @@ {% endif -%} {% set params = address + " " + etcd_servers + " " + cloud_provider + " " + cloud_config + " " + runtime_config + " " + admission_control + " " + service_cluster_ip_range + " " + client_ca_file + " " + basic_auth_file + " " + min_request_timeout -%} -{% set params = params + " " + cluster_name + " " + cert_file + " " + key_file + " --secure_port=" + secure_port + " " + token_auth_file + " " + bind_address + " " + pillar['log_level'] + " " + advertise_address + " --ssh-user=root --ssh-keyfile=/.sshkeyfile"-%} +{% set params = params + " " + cluster_name + " " + cert_file + " " + key_file + " --secure_port=" + secure_port + " " + token_auth_file + " " + bind_address + " " + pillar['log_level'] + " " + advertise_address + " " + proxy_ssh_options -%} { "apiVersion": "v1beta3", @@ -137,7 +142,10 @@ "readOnly": true}, { "name": "etcpkitls", "mountPath": "/etc/pki/tls", - "readOnly": true} + "readOnly": true}, + { "name": "sshproxy", + "mountPath": "/sshproxy", + "readOnly": false} ] } ], @@ -182,6 +190,9 @@ { "name": "etcpkitls", "hostPath": { "path": "/etc/pki/tls"} + }, + { "name": "sshproxy", + "emptyDir": {} } ] }} diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 111c6c6cb77..702db16ce0b 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -356,11 +356,11 @@ func (s *APIServer) Run(_ []string) error { } } } - var installSSH master.InstallSSHKey - instances, supported := cloud.Instances() - if supported { - installSSH = instances.AddSSHKeyToAllInstances + if cloud != nil { + if instances, supported := cloud.Instances(); supported { + installSSH = instances.AddSSHKeyToAllInstances + } } config := &master.Config{ EtcdHelper: helper, diff --git a/pkg/client/kubelet.go b/pkg/client/kubelet.go index 468a25961bc..4f000da923c 100644 --- a/pkg/client/kubelet.go +++ b/pkg/client/kubelet.go @@ -51,7 +51,6 @@ type HTTPKubeletClient struct { } func MakeTransport(config *KubeletConfig) (http.RoundTripper, error) { - cfg := &Config{TLSClientConfig: config.TLSClientConfig} if config.EnableHttps { hasCA := len(config.CAFile) > 0 || len(config.CAData) > 0 @@ -63,17 +62,14 @@ func MakeTransport(config *KubeletConfig) (http.RoundTripper, error) { if err != nil { return nil, err } - - var transport http.RoundTripper if config.Dial != nil || tlsConfig != nil { - transport = &http.Transport{ + return &http.Transport{ Dial: config.Dial, TLSClientConfig: tlsConfig, - } + }, nil } else { - transport = http.DefaultTransport + return http.DefaultTransport, nil } - return transport, nil } // TODO: this structure is questionable, it should be using client.Config and overriding defaults. diff --git a/pkg/cloudprovider/gce/gce.go b/pkg/cloudprovider/gce/gce.go index 4f951406402..96d78daf0e1 100644 --- a/pkg/cloudprovider/gce/gce.go +++ b/pkg/cloudprovider/gce/gce.go @@ -17,7 +17,6 @@ limitations under the License. package gce_cloud import ( - "bytes" "fmt" "io" "io/ioutil" @@ -55,9 +54,7 @@ type GCECloud struct { projectID string zone string instanceID string - - // We assume here that nodes and master are in the same network. TODO(cjcullen) Fix it. - networkName string + networkName string // Used for accessing the metadata server metadataAccess func(string) (string, error) @@ -65,8 +62,9 @@ type GCECloud struct { type Config struct { Global struct { - TokenURL string `gcfg:"token-url"` - ProjectID string `gcfg:"project-id"` + TokenURL string `gcfg:"token-url"` + ProjectID string `gcfg:"project-id"` + NetworkName string `gcfg:"network-name"` } } @@ -155,10 +153,16 @@ func newGCECloud(config io.Reader) (*GCECloud, error) { if config != nil { var cfg Config if err := gcfg.ReadInto(&cfg, config); err != nil { + glog.Errorf("Couldn't read config: %v", err) return nil, err } - if cfg.Global.ProjectID != "" && cfg.Global.TokenURL != "" { + if cfg.Global.ProjectID != "" { projectID = cfg.Global.ProjectID + } + if cfg.Global.NetworkName != "" { + networkName = cfg.Global.NetworkName + } + if cfg.Global.TokenURL != "" { tokenSource = newAltTokenSource(cfg.Global.TokenURL) } } @@ -485,11 +489,11 @@ func (gce *GCECloud) AddSSHKeyToAllInstances(user string, keyData []byte) error if err != nil { return err } - keyData = bytes.TrimSpace(keyData) + keyString := fmt.Sprintf("%s:%s %s@%s", user, strings.TrimSpace(string(keyData)), user, hostname) found := false for _, item := range project.CommonInstanceMetadata.Items { if item.Key == "sshKeys" { - item.Value = fmt.Sprintf("%s\n%s:%s %s@%s", item.Value, user, string(keyData), user, hostname) + item.Value = addKey(item.Value, keyString) found = true break } @@ -500,7 +504,7 @@ func (gce *GCECloud) AddSSHKeyToAllInstances(user string, keyData []byte) error project.CommonInstanceMetadata.Items = append(project.CommonInstanceMetadata.Items, &compute.MetadataItems{ Key: "sshKeys", - Value: fmt.Sprint("%s:%s %s@%s", user, string(keyData), user, hostname), + Value: keyString, }) } op, err := gce.service.Projects.SetCommonInstanceMetadata(gce.projectID, project.CommonInstanceMetadata).Do() @@ -510,6 +514,14 @@ func (gce *GCECloud) AddSSHKeyToAllInstances(user string, keyData []byte) error return gce.waitForGlobalOp(op) } +func addKey(metadataBefore, keyString string) string { + if strings.Contains(metadataBefore, keyString) { + // We've already added this key + return metadataBefore + } + return metadataBefore + "\n" + keyString +} + // NodeAddresses is an implementation of Instances.NodeAddresses. func (gce *GCECloud) NodeAddresses(_ string) ([]api.NodeAddress, error) { externalIP, err := gce.metadataAccess(EXTERNAL_IP_METADATA_URL) diff --git a/pkg/master/master.go b/pkg/master/master.go index 2929c05232b..dc9c282e717 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -28,6 +28,7 @@ import ( rt "runtime" "strconv" "strings" + "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/admission" @@ -210,6 +211,7 @@ type Master struct { // Used for secure proxy tunnels util.SSHTunnelList + tunnelsLock sync.Mutex installSSHKey InstallSSHKey } @@ -340,7 +342,7 @@ func New(c *Config) *Master { serviceReadWriteIP: serviceReadWriteIP, // TODO: serviceReadWritePort should be passed in as an argument, it may not always be 443 serviceReadWritePort: 443, - + installSSHKey: c.InstallSSHKey, } @@ -764,6 +766,8 @@ func findExternalAddress(node *api.Node) (string, error) { } func (m *Master) Dial(net, addr string) (net.Conn, error) { + m.tunnelsLock.Lock() + defer m.tunnelsLock.Unlock() return m.tunnels.Dial(net, addr) } @@ -771,6 +775,7 @@ func (m *Master) needToReplaceTunnels(addrs []string) bool { if len(m.tunnels) != len(addrs) { return true } + // TODO (cjcullen): This doesn't need to be n^2 for ix := range addrs { if !m.tunnels.Has(addrs[ix]) { return true @@ -797,6 +802,7 @@ func (m *Master) getNodeAddresses() ([]string, error) { } func (m *Master) replaceTunnels(user, keyfile string, newAddrs []string) error { + glog.Infof("replacing tunnels. New addrs: %v", newAddrs) tunnels, err := util.MakeSSHTunnels(user, keyfile, newAddrs) if err != nil { return err @@ -810,6 +816,8 @@ func (m *Master) replaceTunnels(user, keyfile string, newAddrs []string) error { } func (m *Master) loadTunnels(user, keyfile string) error { + m.tunnelsLock.Lock() + defer m.tunnelsLock.Unlock() addrs, err := m.getNodeAddresses() if err != nil { return err @@ -819,10 +827,13 @@ func (m *Master) loadTunnels(user, keyfile string) error { } // TODO: This is going to unnecessarily close connections to unchanged nodes. // See comment about using Watch above. + glog.Info("found different nodes. Need to replace tunnels") return m.replaceTunnels(user, keyfile, addrs) } func (m *Master) refreshTunnels(user, keyfile string) error { + m.tunnelsLock.Lock() + defer m.tunnelsLock.Unlock() addrs, err := m.getNodeAddresses() if err != nil { return err @@ -842,6 +853,7 @@ func (m *Master) setupSecureProxy(user, keyfile string) { if len(m.tunnels) == 0 { sleep = time.Second } else { + // tunnels could lag behind current set of nodes sleep = time.Minute } time.Sleep(sleep) @@ -851,10 +863,10 @@ func (m *Master) setupSecureProxy(user, keyfile string) { // TODO: could make this more controller-ish go func() { for { + time.Sleep(5 * time.Minute) if err := m.refreshTunnels(user, keyfile); err != nil { glog.Errorf("Failed to refresh SSH Tunnels: %v", err) } - time.Sleep(5 * time.Minute) } }() } @@ -875,6 +887,5 @@ func (m *Master) generateSSHKey(user, keyfile string) error { if err != nil { return err } - fmt.Printf("FOO: %s", data) return m.installSSHKey(user, data) } diff --git a/pkg/util/ssh.go b/pkg/util/ssh.go index 28ba337fc5c..82285fd74ac 100644 --- a/pkg/util/ssh.go +++ b/pkg/util/ssh.go @@ -80,30 +80,6 @@ func (s *SSHTunnel) Dial(network, address string) (net.Conn, error) { return s.client.Dial(network, address) } -func (s *SSHTunnel) Listen(remoteHost, localPort, remotePort string) error { - var err error - s.sock, err = net.Listen("tcp", net.JoinHostPort("localhost", localPort)) - if err != nil { - return err - } - s.running = true - for s.running { - conn, err := s.sock.Accept() - if err != nil { - if s.running { - glog.Errorf("Error listening for ssh tunnel to %s (%v)", remoteHost, err) - } else { - glog.V(4).Infof("Error listening for ssh tunnel to %s (%v), this is likely due to the tunnel shutting down.", remoteHost, err) - } - continue - } - if err := s.tunnel(conn, remoteHost, remotePort); err != nil { - glog.Errorf("Error starting tunnel: %v", err) - } - } - return nil -} - func (s *SSHTunnel) tunnel(conn net.Conn, remoteHost, remotePort string) error { tunnel, err := s.client.Dial("tcp", net.JoinHostPort(remoteHost, remotePort)) if err != nil { @@ -114,16 +90,8 @@ func (s *SSHTunnel) tunnel(conn net.Conn, remoteHost, remotePort string) error { return nil } -func (s *SSHTunnel) StopListening() error { - // TODO: try to shutdown copying here? - s.running = false - if err := s.sock.Close(); err != nil { - return err - } - return nil -} - func (s *SSHTunnel) Close() error { + glog.Infof("Closing tunnel for host: %q", s.Host) if err := s.client.Close(); err != nil { return err }