Refactor SSH tunneling, fix proxy transport TLS/Dial extraction

This commit is contained in:
Jordan Liggitt
2015-10-09 01:18:16 -04:00
parent 826459e51e
commit 1043126135
26 changed files with 739 additions and 513 deletions

View File

@@ -17,18 +17,15 @@ limitations under the License.
package master
import (
"crypto/tls"
"fmt"
"io/ioutil"
"math/rand"
"net"
"net/http"
"net/http/pprof"
"net/url"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"k8s.io/kubernetes/pkg/admission"
@@ -240,10 +237,12 @@ type Config struct {
// The range of ports to be assigned to services with type=NodePort or greater
ServiceNodePortRange util.PortRange
// Used for secure proxy. If empty, don't use secure proxy.
SSHUser string
SSHKeyfile string
InstallSSHKey InstallSSHKey
// Used to customize default proxy dial/tls options
ProxyDialer apiserver.ProxyDialerFunc
ProxyTLSClientConfig *tls.Config
// Used to start and monitor tunneling
Tunneler Tunneler
KubernetesServiceNodePort int
}
@@ -305,14 +304,11 @@ type Master struct {
Handler http.Handler
InsecureHandler http.Handler
// Used for secure proxy
dialer apiserver.ProxyDialerFunc
tunnels *util.SSHTunnelList
tunnelsLock sync.Mutex
installSSHKey InstallSSHKey
lastSync int64 // Seconds since Epoch
lastSyncMetric prometheus.GaugeFunc
clock util.Clock
// Used for custom proxy dialing, and proxy TLS options
proxyTransport http.RoundTripper
// Used to start and monitor tunneling
tunneler Tunneler
// storage for third party objects
thirdPartyStorage storage.Interface
@@ -453,7 +449,8 @@ func New(c *Config) *Master {
// TODO: serviceReadWritePort should be passed in as an argument, it may not always be 443
serviceReadWritePort: 443,
installSSHKey: c.InstallSSHKey,
tunneler: c.Tunneler,
KubernetesServiceNodePort: c.KubernetesServiceNodePort,
}
@@ -505,10 +502,18 @@ func NewHandlerContainer(mux *http.ServeMux) *restful.Container {
// init initializes master.
func (m *Master) init(c *Config) {
if c.ProxyDialer != nil || c.ProxyTLSClientConfig != nil {
m.proxyTransport = util.SetTransportDefaults(&http.Transport{
Dial: c.ProxyDialer,
TLSClientConfig: c.ProxyTLSClientConfig,
})
}
healthzChecks := []healthz.HealthzChecker{}
m.clock = util.RealClock{}
dbClient := func(resource string) storage.Interface { return c.StorageDestinations.get("", resource) }
podStorage := podetcd.NewStorage(dbClient("pods"), c.EnableWatchCache, c.KubeletClient)
podStorage := podetcd.NewStorage(dbClient("pods"), c.EnableWatchCache, c.KubeletClient, m.proxyTransport)
podTemplateStorage := podtemplateetcd.NewREST(dbClient("podTemplates"))
@@ -527,7 +532,7 @@ func (m *Master) init(c *Config) {
endpointsStorage := endpointsetcd.NewREST(dbClient("endpoints"), c.EnableWatchCache)
m.endpointRegistry = endpoint.NewRegistry(endpointsStorage)
nodeStorage, nodeStatusStorage := nodeetcd.NewREST(dbClient("nodes"), c.EnableWatchCache, c.KubeletClient)
nodeStorage, nodeStatusStorage := nodeetcd.NewREST(dbClient("nodes"), c.EnableWatchCache, c.KubeletClient, m.proxyTransport)
m.nodeRegistry = node.NewRegistry(nodeStorage)
serviceStorage := serviceetcd.NewREST(dbClient("services"))
@@ -569,7 +574,7 @@ func (m *Master) init(c *Config) {
"replicationControllers": controllerStorage,
"replicationControllers/status": controllerStatusStorage,
"services": service.NewStorage(m.serviceRegistry, m.endpointRegistry, serviceClusterIPAllocator, serviceNodePortAllocator),
"services": service.NewStorage(m.serviceRegistry, m.endpointRegistry, serviceClusterIPAllocator, serviceNodePortAllocator, m.proxyTransport),
"endpoints": endpointsStorage,
"nodes": nodeStorage,
"nodes/status": nodeStatusStorage,
@@ -591,51 +596,13 @@ func (m *Master) init(c *Config) {
"componentStatuses": componentstatus.NewStorage(func() map[string]apiserver.Server { return m.getServersToValidate(c) }),
}
// establish the node proxy dialer
if len(c.SSHUser) > 0 {
// Usernames are capped @ 32
if len(c.SSHUser) > 32 {
glog.Warning("SSH User is too long, truncating to 32 chars")
c.SSHUser = c.SSHUser[0:32]
}
glog.Infof("Setting up proxy: %s %s", c.SSHUser, c.SSHKeyfile)
// public keyfile is written last, so check for that.
publicKeyFile := c.SSHKeyfile + ".pub"
exists, err := util.FileExists(publicKeyFile)
if err != nil {
glog.Errorf("Error detecting if key exists: %v", err)
} else if !exists {
glog.Infof("Key doesn't exist, attempting to create")
err := m.generateSSHKey(c.SSHUser, c.SSHKeyfile, publicKeyFile)
if err != nil {
glog.Errorf("Failed to create key pair: %v", err)
}
}
m.tunnels = &util.SSHTunnelList{}
m.dialer = m.Dial
m.setupSecureProxy(c.SSHUser, c.SSHKeyfile, publicKeyFile)
m.lastSync = m.clock.Now().Unix()
// This is pretty ugly. A better solution would be to pull this all the way up into the
// server.go file.
httpKubeletClient, ok := c.KubeletClient.(*client.HTTPKubeletClient)
if ok {
httpKubeletClient.Config.Dial = m.dialer
transport, err := client.MakeTransport(httpKubeletClient.Config)
if err != nil {
glog.Errorf("Error setting up transport over SSH: %v", err)
} else {
httpKubeletClient.Client.Transport = transport
}
} else {
glog.Errorf("Failed to cast %v to HTTPKubeletClient, skipping SSH tunnel.", c.KubeletClient)
}
if m.tunneler != nil {
m.tunneler.Run(m.getNodeAddresses)
healthzChecks = append(healthzChecks, healthz.NamedCheck("SSH Tunnel Check", m.IsTunnelSyncHealthy))
m.lastSyncMetric = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "apiserver_proxy_tunnel_sync_latency_secs",
Help: "The time since the last successful synchronization of the SSH tunnels for proxy requests.",
}, func() float64 { return float64(m.secondsSinceSync()) })
}, func() float64 { return float64(m.tunneler.SecondsSinceSync()) })
}
apiVersions := []string{}
@@ -875,7 +842,6 @@ func (m *Master) defaultAPIGroupVersion() *apiserver.APIGroupVersion {
Admit: m.admissionControl,
Context: m.requestContextMapper,
ProxyDialerFn: m.dialer,
MinRequestTimeout: m.minRequestTimeout,
}
}
@@ -1031,7 +997,6 @@ func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupV
Context: m.requestContextMapper,
ProxyDialerFn: m.dialer,
MinRequestTimeout: m.minRequestTimeout,
}
}
@@ -1094,7 +1059,6 @@ func (m *Master) experimental(c *Config) *apiserver.APIGroupVersion {
Admit: m.admissionControl,
Context: m.requestContextMapper,
ProxyDialerFn: m.dialer,
MinRequestTimeout: m.minRequestTimeout,
}
}
@@ -1117,41 +1081,6 @@ func findExternalAddress(node *api.Node) (string, error) {
return "", fmt.Errorf("Couldn't find external address: %v", node)
}
// Dial opens a connection to addr through one of the master's SSH tunnels.
// The tunnel is chosen by m.tunnels.PickRandomTunnel; if no tunnel can be
// picked, that error is returned and no dial is attempted. The start and end
// of every dial are logged at verbosity 3.
func (m *Master) Dial(net, addr string) (net.Conn, error) {
	// tunnelsLock is held only for the selection, never for the dial itself.
	pickTunnel := func() (util.SSHTunnelEntry, error) {
		m.tunnelsLock.Lock()
		defer m.tunnelsLock.Unlock()
		return m.tunnels.PickRandomTunnel()
	}
	entry, pickErr := pickTunnel()
	if pickErr != nil {
		return nil, pickErr
	}

	began := time.Now()
	// Random tag so the begin/end lines of one dial can be matched in the log.
	dialID := rand.Int63()
	glog.V(3).Infof("[%x: %v] Dialing...", dialID, entry.Address)
	defer func() {
		glog.V(3).Infof("[%x: %v] Dialed in %v.", dialID, entry.Address, time.Since(began))
	}()
	return entry.Tunnel.Dial(net, addr)
}
// needToReplaceTunnels reports whether the current tunnel list no longer
// matches addrs: it returns true when there is no tunnel list yet, when the
// number of tunnels differs from len(addrs), or when any address in addrs has
// no tunnel. The check runs under tunnelsLock.
func (m *Master) needToReplaceTunnels(addrs []string) bool {
	m.tunnelsLock.Lock()
	defer m.tunnelsLock.Unlock()

	if m.tunnels == nil {
		return true
	}
	if m.tunnels.Len() != len(addrs) {
		return true
	}
	// TODO (cjcullen): This doesn't need to be n^2
	for _, addr := range addrs {
		if !m.tunnels.Has(addr) {
			return true
		}
	}
	return false
}
func (m *Master) getNodeAddresses() ([]string, error) {
nodes, err := m.nodeRegistry.ListNodes(api.NewDefaultContext(), labels.Everything(), fields.Everything())
if err != nil {
@@ -1170,126 +1099,12 @@ func (m *Master) getNodeAddresses() ([]string, error) {
}
// IsTunnelSyncHealthy is the healthz check registered as "SSH Tunnel Check".
// It returns nil when no tunneler is configured, or when the tunneler reports
// a successful sync within the last 600 seconds; otherwise it returns an error
// carrying the current lag in seconds. req is not inspected.
func (m *Master) IsTunnelSyncHealthy(req *http.Request) error {
	// Without a tunneler there is nothing to be out of sync, so the check
	// passes. This guard must run before any call on m.tunneler.
	if m.tunneler == nil {
		return nil
	}
	lag := m.tunneler.SecondsSinceSync()
	if lag > 600 {
		return fmt.Errorf("Tunnel sync is taking too long: %d", lag)
	}
	return nil
}
// secondsSinceSync returns the number of seconds between the master clock's
// current time and the timestamp stored in m.lastSync. lastSync is read
// atomically because it is also written with atomic.StoreInt64.
func (m *Master) secondsSinceSync() int64 {
	return m.clock.Now().Unix() - atomic.LoadInt64(&m.lastSync)
}
// replaceTunnels builds a new tunnel list for newAddrs, opens it, and swaps it
// in for the current list. If opening fails, the error is returned and the
// existing tunnels are left in place. On success the previous list (if any) is
// closed and m.lastSync is set to the current Unix time.
func (m *Master) replaceTunnels(user, keyfile string, newAddrs []string) error {
	glog.Infof("replacing tunnels. New addrs: %v", newAddrs)

	// Open the replacement set before taking the lock, so the lock covers
	// only the swap.
	fresh := util.MakeSSHTunnels(user, keyfile, newAddrs)
	if openErr := fresh.Open(); openErr != nil {
		return openErr
	}

	m.tunnelsLock.Lock()
	defer m.tunnelsLock.Unlock()
	if old := m.tunnels; old != nil {
		old.Close()
	}
	m.tunnels = fresh
	atomic.StoreInt64(&m.lastSync, m.clock.Now().Unix())
	return nil
}
// loadTunnels fetches the current node addresses and replaces the tunnel list
// only when needToReplaceTunnels says it no longer matches them. It returns
// any error from listing addresses or from replacing the tunnels.
func (m *Master) loadTunnels(user, keyfile string) error {
	nodeAddrs, err := m.getNodeAddresses()
	switch {
	case err != nil:
		return err
	case !m.needToReplaceTunnels(nodeAddrs):
		// Tunnels already match the node set; nothing to do.
		return nil
	}
	// TODO: This is going to unnecessarily close connections to unchanged nodes.
	// See comment about using Watch above.
	glog.Info("found different nodes. Need to replace tunnels")
	return m.replaceTunnels(user, keyfile, nodeAddrs)
}
// refreshTunnels unconditionally rebuilds the tunnel list from the current
// node addresses, without first checking whether the node set changed. It
// returns any error from listing addresses or from replacing the tunnels.
func (m *Master) refreshTunnels(user, keyfile string) error {
	nodeAddrs, listErr := m.getNodeAddresses()
	if listErr == nil {
		return m.replaceTunnels(user, keyfile, nodeAddrs)
	}
	return listErr
}
// setupSecureProxy starts three background loops and returns immediately.
// All three are passed util.NeverStop as their stop channel:
//
//  1. a key-install loop (5 minute period) that loads the public key from
//     publicKeyfile, encodes it, and hands it to m.installSSHKey for user;
//  2. a tunnel sync loop (1 second period) that calls loadTunnels;
//  3. a tunnel refresh loop that sleeps 5 minutes and then calls
//     refreshTunnels.
//
// Failures inside the loops are logged and never returned.
func (m *Master) setupSecureProxy(user, privateKeyfile, publicKeyfile string) {
	// Sync loop to ensure that the SSH key has been installed.
	go util.Until(func() {
		if m.installSSHKey == nil {
			glog.Error("Won't attempt to install ssh key: installSSHKey function is nil")
			return
		}
		key, err := util.ParsePublicKeyFromFile(publicKeyfile)
		if err != nil {
			glog.Errorf("Failed to load public key: %v", err)
			return
		}
		keyData, err := util.EncodeSSHKey(key)
		if err != nil {
			glog.Errorf("Failed to encode public key: %v", err)
			return
		}
		if err := m.installSSHKey(user, keyData); err != nil {
			glog.Errorf("Failed to install ssh key: %v", err)
		}
	}, 5*time.Minute, util.NeverStop)

	// hasTunnels reports whether a non-empty tunnel list is installed.
	// m.tunnels is swapped by replaceTunnels under tunnelsLock from the other
	// loops started here, so it must be read under the same lock to avoid a
	// data race.
	hasTunnels := func() bool {
		m.tunnelsLock.Lock()
		defer m.tunnelsLock.Unlock()
		return m.tunnels != nil && m.tunnels.Len() != 0
	}

	// Sync loop for tunnels
	// TODO: switch this to watch.
	go util.Until(func() {
		if err := m.loadTunnels(user, privateKeyfile); err != nil {
			glog.Errorf("Failed to load SSH Tunnels: %v", err)
		}
		if hasTunnels() {
			// Sleep an extra 9 seconds if we have some tunnels; together with
			// the 1 second loop period this presumably gives about 10 seconds
			// between syncs (confirm against util.Until).
			// TODO (cjcullen): tunnels can lag behind actually existing nodes.
			time.Sleep(9 * time.Second)
		}
	}, 1*time.Second, util.NeverStop)

	// Refresh loop for tunnels
	// TODO: could make this more controller-ish
	go util.Until(func() {
		time.Sleep(5 * time.Minute)
		if err := m.refreshTunnels(user, privateKeyfile); err != nil {
			glog.Errorf("Failed to refresh SSH Tunnels: %v", err)
		}
	}, 0*time.Second, util.NeverStop)
}
// generateSSHKey creates a new 2048-bit key pair and writes it to
// privateKeyfile and publicKeyfile, both with mode 0600. The public key is
// written to publicKeyfile+".tmp" and then renamed into place, so the public
// key file appears last. It returns the first error from key generation,
// encoding the public key, writing either file, or the rename.
func (m *Master) generateSSHKey(user, privateKeyfile, publicKeyfile string) error {
	// TODO: user is not used. Consider removing it as an input to the function.
	privKey, pubKey, err := util.GenerateKey(2048)
	if err != nil {
		return err
	}

	// If private keyfile already exists, we must have only made it halfway
	// through last time, so delete it. Problems here are logged only; the
	// write below is still attempted.
	stale, statErr := util.FileExists(privateKeyfile)
	switch {
	case statErr != nil:
		glog.Errorf("Error detecting if private key exists: %v", statErr)
	case stale:
		glog.Infof("Private key exists, but public key does not")
		if rmErr := os.Remove(privateKeyfile); rmErr != nil {
			glog.Errorf("Failed to remove stale private key: %v", rmErr)
		}
	}

	if err := ioutil.WriteFile(privateKeyfile, util.EncodePrivateKey(privKey), 0600); err != nil {
		return err
	}

	encodedPub, err := util.EncodePublicKey(pubKey)
	if err != nil {
		return err
	}
	tmpPath := publicKeyfile + ".tmp"
	if err := ioutil.WriteFile(tmpPath, encodedPub, 0600); err != nil {
		return err
	}
	return os.Rename(tmpPath, publicKeyfile)
}