Merge pull request #9292 from cjcullen/test_pull_8946

Add an ssh tunnel option to the /proxy endpoint
This commit is contained in:
krousey 2015-06-08 14:30:12 -07:00
commit 5aa0219ada
23 changed files with 481 additions and 80 deletions

View File

@ -220,9 +220,12 @@ mount-master-pd() {
mkdir -p /mnt/master-pd/srv/kubernetes
# Contains the cluster's initial config parameters and auth tokens
mkdir -p /mnt/master-pd/srv/salt-overlay
# Directory for kube-apiserver to store SSH key (if necessary)
mkdir -p /mnt/master-pd/srv/sshproxy
ln -s -f /mnt/master-pd/var/etcd /var/etcd
ln -s -f /mnt/master-pd/srv/kubernetes /srv/kubernetes
ln -s -f /mnt/master-pd/srv/sshproxy /srv/sshproxy
ln -s -f /mnt/master-pd/srv/salt-overlay /srv/salt-overlay
# This is a bit of a hack to get around the fact that salt has to run after the
@ -487,16 +490,18 @@ grains:
cbr-cidr: ${MASTER_IP_RANGE}
cloud: gce
EOF
if ! [[ -z "${PROJECT_ID:-}" ]] && ! [[ -z "${TOKEN_URL:-}" ]]; then
if ! [[ -z "${PROJECT_ID:-}" ]] && ! [[ -z "${TOKEN_URL:-}" ]] && ! [[ -z "${NODE_NETWORK:-}" ]] ; then
cat <<EOF >/etc/gce.conf
[global]
token-url = ${TOKEN_URL}
project-id = ${PROJECT_ID}
network-name = ${NODE_NETWORK}
EOF
EXTERNAL_IP=$(curl --fail --silent -H 'Metadata-Flavor: Google' "http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip")
cat <<EOF >>/etc/salt/minion.d/grains.conf
cloud_config: /etc/gce.conf
advertise_address: '${EXTERNAL_IP}'
proxy_ssh_user: '${INSTANCE_PREFIX}'
EOF
fi
}

View File

@ -102,9 +102,6 @@ kubectl="${KUBECTL_PATH:-${kubectl}}"
if [[ "$KUBERNETES_PROVIDER" == "gke" ]]; then
detect-project &> /dev/null
config=(
"--context=gke_${PROJECT}_${ZONE}_${CLUSTER_NAME}"
)
elif [[ "$KUBERNETES_PROVIDER" == "ubuntu" || "$KUBERNETES_PROVIDER" == "juju" ]]; then
detect-master > /dev/null
config=(

View File

@ -5,21 +5,17 @@
{% set cloud_provider = "" -%}
{% set cloud_config = "" -%}
{% set cloud_config_mount = "" -%}
{% set cloud_config_volume = "" -%}
{% if grains.cloud is defined -%}
{% set cloud_provider = "--cloud_provider=" + grains.cloud -%}
{% set cloud_provider = "--cloud_provider=" + grains.cloud -%}
{% if grains.cloud == 'gce' -%}
{% if grains.cloud_config is defined -%}
{% if grains.cloud in [ 'aws', 'gce' ] and grains.cloud_config is defined -%}
{% set cloud_config = "--cloud_config=" + grains.cloud_config -%}
{% set cloud_config_mount = "{\"name\": \"cloudconfigmount\",\"mountPath\": \"" + grains.cloud_config + "\", \"readOnly\": true}," -%}
{% set cloud_config_volume = "{\"name\": \"cloudconfigmount\",\"hostPath\": {\"path\": \"" + grains.cloud_config + "\"}}," -%}
{% endif -%}
{% elif grains.cloud == 'aws' -%}
{% if grains.cloud_config is defined -%}
{% set cloud_config = "--cloud_config=" + grains.cloud_config -%}
{% endif -%}
{% endif -%}
{% endif -%}
{% set advertise_address = "" -%}
@ -27,6 +23,11 @@
{% set advertise_address = "--advertise-address=" + grains.advertise_address -%}
{% endif -%}
{% set proxy_ssh_options = "" -%}
{% if grains.proxy_ssh_user is defined -%}
{% set proxy_ssh_options = "--ssh-user=" + grains.proxy_ssh_user + " --ssh-keyfile=/srv/sshproxy/.sshkeyfile" -%}
{% endif -%}
{% set address = "--address=127.0.0.1" -%}
{% set cluster_name = "" -%}
@ -85,7 +86,7 @@
{% endif -%}
{% set params = address + " " + etcd_servers + " " + cloud_provider + " " + cloud_config + " " + runtime_config + " " + admission_control + " " + service_cluster_ip_range + " " + client_ca_file + " " + basic_auth_file + " " + min_request_timeout -%}
{% set params = params + " " + cluster_name + " " + cert_file + " " + key_file + " --secure_port=" + secure_port + " " + token_auth_file + " " + bind_address + " " + pillar['log_level'] + " " + advertise_address -%}
{% set params = params + " " + cluster_name + " " + cert_file + " " + key_file + " --secure_port=" + secure_port + " " + token_auth_file + " " + bind_address + " " + pillar['log_level'] + " " + advertise_address + " " + proxy_ssh_options -%}
{
"apiVersion": "v1beta3",
@ -111,6 +112,7 @@
"hostPort": 8080}
],
"volumeMounts": [
{{cloud_config_mount}}
{ "name": "srvkube",
"mountPath": "/srv/kubernetes",
"readOnly": true},
@ -140,11 +142,15 @@
"readOnly": true},
{ "name": "etcpkitls",
"mountPath": "/etc/pki/tls",
"readOnly": true}
"readOnly": true},
{ "name": "srvsshproxy",
"mountPath": "/srv/sshproxy",
"readOnly": false}
]
}
],
"volumes":[
{{cloud_config_volume}}
{ "name": "srvkube",
"hostPath": {
"path": "/srv/kubernetes"}
@ -184,6 +190,10 @@
{ "name": "etcpkitls",
"hostPath": {
"path": "/etc/pki/tls"}
},
{ "name": "srvsshproxy",
"hostPath": {
"path": "/srv/sshproxy"}
}
]
}}

View File

@ -14,6 +14,8 @@
{% set cloud_provider = "" -%}
{% set cloud_config = "" -%}
{% set cloud_config_mount = "" -%}
{% set cloud_config_volume = "" -%}
{% if grains.cloud is defined -%}
{% set cloud_provider = "--cloud_provider=" + grains.cloud -%}
@ -21,6 +23,8 @@
{% if grains.cloud in [ 'aws', 'gce' ] and grains.cloud_config is defined -%}
{% set cloud_config = "--cloud_config=" + grains.cloud_config -%}
{% set cloud_config_mount = "{\"name\": \"cloudconfigmount\",\"mountPath\": \"" + grains.cloud_config + "\", \"readOnly\": true}," -%}
{% set cloud_config_volume = "{\"name\": \"cloudconfigmount\",\"hostPath\": {\"path\": \"" + grains.cloud_config + "\"}}," -%}
{% endif -%}
{% endif -%}
@ -42,6 +46,7 @@
"/usr/local/bin/kube-controller-manager {{params}} 1>>/var/log/kube-controller-manager.log 2>&1"
],
"volumeMounts": [
{{cloud_config_mount}}
{ "name": "srvkube",
"mountPath": "/srv/kubernetes",
"readOnly": true},
@ -76,6 +81,7 @@
}
],
"volumes":[
{{cloud_config_volume}}
{ "name": "srvkube",
"hostPath": {
"path": "/srv/kubernetes"}

View File

@ -98,6 +98,8 @@ type APIServer struct {
MaxRequestsInFlight int
MinRequestTimeout int
LongRunningRequestRE string
SSHUser string
SSHKeyfile string
}
// NewAPIServer creates a new APIServer object with default parameters
@ -202,6 +204,8 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&s.MaxRequestsInFlight, "max-requests-inflight", 400, "The maximum number of requests in flight at a given time. When the server exceeds this, it rejects requests. Zero for no limit.")
fs.IntVar(&s.MinRequestTimeout, "min-request-timeout", 1800, "An optional field indicating the minimum number of seconds a handler must keep a request open before timing it out. Currently only honored by the watch request handler, which picks a randomized value above this number as the connection timeout, to spread out load.")
fs.StringVar(&s.LongRunningRequestRE, "long-running-request-regexp", "[.*\\/watch$][^\\/proxy.*]", "A regular expression matching long running requests which should be excluded from maximum inflight request handling.")
fs.StringVar(&s.SSHUser, "ssh-user", "", "If non-empty, use secure SSH proxy to the nodes, using this user name")
fs.StringVar(&s.SSHKeyfile, "ssh-keyfile", "", "If non-empty, use secure SSH proxy to the nodes, using this user keyfile")
}
// TODO: Longer term we should read this from some config store, rather than a flag.
@ -353,7 +357,12 @@ func (s *APIServer) Run(_ []string) error {
}
}
}
var installSSH master.InstallSSHKey
if cloud != nil {
if instances, supported := cloud.Instances(); supported {
installSSH = instances.AddSSHKeyToAllInstances
}
}
config := &master.Config{
EtcdHelper: helper,
EventTTL: s.EventTTL,
@ -379,6 +388,9 @@ func (s *APIServer) Run(_ []string) error {
ClusterName: s.ClusterName,
ExternalHost: s.ExternalHost,
MinRequestTimeout: s.MinRequestTimeout,
SSHUser: s.SSHUser,
SSHKeyfile: s.SSHKeyfile,
InstallSSHKey: installSSH,
}
m := master.New(config)

View File

@ -18,6 +18,7 @@ package apiserver
import (
"fmt"
"net"
"net/http"
"net/url"
gpath "path"
@ -55,14 +56,14 @@ type action struct {
var errEmptyName = errors.NewBadRequest("name must be provided")
// Installs handlers for API resources.
func (a *APIInstaller) Install() (ws *restful.WebService, errors []error) {
func (a *APIInstaller) Install(proxyDialer func(network, addr string) (net.Conn, error)) (ws *restful.WebService, errors []error) {
errors = make([]error, 0)
// Create the WebService.
ws = a.newWebService()
redirectHandler := (&RedirectHandler{a.group.Storage, a.group.Codec, a.group.Context, a.info})
proxyHandler := (&ProxyHandler{a.prefix + "/proxy/", a.group.Storage, a.group.Codec, a.group.Context, a.info})
proxyHandler := (&ProxyHandler{a.prefix + "/proxy/", a.group.Storage, a.group.Codec, a.group.Context, a.info, proxyDialer})
// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
paths := make([]string, len(a.group.Storage))

View File

@ -22,6 +22,7 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"path"
"strconv"
@ -149,7 +150,7 @@ type RestContainer struct {
// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
// in a slash. A restful WebService is created for the group and version.
func (g *APIGroupVersion) InstallREST(container *RestContainer) error {
func (g *APIGroupVersion) InstallREST(container *RestContainer, proxyDialer func(network, addr string) (net.Conn, error)) error {
info := &APIRequestInfoResolver{util.NewStringSet(strings.TrimPrefix(g.Root, "/")), g.Mapper}
prefix := path.Join(g.Root, g.Version)
@ -159,7 +160,7 @@ func (g *APIGroupVersion) InstallREST(container *RestContainer) error {
prefix: prefix,
minRequestTimeout: container.MinRequestTimeout,
}
ws, registrationErrors := installer.Install()
ws, registrationErrors := installer.Install(proxyDialer)
container.Add(ws)
return errors.NewAggregate(registrationErrors)
}

View File

@ -231,7 +231,7 @@ func handleInternal(legacy bool, storage map[string]rest.Storage, admissionContr
container := restful.NewContainer()
container.Router(restful.CurlyRouter{})
mux := container.ServeMux
if err := group.InstallREST(&RestContainer{container, 0}); err != nil {
if err := group.InstallREST(&RestContainer{container, 0}, nil); err != nil {
panic(fmt.Sprintf("unable to install container %s: %v", group.Version, err))
}
ws := new(restful.WebService)
@ -1901,7 +1901,7 @@ func TestParentResourceIsRequired(t *testing.T) {
Codec: newCodec,
}
container := restful.NewContainer()
if err := group.InstallREST(&RestContainer{container, 0}); err == nil {
if err := group.InstallREST(&RestContainer{container, 0}, nil); err == nil {
t.Fatal("expected error")
}
@ -1929,7 +1929,7 @@ func TestParentResourceIsRequired(t *testing.T) {
Codec: newCodec,
}
container = restful.NewContainer()
if err := group.InstallREST(&RestContainer{container, 0}); err != nil {
if err := group.InstallREST(&RestContainer{container, 0}, nil); err != nil {
t.Fatal(err)
}

View File

@ -49,6 +49,8 @@ type ProxyHandler struct {
codec runtime.Codec
context api.RequestContextMapper
apiRequestInfoResolver *APIRequestInfoResolver
dial func(network, addr string) (net.Conn, error)
}
func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@ -119,6 +121,10 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
httpCode = http.StatusNotFound
return
}
// If we have a custom dialer, and no pre-existing transport, initialize it to use the dialer.
if transport == nil && r.dial != nil {
transport = &http.Transport{Dial: r.dial}
}
// Default to http
if location.Scheme == "" {

View File

@ -102,6 +102,9 @@ type KubeletConfig struct {
// HTTPTimeout is used by the client to timeout http requests to Kubelet.
HTTPTimeout time.Duration
// Dial is a custom dialer used for the client
Dial func(net, addr string) (net.Conn, error)
}
// TLSClientConfig contains settings to enable transport layer security

View File

@ -45,14 +45,12 @@ type ConnectionInfoGetter interface {
// HTTPKubeletClient is the default implementation of KubeletHealthchecker, accesses the kubelet over HTTP.
type HTTPKubeletClient struct {
Client *http.Client
Config *KubeletConfig
Port uint
EnableHttps bool
}
// TODO: this structure is questionable, it should be using client.Config and overriding defaults.
func NewKubeletClient(config *KubeletConfig) (KubeletClient, error) {
transport := http.DefaultTransport
func MakeTransport(config *KubeletConfig) (http.RoundTripper, error) {
cfg := &Config{TLSClientConfig: config.TLSClientConfig}
if config.EnableHttps {
hasCA := len(config.CAFile) > 0 || len(config.CAData) > 0
@ -64,18 +62,29 @@ func NewKubeletClient(config *KubeletConfig) (KubeletClient, error) {
if err != nil {
return nil, err
}
if tlsConfig != nil {
transport = &http.Transport{
if config.Dial != nil || tlsConfig != nil {
return &http.Transport{
Dial: config.Dial,
TLSClientConfig: tlsConfig,
}
}, nil
} else {
return http.DefaultTransport, nil
}
}
// TODO: this structure is questionable, it should be using client.Config and overriding defaults.
func NewKubeletClient(config *KubeletConfig) (KubeletClient, error) {
transport, err := MakeTransport(config)
if err != nil {
return nil, err
}
c := &http.Client{
Transport: transport,
Timeout: config.HTTPTimeout,
}
return &HTTPKubeletClient{
Client: c,
Config: config,
Port: config.Port,
EnableHttps: config.EnableHttps,
}, nil

View File

@ -230,6 +230,10 @@ func newEc2Filter(name string, value string) *ec2.Filter {
return filter
}
func (self *AWSCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error {
return errors.New("unimplemented")
}
// Implementation of EC2.Instances
func (self *awsSdkEC2) DescribeInstances(request *ec2.DescribeInstancesInput) ([]*ec2.Instance, error) {
// Instances are paged

View File

@ -108,6 +108,9 @@ type Instances interface {
List(filter string) ([]string, error)
// GetNodeResources gets the resources for a particular node
GetNodeResources(name string) (*api.NodeResources, error)
// AddSSHKeyToAllInstances adds an SSH public key as a legal identity for all instances
// expected format for the key is standard ssh-keygen format: <protocol> <blob>
AddSSHKeyToAllInstances(user string, keyData []byte) error
}
// Route is a representation of an advanced routing rule.

View File

@ -17,6 +17,7 @@ limitations under the License.
package fake_cloud
import (
"errors"
"fmt"
"net"
"regexp"
@ -144,6 +145,10 @@ func (f *FakeCloud) EnsureTCPLoadBalancerDeleted(name, region string) error {
return f.Err
}
func (f *FakeCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error {
return errors.New("unimplemented")
}
// NodeAddresses is a test-spy implementation of Instances.NodeAddresses.
// It adds an entry "node-addresses" into the internal method call record.
func (f *FakeCloud) NodeAddresses(instance string) ([]api.NodeAddress, error) {

View File

@ -22,6 +22,7 @@ import (
"io/ioutil"
"net"
"net/http"
"os"
"path"
"strconv"
"strings"
@ -53,9 +54,7 @@ type GCECloud struct {
projectID string
zone string
instanceID string
// We assume here that nodes and master are in the same network. TODO(cjcullen) Fix it.
networkName string
networkName string
// Used for accessing the metadata server
metadataAccess func(string) (string, error)
@ -63,8 +62,9 @@ type GCECloud struct {
type Config struct {
Global struct {
TokenURL string `gcfg:"token-url"`
ProjectID string `gcfg:"project-id"`
TokenURL string `gcfg:"token-url"`
ProjectID string `gcfg:"project-id"`
NetworkName string `gcfg:"network-name"`
}
}
@ -153,10 +153,16 @@ func newGCECloud(config io.Reader) (*GCECloud, error) {
if config != nil {
var cfg Config
if err := gcfg.ReadInto(&cfg, config); err != nil {
glog.Errorf("Couldn't read config: %v", err)
return nil, err
}
if cfg.Global.ProjectID != "" && cfg.Global.TokenURL != "" {
if cfg.Global.ProjectID != "" {
projectID = cfg.Global.ProjectID
}
if cfg.Global.NetworkName != "" {
networkName = cfg.Global.NetworkName
}
if cfg.Global.TokenURL != "" {
tokenSource = newAltTokenSource(cfg.Global.TokenURL)
}
}
@ -474,6 +480,48 @@ func (gce *GCECloud) getInstanceByName(name string) (*compute.Instance, error) {
return res, nil
}
func (gce *GCECloud) AddSSHKeyToAllInstances(user string, keyData []byte) error {
project, err := gce.service.Projects.Get(gce.projectID).Do()
if err != nil {
return err
}
hostname, err := os.Hostname()
if err != nil {
return err
}
keyString := fmt.Sprintf("%s:%s %s@%s", user, strings.TrimSpace(string(keyData)), user, hostname)
found := false
for _, item := range project.CommonInstanceMetadata.Items {
if item.Key == "sshKeys" {
item.Value = addKey(item.Value, keyString)
found = true
break
}
}
if !found {
// This is super unlikely, so log.
glog.Infof("Failed to find sshKeys metadata, creating a new item")
project.CommonInstanceMetadata.Items = append(project.CommonInstanceMetadata.Items,
&compute.MetadataItems{
Key: "sshKeys",
Value: keyString,
})
}
op, err := gce.service.Projects.SetCommonInstanceMetadata(gce.projectID, project.CommonInstanceMetadata).Do()
if err != nil {
return err
}
return gce.waitForGlobalOp(op)
}
func addKey(metadataBefore, keyString string) string {
if strings.Contains(metadataBefore, keyString) {
// We've already added this key
return metadataBefore
}
return metadataBefore + "\n" + keyString
}
// NodeAddresses is an implementation of Instances.NodeAddresses.
func (gce *GCECloud) NodeAddresses(_ string) ([]api.NodeAddress, error) {
externalIP, err := gce.metadataAccess(EXTERNAL_IP_METADATA_URL)

View File

@ -78,6 +78,10 @@ func newMesosCloud(configReader io.Reader) (*MesosCloud, error) {
}
}
func (c *MesosCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error {
return errors.New("unimplemented")
}
// Instances returns a copy of the Mesos cloud Instances implementation.
// Mesos natively provides minimal cloud-type resources. More robust cloud
// support requires a combination of Mesos and cloud-specific knowledge.

View File

@ -317,6 +317,10 @@ func getAddressByName(api *gophercloud.ServiceClient, name string) (string, erro
return s, nil
}
func (i *Instances) AddSSHKeyToAllInstances(user string, keyData []byte) error {
return errors.New("unimplemented")
}
func (i *Instances) NodeAddresses(name string) ([]api.NodeAddress, error) {
glog.V(4).Infof("NodeAddresses(%v) called", name)

View File

@ -18,6 +18,7 @@ package ovirt_cloud
import (
"encoding/xml"
"errors"
"fmt"
"io"
"io/ioutil"
@ -273,3 +274,7 @@ func (v *OVirtCloud) List(filter string) ([]string, error) {
func (v *OVirtCloud) GetNodeResources(name string) (*api.NodeResources, error) {
return nil, nil
}
func (v *OVirtCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error {
return errors.New("unimplemented")
}

View File

@ -376,6 +376,10 @@ func (i *Instances) InstanceID(name string) (string, error) {
return "", nil
}
func (i *Instances) AddSSHKeyToAllInstances(user string, keyData []byte) error {
return errors.New("unimplemented")
}
func (i *Instances) GetNodeResources(name string) (*api.NodeResources, error) {
glog.V(2).Infof("GetNodeResources(%v) called", name)

View File

@ -131,6 +131,10 @@ func (v *VagrantCloud) getInstanceByAddress(address string) (*SaltMinion, error)
return nil, fmt.Errorf("unable to find instance for address: %s", address)
}
func (v *VagrantCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error {
return errors.New("unimplemented")
}
// NodeAddresses returns the NodeAddresses of a particular machine instance.
func (v *VagrantCloud) NodeAddresses(instance string) ([]api.NodeAddress, error) {
// Due to vagrant not running with a dedicated DNS setup, we return the IP address of a minion as its hostname at this time

View File

@ -18,7 +18,9 @@ package master
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/http/pprof"
@ -26,6 +28,7 @@ import (
rt "runtime"
"strconv"
"strings"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/admission"
@ -39,6 +42,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/auth/authorizer"
"github.com/GoogleCloudPlatform/kubernetes/pkg/auth/handlers"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/componentstatus"
controlleretcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller/etcd"
@ -143,8 +148,15 @@ type Config struct {
// The range of ports to be assigned to services with type=NodePort or greater
ServiceNodePortRange util.PortRange
// Used for secure proxy. If empty, don't use secure proxy.
SSHUser string
SSHKeyfile string
InstallSSHKey InstallSSHKey
}
type InstallSSHKey func(user string, data []byte) error
// Master contains state for a Kubernetes cluster master/api server.
type Master struct {
// "Inputs", Copied from Config
@ -196,6 +208,11 @@ type Master struct {
// "Outputs"
Handler http.Handler
InsecureHandler http.Handler
// Used for secure proxy
tunnels util.SSHTunnelList
tunnelsLock sync.Mutex
installSSHKey InstallSSHKey
}
// NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version
@ -325,6 +342,8 @@ func New(c *Config) *Master {
serviceReadWriteIP: serviceReadWriteIP,
// TODO: serviceReadWritePort should be passed in as an argument, it may not always be 443
serviceReadWritePort: 443,
installSSHKey: c.InstallSSHKey,
}
var handlerContainer *restful.Container
@ -474,15 +493,47 @@ func (m *Master) init(c *Config) {
"componentStatuses": componentstatus.NewStorage(func() map[string]apiserver.Server { return m.getServersToValidate(c) }),
}
var proxyDialer func(net, addr string) (net.Conn, error)
if len(c.SSHUser) > 0 {
glog.Infof("Setting up proxy: %s %s", c.SSHUser, c.SSHKeyfile)
exists, err := util.FileExists(c.SSHKeyfile)
if err != nil {
glog.Errorf("Error detecting if key exists: %v", err)
} else if !exists {
glog.Infof("Key doesn't exist, attempting to create")
err := m.generateSSHKey(c.SSHUser, c.SSHKeyfile)
if err != nil {
glog.Errorf("Failed to create key pair: %v", err)
}
}
m.setupSecureProxy(c.SSHUser, c.SSHKeyfile)
proxyDialer = m.Dial
// This is pretty ugly. A better solution would be to pull this all the way up into the
// server.go file.
httpKubeletClient, ok := c.KubeletClient.(*client.HTTPKubeletClient)
if ok {
httpKubeletClient.Config.Dial = m.Dial
transport, err := client.MakeTransport(httpKubeletClient.Config)
if err != nil {
glog.Errorf("Error setting up transport over SSH: %v", err)
} else {
httpKubeletClient.Client.Transport = transport
}
} else {
glog.Errorf("Failed to cast %v to HTTPKubeletClient, skipping SSH tunnel.")
}
}
apiVersions := []string{}
if m.v1beta3 {
if err := m.api_v1beta3().InstallREST(m.handlerContainer); err != nil {
if err := m.api_v1beta3().InstallREST(m.handlerContainer, proxyDialer); err != nil {
glog.Fatalf("Unable to setup API v1beta3: %v", err)
}
apiVersions = append(apiVersions, "v1beta3")
}
if m.v1 {
if err := m.api_v1().InstallREST(m.handlerContainer); err != nil {
if err := m.api_v1().InstallREST(m.handlerContainer, proxyDialer); err != nil {
glog.Fatalf("Unable to setup API v1: %v", err)
}
apiVersions = append(apiVersions, "v1")
@ -703,3 +754,138 @@ func (m *Master) api_v1() *apiserver.APIGroupVersion {
version.Codec = v1.Codec
return version
}
func findExternalAddress(node *api.Node) (string, error) {
for ix := range node.Status.Addresses {
addr := &node.Status.Addresses[ix]
if addr.Type == api.NodeExternalIP {
return addr.Address, nil
}
}
return "", fmt.Errorf("Couldn't find external address: %v", node)
}
func (m *Master) Dial(net, addr string) (net.Conn, error) {
m.tunnelsLock.Lock()
defer m.tunnelsLock.Unlock()
return m.tunnels.Dial(net, addr)
}
func (m *Master) needToReplaceTunnels(addrs []string) bool {
if len(m.tunnels) != len(addrs) {
return true
}
// TODO (cjcullen): This doesn't need to be n^2
for ix := range addrs {
if !m.tunnels.Has(addrs[ix]) {
return true
}
}
return false
}
func (m *Master) getNodeAddresses() ([]string, error) {
nodes, err := m.nodeRegistry.ListMinions(api.NewDefaultContext(), labels.Everything(), fields.Everything())
if err != nil {
return nil, err
}
addrs := []string{}
for ix := range nodes.Items {
node := &nodes.Items[ix]
addr, err := findExternalAddress(node)
if err != nil {
return nil, err
}
addrs = append(addrs, addr)
}
return addrs, nil
}
func (m *Master) replaceTunnels(user, keyfile string, newAddrs []string) error {
glog.Infof("replacing tunnels. New addrs: %v", newAddrs)
tunnels, err := util.MakeSSHTunnels(user, keyfile, newAddrs)
if err != nil {
return err
}
tunnels.Open()
if m.tunnels != nil {
m.tunnels.Close()
}
m.tunnels = tunnels
return nil
}
func (m *Master) loadTunnels(user, keyfile string) error {
m.tunnelsLock.Lock()
defer m.tunnelsLock.Unlock()
addrs, err := m.getNodeAddresses()
if err != nil {
return err
}
if !m.needToReplaceTunnels(addrs) {
return nil
}
// TODO: This is going to unnecessarily close connections to unchanged nodes.
// See comment about using Watch above.
glog.Info("found different nodes. Need to replace tunnels")
return m.replaceTunnels(user, keyfile, addrs)
}
func (m *Master) refreshTunnels(user, keyfile string) error {
m.tunnelsLock.Lock()
defer m.tunnelsLock.Unlock()
addrs, err := m.getNodeAddresses()
if err != nil {
return err
}
return m.replaceTunnels(user, keyfile, addrs)
}
func (m *Master) setupSecureProxy(user, keyfile string) {
// Sync loop for tunnels
// TODO: switch this to watch.
go func() {
for {
if err := m.loadTunnels(user, keyfile); err != nil {
glog.Errorf("Failed to load SSH Tunnels: %v", err)
}
var sleep time.Duration
if len(m.tunnels) == 0 {
sleep = time.Second
} else {
// tunnels could lag behind current set of nodes
sleep = 10 * time.Second
}
time.Sleep(sleep)
}
}()
// Refresh loop for tunnels
// TODO: could make this more controller-ish
go func() {
for {
time.Sleep(5 * time.Minute)
if err := m.refreshTunnels(user, keyfile); err != nil {
glog.Errorf("Failed to refresh SSH Tunnels: %v", err)
}
}
}()
}
func (m *Master) generateSSHKey(user, keyfile string) error {
if m.installSSHKey == nil {
return errors.New("ssh install function is null")
}
private, public, err := util.GenerateKey(2048)
if err != nil {
return err
}
if err := ioutil.WriteFile(keyfile, util.EncodePrivateKey(private), 0600); err != nil {
return err
}
data, err := util.EncodeSSHKey(public)
if err != nil {
return err
}
return m.installSSHKey(user, data)
}

View File

@ -18,11 +18,17 @@ package util
import (
"bytes"
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"encoding/pem"
"fmt"
"io"
"io/ioutil"
mathrand "math/rand"
"net"
"os"
"time"
"github.com/golang/glog"
"golang.org/x/crypto/ssh"
@ -31,15 +37,12 @@ import (
// TODO: Unit tests for this code, we can spin up a test SSH server with instructions here:
// https://godoc.org/golang.org/x/crypto/ssh#ServerConn
type SSHTunnel struct {
Config *ssh.ClientConfig
Host string
SSHPort int
LocalPort int
RemoteHost string
RemotePort int
running bool
sock net.Listener
client *ssh.Client
Config *ssh.ClientConfig
Host string
SSHPort string
running bool
sock net.Listener
client *ssh.Client
}
func (s *SSHTunnel) copyBytes(out io.Writer, in io.Reader) {
@ -48,7 +51,7 @@ func (s *SSHTunnel) copyBytes(out io.Writer, in io.Reader) {
}
}
func NewSSHTunnel(user, keyfile, host, remoteHost string, localPort, remotePort int) (*SSHTunnel, error) {
func NewSSHTunnel(user, keyfile, host string) (*SSHTunnel, error) {
signer, err := MakePrivateKeySigner(keyfile)
if err != nil {
return nil, err
@ -58,44 +61,27 @@ func NewSSHTunnel(user, keyfile, host, remoteHost string, localPort, remotePort
Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)},
}
return &SSHTunnel{
Config: &config,
Host: host,
SSHPort: 22,
LocalPort: localPort,
RemotePort: remotePort,
RemoteHost: remoteHost,
Config: &config,
Host: host,
SSHPort: "22",
}, nil
}
func (s *SSHTunnel) Open() error {
var err error
s.client, err = ssh.Dial("tcp", fmt.Sprintf("%s:%d", s.Host, s.SSHPort), s.Config)
s.client, err = ssh.Dial("tcp", net.JoinHostPort(s.Host, s.SSHPort), s.Config)
if err != nil {
return err
}
s.sock, err = net.Listen("tcp", fmt.Sprintf("localhost:%d", s.LocalPort))
if err != nil {
return err
}
s.running = true
return nil
}
func (s *SSHTunnel) Listen() {
for s.running {
conn, err := s.sock.Accept()
if err != nil {
glog.Errorf("Error listening for ssh tunnel to %s (%v)", s.RemoteHost, err)
continue
}
if err := s.tunnel(conn); err != nil {
glog.Errorf("Error starting tunnel: %v", err)
}
}
func (s *SSHTunnel) Dial(network, address string) (net.Conn, error) {
return s.client.Dial(network, address)
}
func (s *SSHTunnel) tunnel(conn net.Conn) error {
tunnel, err := s.client.Dial("tcp", fmt.Sprintf("%s:%d", s.RemoteHost, s.RemotePort))
func (s *SSHTunnel) tunnel(conn net.Conn, remoteHost, remotePort string) error {
tunnel, err := s.client.Dial("tcp", net.JoinHostPort(remoteHost, remotePort))
if err != nil {
return err
}
@ -105,12 +91,6 @@ func (s *SSHTunnel) tunnel(conn net.Conn) error {
}
func (s *SSHTunnel) Close() error {
// TODO: try to shutdown copying here?
s.running = false
// TODO: Aggregate errors and keep going?
if err := s.sock.Close(); err != nil {
return err
}
if err := s.client.Close(); err != nil {
return err
}
@ -172,3 +152,98 @@ func MakePrivateKeySigner(key string) (ssh.Signer, error) {
}
return signer, nil
}
type SSHTunnelEntry struct {
Address string
Tunnel *SSHTunnel
}
type SSHTunnelList []SSHTunnelEntry
func MakeSSHTunnels(user, keyfile string, addresses []string) (SSHTunnelList, error) {
tunnels := []SSHTunnelEntry{}
for ix := range addresses {
addr := addresses[ix]
tunnel, err := NewSSHTunnel(user, keyfile, addr)
if err != nil {
return nil, err
}
tunnels = append(tunnels, SSHTunnelEntry{addr, tunnel})
}
return tunnels, nil
}
func (l SSHTunnelList) Open() error {
for ix := range l {
if err := l[ix].Tunnel.Open(); err != nil {
return err
}
}
return nil
}
// Close asynchronously closes all tunnels in the list after waiting for 1
// minute. Tunnels will still be open upon this function's return, but should
// no longer be used.
func (l SSHTunnelList) Close() {
for ix := range l {
entry := l[ix]
go func() {
time.Sleep(1 * time.Minute)
if err := entry.Tunnel.Close(); err != nil {
glog.Errorf("Failed to close tunnel %v: %v", entry, err)
}
}()
}
}
func (l SSHTunnelList) Dial(network, addr string) (net.Conn, error) {
if len(l) == 0 {
return nil, fmt.Errorf("Empty tunnel list.")
}
return l[mathrand.Int()%len(l)].Tunnel.Dial(network, addr)
}
func (l SSHTunnelList) Has(addr string) bool {
for ix := range l {
if l[ix].Address == addr {
return true
}
}
return false
}
func EncodePrivateKey(private *rsa.PrivateKey) []byte {
return pem.EncodeToMemory(&pem.Block{
Bytes: x509.MarshalPKCS1PrivateKey(private),
Type: "RSA PRIVATE KEY",
})
}
func EncodePublicKey(public *rsa.PublicKey) ([]byte, error) {
publicBytes, err := x509.MarshalPKIXPublicKey(public)
if err != nil {
return nil, err
}
return pem.EncodeToMemory(&pem.Block{
Bytes: publicBytes,
Type: "PUBLIC KEY",
}), nil
}
func EncodeSSHKey(public *rsa.PublicKey) ([]byte, error) {
publicKey, err := ssh.NewPublicKey(public)
if err != nil {
return nil, err
}
return ssh.MarshalAuthorizedKey(publicKey), nil
}
func GenerateKey(bits int) (*rsa.PrivateKey, *rsa.PublicKey, error) {
private, err := rsa.GenerateKey(rand.Reader, bits)
if err != nil {
return nil, nil, err
}
return private, &private.PublicKey, nil
}

View File

@ -516,3 +516,12 @@ func ShortenString(str string, n int) string {
return str[:n]
}
}
func FileExists(filename string) (bool, error) {
if _, err := os.Stat(filename); os.IsNotExist(err) {
return false, nil
} else if err != nil {
return false, err
}
return true, nil
}