mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-14 05:36:12 +00:00
Initial proxy tunnelling.
This commit is contained in:
@@ -39,6 +39,8 @@ import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/auth/authorizer"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/auth/handlers"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/componentstatus"
|
||||
controlleretcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller/etcd"
|
||||
@@ -143,6 +145,10 @@ type Config struct {
|
||||
|
||||
// The range of ports to be assigned to services with type=NodePort or greater
|
||||
ServiceNodePortRange util.PortRange
|
||||
|
||||
// Used for secure proxy. If empty, don't use secure proxy.
|
||||
SSHUser string
|
||||
SSHKeyfile string
|
||||
}
|
||||
|
||||
// Master contains state for a Kubernetes cluster master/api server.
|
||||
@@ -196,6 +202,9 @@ type Master struct {
|
||||
// "Outputs"
|
||||
Handler http.Handler
|
||||
InsecureHandler http.Handler
|
||||
|
||||
// Used for secure proxy
|
||||
tunnels util.SSHTunnelList
|
||||
}
|
||||
|
||||
// NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version
|
||||
@@ -474,15 +483,22 @@ func (m *Master) init(c *Config) {
|
||||
"componentStatuses": componentstatus.NewStorage(func() map[string]apiserver.Server { return m.getServersToValidate(c) }),
|
||||
}
|
||||
|
||||
var proxyDialer func(net, addr string) (net.Conn, error)
|
||||
if len(c.SSHUser) > 0 {
|
||||
glog.Infof("Setting up proxy: %s %s", c.SSHUser, c.SSHKeyfile)
|
||||
m.setupSecureProxy(c.SSHUser, c.SSHKeyfile)
|
||||
proxyDialer = m.Dial
|
||||
}
|
||||
|
||||
apiVersions := []string{}
|
||||
if m.v1beta3 {
|
||||
if err := m.api_v1beta3().InstallREST(m.handlerContainer); err != nil {
|
||||
if err := m.api_v1beta3().InstallREST(m.handlerContainer, proxyDialer); err != nil {
|
||||
glog.Fatalf("Unable to setup API v1beta3: %v", err)
|
||||
}
|
||||
apiVersions = append(apiVersions, "v1beta3")
|
||||
}
|
||||
if m.v1 {
|
||||
if err := m.api_v1().InstallREST(m.handlerContainer); err != nil {
|
||||
if err := m.api_v1().InstallREST(m.handlerContainer, proxyDialer); err != nil {
|
||||
glog.Fatalf("Unable to setup API v1: %v", err)
|
||||
}
|
||||
apiVersions = append(apiVersions, "v1")
|
||||
@@ -703,3 +719,85 @@ func (m *Master) api_v1() *apiserver.APIGroupVersion {
|
||||
version.Codec = v1.Codec
|
||||
return version
|
||||
}
|
||||
|
||||
func findExternalAddress(node *api.Node) (string, error) {
|
||||
for ix := range node.Status.Addresses {
|
||||
addr := &node.Status.Addresses[ix]
|
||||
if addr.Type == api.NodeExternalIP {
|
||||
return addr.Address, nil
|
||||
}
|
||||
}
|
||||
return "", fmt.Errorf("Couldn't find external address: %v", node)
|
||||
}
|
||||
|
||||
func (m *Master) Dial(net, addr string) (net.Conn, error) {
|
||||
return m.tunnels.Dial(net, addr)
|
||||
}
|
||||
|
||||
func (m *Master) detectTunnelChanges(addrs []string) bool {
|
||||
if len(m.tunnels) != len(addrs) {
|
||||
return true
|
||||
}
|
||||
for ix := range addrs {
|
||||
if !m.tunnels.Has(addrs[ix]) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (m *Master) loadTunnels(user, keyfile string) error {
|
||||
nodes, err := m.nodeRegistry.ListMinions(api.NewDefaultContext(), labels.Everything(), fields.Everything())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
result := []string{}
|
||||
for ix := range nodes.Items {
|
||||
node := &nodes.Items[ix]
|
||||
addr, err := findExternalAddress(node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
result = append(result, addr)
|
||||
}
|
||||
changesExist := m.detectTunnelChanges(result)
|
||||
if !changesExist {
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: This is going to drop connections in the middle. See comment about using Watch above.
|
||||
tunnels, err := util.MakeSSHTunnels(user, keyfile, result)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tunnels.Open()
|
||||
if m.tunnels != nil {
|
||||
m.tunnels.Close()
|
||||
}
|
||||
m.tunnels = tunnels
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Master) setupSecureProxy(user, keyfile string) {
|
||||
loadTunnelsPrintError := func() {
|
||||
if err := m.loadTunnels(user, keyfile); err != nil {
|
||||
glog.Errorf("Failed to load SSH Tunnels: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Sync loop for tunnels
|
||||
// TODO: switch this to watch.
|
||||
go func() {
|
||||
for {
|
||||
loadTunnelsPrintError()
|
||||
|
||||
var sleep time.Duration
|
||||
if len(m.tunnels) == 0 {
|
||||
sleep = time.Second
|
||||
} else {
|
||||
sleep = time.Second * 120
|
||||
}
|
||||
time.Sleep(sleep)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
Reference in New Issue
Block a user