Some refactoring. Only selectively use ssh proxy.

Add NetworkName to gce.Config.
Add locking to uses of master.tunnels.
This commit is contained in:
CJ Cullen 2015-06-04 11:58:38 -07:00
parent 1ae8801387
commit cb317604ab
7 changed files with 60 additions and 60 deletions

View File

@ -487,16 +487,18 @@ grains:
cbr-cidr: ${MASTER_IP_RANGE}
cloud: gce
EOF
if ! [[ -z "${PROJECT_ID:-}" ]] && ! [[ -z "${TOKEN_URL:-}" ]]; then
if ! [[ -z "${PROJECT_ID:-}" ]] && ! [[ -z "${TOKEN_URL:-}" ]] && ! [[ -z "${NODE_NETWORK:-}" ]] ; then
cat <<EOF >/etc/gce.conf
[global]
token-url = ${TOKEN_URL}
project-id = ${PROJECT_ID}
network-name = ${NODE_NETWORK}
EOF
EXTERNAL_IP=$(curl --fail --silent -H 'Metadata-Flavor: Google' "http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip")
cat <<EOF >>/etc/salt/minion.d/grains.conf
cloud_config: /etc/gce.conf
advertise_address: '${EXTERNAL_IP}'
proxy_ssh_user: '${INSTANCE_PREFIX}'
EOF
fi
}

View File

@ -23,6 +23,11 @@
{% set advertise_address = "--advertise-address=" + grains.advertise_address -%}
{% endif -%}
{% set proxy_ssh_options = "" -%}
{% if grains.proxy_ssh_user is defined -%}
{% set proxy_ssh_options = "--ssh-user=" + grains.proxy_ssh_user + " --ssh-keyfile=/sshproxy/.sshkeyfile" -%}
{% endif -%}
{% set address = "--address=127.0.0.1" -%}
{% set cluster_name = "" -%}
@ -81,7 +86,7 @@
{% endif -%}
{% set params = address + " " + etcd_servers + " " + cloud_provider + " " + cloud_config + " " + runtime_config + " " + admission_control + " " + service_cluster_ip_range + " " + client_ca_file + " " + basic_auth_file + " " + min_request_timeout -%}
{% set params = params + " " + cluster_name + " " + cert_file + " " + key_file + " --secure_port=" + secure_port + " " + token_auth_file + " " + bind_address + " " + pillar['log_level'] + " " + advertise_address + " --ssh-user=root --ssh-keyfile=/.sshkeyfile"-%}
{% set params = params + " " + cluster_name + " " + cert_file + " " + key_file + " --secure_port=" + secure_port + " " + token_auth_file + " " + bind_address + " " + pillar['log_level'] + " " + advertise_address + " " + proxy_ssh_options -%}
{
"apiVersion": "v1beta3",
@ -137,7 +142,10 @@
"readOnly": true},
{ "name": "etcpkitls",
"mountPath": "/etc/pki/tls",
"readOnly": true}
"readOnly": true},
{ "name": "sshproxy",
"mountPath": "/sshproxy",
"readOnly": false}
]
}
],
@ -182,6 +190,9 @@
{ "name": "etcpkitls",
"hostPath": {
"path": "/etc/pki/tls"}
},
{ "name": "sshproxy",
"emptyDir": {}
}
]
}}

View File

@ -356,11 +356,11 @@ func (s *APIServer) Run(_ []string) error {
}
}
}
var installSSH master.InstallSSHKey
instances, supported := cloud.Instances()
if supported {
installSSH = instances.AddSSHKeyToAllInstances
if cloud != nil {
if instances, supported := cloud.Instances(); supported {
installSSH = instances.AddSSHKeyToAllInstances
}
}
config := &master.Config{
EtcdHelper: helper,

View File

@ -51,7 +51,6 @@ type HTTPKubeletClient struct {
}
func MakeTransport(config *KubeletConfig) (http.RoundTripper, error) {
cfg := &Config{TLSClientConfig: config.TLSClientConfig}
if config.EnableHttps {
hasCA := len(config.CAFile) > 0 || len(config.CAData) > 0
@ -63,17 +62,14 @@ func MakeTransport(config *KubeletConfig) (http.RoundTripper, error) {
if err != nil {
return nil, err
}
var transport http.RoundTripper
if config.Dial != nil || tlsConfig != nil {
transport = &http.Transport{
return &http.Transport{
Dial: config.Dial,
TLSClientConfig: tlsConfig,
}
}, nil
} else {
transport = http.DefaultTransport
return http.DefaultTransport, nil
}
return transport, nil
}
// TODO: this structure is questionable, it should be using client.Config and overriding defaults.

View File

@ -17,7 +17,6 @@ limitations under the License.
package gce_cloud
import (
"bytes"
"fmt"
"io"
"io/ioutil"
@ -55,9 +54,7 @@ type GCECloud struct {
projectID string
zone string
instanceID string
// We assume here that nodes and master are in the same network. TODO(cjcullen) Fix it.
networkName string
networkName string
// Used for accessing the metadata server
metadataAccess func(string) (string, error)
@ -65,8 +62,9 @@ type GCECloud struct {
type Config struct {
Global struct {
TokenURL string `gcfg:"token-url"`
ProjectID string `gcfg:"project-id"`
TokenURL string `gcfg:"token-url"`
ProjectID string `gcfg:"project-id"`
NetworkName string `gcfg:"network-name"`
}
}
@ -155,10 +153,16 @@ func newGCECloud(config io.Reader) (*GCECloud, error) {
if config != nil {
var cfg Config
if err := gcfg.ReadInto(&cfg, config); err != nil {
glog.Errorf("Couldn't read config: %v", err)
return nil, err
}
if cfg.Global.ProjectID != "" && cfg.Global.TokenURL != "" {
if cfg.Global.ProjectID != "" {
projectID = cfg.Global.ProjectID
}
if cfg.Global.NetworkName != "" {
networkName = cfg.Global.NetworkName
}
if cfg.Global.TokenURL != "" {
tokenSource = newAltTokenSource(cfg.Global.TokenURL)
}
}
@ -485,11 +489,11 @@ func (gce *GCECloud) AddSSHKeyToAllInstances(user string, keyData []byte) error
if err != nil {
return err
}
keyData = bytes.TrimSpace(keyData)
keyString := fmt.Sprintf("%s:%s %s@%s", user, strings.TrimSpace(string(keyData)), user, hostname)
found := false
for _, item := range project.CommonInstanceMetadata.Items {
if item.Key == "sshKeys" {
item.Value = fmt.Sprintf("%s\n%s:%s %s@%s", item.Value, user, string(keyData), user, hostname)
item.Value = addKey(item.Value, keyString)
found = true
break
}
@ -500,7 +504,7 @@ func (gce *GCECloud) AddSSHKeyToAllInstances(user string, keyData []byte) error
project.CommonInstanceMetadata.Items = append(project.CommonInstanceMetadata.Items,
&compute.MetadataItems{
Key: "sshKeys",
Value: fmt.Sprint("%s:%s %s@%s", user, string(keyData), user, hostname),
Value: keyString,
})
}
op, err := gce.service.Projects.SetCommonInstanceMetadata(gce.projectID, project.CommonInstanceMetadata).Do()
@ -510,6 +514,14 @@ func (gce *GCECloud) AddSSHKeyToAllInstances(user string, keyData []byte) error
return gce.waitForGlobalOp(op)
}
func addKey(metadataBefore, keyString string) string {
if strings.Contains(metadataBefore, keyString) {
// We've already added this key
return metadataBefore
}
return metadataBefore + "\n" + keyString
}
// NodeAddresses is an implementation of Instances.NodeAddresses.
func (gce *GCECloud) NodeAddresses(_ string) ([]api.NodeAddress, error) {
externalIP, err := gce.metadataAccess(EXTERNAL_IP_METADATA_URL)

View File

@ -28,6 +28,7 @@ import (
rt "runtime"
"strconv"
"strings"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/admission"
@ -210,6 +211,7 @@ type Master struct {
// Used for secure proxy
tunnels util.SSHTunnelList
tunnelsLock sync.Mutex
installSSHKey InstallSSHKey
}
@ -340,7 +342,7 @@ func New(c *Config) *Master {
serviceReadWriteIP: serviceReadWriteIP,
// TODO: serviceReadWritePort should be passed in as an argument, it may not always be 443
serviceReadWritePort: 443,
installSSHKey: c.InstallSSHKey,
}
@ -764,6 +766,8 @@ func findExternalAddress(node *api.Node) (string, error) {
}
func (m *Master) Dial(net, addr string) (net.Conn, error) {
m.tunnelsLock.Lock()
defer m.tunnelsLock.Unlock()
return m.tunnels.Dial(net, addr)
}
@ -771,6 +775,7 @@ func (m *Master) needToReplaceTunnels(addrs []string) bool {
if len(m.tunnels) != len(addrs) {
return true
}
// TODO (cjcullen): This doesn't need to be n^2
for ix := range addrs {
if !m.tunnels.Has(addrs[ix]) {
return true
@ -797,6 +802,7 @@ func (m *Master) getNodeAddresses() ([]string, error) {
}
func (m *Master) replaceTunnels(user, keyfile string, newAddrs []string) error {
glog.Infof("replacing tunnels. New addrs: %v", newAddrs)
tunnels, err := util.MakeSSHTunnels(user, keyfile, newAddrs)
if err != nil {
return err
@ -810,6 +816,8 @@ func (m *Master) replaceTunnels(user, keyfile string, newAddrs []string) error {
}
func (m *Master) loadTunnels(user, keyfile string) error {
m.tunnelsLock.Lock()
defer m.tunnelsLock.Unlock()
addrs, err := m.getNodeAddresses()
if err != nil {
return err
@ -819,10 +827,13 @@ func (m *Master) loadTunnels(user, keyfile string) error {
}
// TODO: This is going to unnecessarily close connections to unchanged nodes.
// See comment about using Watch above.
glog.Info("found different nodes. Need to replace tunnels")
return m.replaceTunnels(user, keyfile, addrs)
}
func (m *Master) refreshTunnels(user, keyfile string) error {
m.tunnelsLock.Lock()
defer m.tunnelsLock.Unlock()
addrs, err := m.getNodeAddresses()
if err != nil {
return err
@ -842,6 +853,7 @@ func (m *Master) setupSecureProxy(user, keyfile string) {
if len(m.tunnels) == 0 {
sleep = time.Second
} else {
// tunnels could lag behind current set of nodes
sleep = time.Minute
}
time.Sleep(sleep)
@ -851,10 +863,10 @@ func (m *Master) setupSecureProxy(user, keyfile string) {
// TODO: could make this more controller-ish
go func() {
for {
time.Sleep(5 * time.Minute)
if err := m.refreshTunnels(user, keyfile); err != nil {
glog.Errorf("Failed to refresh SSH Tunnels: %v", err)
}
time.Sleep(5 * time.Minute)
}
}()
}
@ -875,6 +887,5 @@ func (m *Master) generateSSHKey(user, keyfile string) error {
if err != nil {
return err
}
fmt.Printf("FOO: %s", data)
return m.installSSHKey(user, data)
}

View File

@ -80,30 +80,6 @@ func (s *SSHTunnel) Dial(network, address string) (net.Conn, error) {
return s.client.Dial(network, address)
}
func (s *SSHTunnel) Listen(remoteHost, localPort, remotePort string) error {
var err error
s.sock, err = net.Listen("tcp", net.JoinHostPort("localhost", localPort))
if err != nil {
return err
}
s.running = true
for s.running {
conn, err := s.sock.Accept()
if err != nil {
if s.running {
glog.Errorf("Error listening for ssh tunnel to %s (%v)", remoteHost, err)
} else {
glog.V(4).Infof("Error listening for ssh tunnel to %s (%v), this is likely due to the tunnel shutting down.", remoteHost, err)
}
continue
}
if err := s.tunnel(conn, remoteHost, remotePort); err != nil {
glog.Errorf("Error starting tunnel: %v", err)
}
}
return nil
}
func (s *SSHTunnel) tunnel(conn net.Conn, remoteHost, remotePort string) error {
tunnel, err := s.client.Dial("tcp", net.JoinHostPort(remoteHost, remotePort))
if err != nil {
@ -114,16 +90,8 @@ func (s *SSHTunnel) tunnel(conn net.Conn, remoteHost, remotePort string) error {
return nil
}
func (s *SSHTunnel) StopListening() error {
// TODO: try to shutdown copying here?
s.running = false
if err := s.sock.Close(); err != nil {
return err
}
return nil
}
func (s *SSHTunnel) Close() error {
glog.Infof("Closing tunnel for host: %q", s.Host)
if err := s.client.Close(); err != nil {
return err
}