diff --git a/cluster/cluster.go b/cluster/cluster.go
index b04218cc..e80e4ce4 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -298,3 +298,20 @@ func (c *Cluster) ApplyAuthzResources(ctx context.Context) error {
 	}
 	return nil
 }
+
+func (c *Cluster) getUniqueHostList() []*hosts.Host {
+	hostList := []*hosts.Host{}
+	hostList = append(hostList, c.EtcdHosts...)
+	hostList = append(hostList, c.ControlPlaneHosts...)
+	hostList = append(hostList, c.WorkerHosts...)
+	// hosts can fill several roles; deduplicate by keying a map on the host pointer
+	uniqHostMap := make(map[*hosts.Host]bool)
+	for _, host := range hostList {
+		uniqHostMap[host] = true
+	}
+	uniqHostList := []*hosts.Host{}
+	for host := range uniqHostMap {
+		uniqHostList = append(uniqHostList, host)
+	}
+	return uniqHostList
+}
diff --git a/cluster/network.go b/cluster/network.go
index a759cf52..93a40fca 100644
--- a/cluster/network.go
+++ b/cluster/network.go
@@ -1,18 +1,40 @@
 package cluster
 
 import (
+	"bufio"
 	"context"
 	"fmt"
+	"io"
+	"net"
+	"strings"
 
+	"github.com/docker/docker/api/types/container"
+	"github.com/docker/go-connections/nat"
+	"github.com/rancher/rke/docker"
+	"github.com/rancher/rke/hosts"
 	"github.com/rancher/rke/log"
 	"github.com/rancher/rke/pki"
 	"github.com/rancher/rke/services"
 	"github.com/rancher/rke/templates"
+	"github.com/sirupsen/logrus"
+	"golang.org/x/sync/errgroup"
)
 
 const (
 	NetworkPluginResourceName = "rke-network-plugin"
+
+	PortCheckContainer  = "rke-port-checker"
+	PortListenContainer = "rke-port-listener"
+
+	KubeAPIPort         = "6443"
+	EtcdPort1           = "2379"
+	EtcdPort2           = "2380"
+	SchedulerPort       = "10251"
+	ControllerPort      = "10252"
+	KubeletPort         = "10250"
+	KubeProxyPort       = "10256"
+	FlannelVXLANPortUDP = "8472"
 
 	FlannelNetworkPlugin = "flannel"
 	FlannelImage         = "flannel_image"
 	FlannelCNIImage      = "flannel_cni_image"
@@ -209,3 +231,238 @@ func (c *Cluster) getNetworkPluginManifest(pluginConfig map[string]string) (string, error) {
 		return "", fmt.Errorf("[network] Unsupported network plugin: %s", c.Network.Plugin)
 	}
 }
+
+func (c *Cluster) CheckClusterPorts(ctx context.Context) error {
+	if err := c.deployTCPPortListeners(ctx); err != nil {
+		return err
+	}
+	if err := c.runServicePortChecks(ctx); err != nil {
+		return err
+	}
+	if err := c.checkKubeAPIPort(ctx); err != nil {
+		return err
+	}
+	return c.removeTCPPortListeners(ctx)
+}
+
+func (c *Cluster) checkKubeAPIPort(ctx context.Context) error {
+	log.Infof(ctx, "[network] Checking KubeAPI port on Control Plane hosts")
+	for _, host := range c.ControlPlaneHosts {
+		logrus.Debugf("[network] Checking KubeAPI port [%s] on host: %s", KubeAPIPort, host.Address)
+		address := fmt.Sprintf("%s:%s", host.Address, KubeAPIPort)
+		conn, err := net.Dial("tcp", address)
+		if err != nil {
+			return fmt.Errorf("[network] Can't access KubeAPI port [%s] on Control Plane host: %s", KubeAPIPort, host.Address)
+		}
+		conn.Close()
+	}
+	return nil
+}
+
+func (c *Cluster) deployTCPPortListeners(ctx context.Context) error {
+	log.Infof(ctx, "[network] Deploying port listener containers")
+	var errgrp errgroup.Group
+
+	portList := []string{
+		KubeAPIPort,
+		EtcdPort1,
+		EtcdPort2,
+		SchedulerPort,
+		ControllerPort,
+		KubeletPort,
+		KubeProxyPort,
+	}
+	// publish every checked host port to the same listener on container port 1337
+	portBindingList := []nat.PortBinding{}
+	for _, portNumber := range portList {
+		rawPort := fmt.Sprintf("0.0.0.0:%s:1337/tcp", portNumber)
+		portMapping, _ := nat.ParsePortSpec(rawPort) // the ports are static constants, so the spec is always valid
+		portBindingList = append(portBindingList, portMapping[0].Binding)
+	}
+
+	imageCfg := &container.Config{
+		Image: c.SystemImages[AplineImage],
+		Cmd: []string{
+			"nc",
+			"-kl",
+			"-p",
+			"1337",
+			"-e",
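+			// keeps listening on 1337 and answers each connection with echo, so every mapped host port accepts TCP connects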
+			"echo",
+		},
+		ExposedPorts: nat.PortSet{
+			"1337/tcp": {},
+		},
+	}
+
+	hostCfg := &container.HostConfig{
+		PortBindings: nat.PortMap{
+			"1337/tcp": portBindingList,
+		},
+	}
+
+	uniqHosts := c.getUniqueHostList()
+	for _, host := range uniqHosts {
+		runHost := host
+		errgrp.Go(func() error {
+			return docker.DoRunContainer(ctx, runHost.DClient, imageCfg, hostCfg, PortListenContainer, runHost.Address, "network")
+		})
+	}
+	if err := errgrp.Wait(); err != nil {
+		return err
+	}
+	log.Infof(ctx, "[network] Port listener containers deployed successfully")
+	return nil
+}
+
+func (c *Cluster) removeTCPPortListeners(ctx context.Context) error {
+	log.Infof(ctx, "[network] Removing port listener containers")
+	var errgrp errgroup.Group
+
+	uniqHosts := c.getUniqueHostList()
+	for _, host := range uniqHosts {
+		runHost := host
+		errgrp.Go(func() error {
+			return docker.DoRemoveContainer(ctx, runHost.DClient, PortListenContainer, runHost.Address)
+		})
+	}
+	if err := errgrp.Wait(); err != nil {
+		return err
+	}
+	log.Infof(ctx, "[network] Port listener containers removed successfully")
+	return nil
+}
+
+func (c *Cluster) runServicePortChecks(ctx context.Context) error {
+	var errgrp errgroup.Group
+	// check etcd <-> etcd
+	etcdPortList := []string{
+		EtcdPort1,
+		EtcdPort2,
+	}
+	// a single etcd host has no peers to check
+	if len(c.EtcdHosts) > 1 {
+		log.Infof(ctx, "[network] Running etcd <-> etcd port checks")
+		for _, host := range c.EtcdHosts {
+			runHost := host
+			errgrp.Go(func() error {
+				return checkPlaneTCPPortsFromHost(ctx, runHost, etcdPortList, c.EtcdHosts, c.SystemImages[AplineImage])
+			})
+		}
+		if err := errgrp.Wait(); err != nil {
+			return err
+		}
+	}
+	// check all -> etcd connectivity
+	log.Infof(ctx, "[network] Running all -> etcd port checks")
+	for _, host := range c.ControlPlaneHosts {
+		runHost := host
+		errgrp.Go(func() error {
+			return checkPlaneTCPPortsFromHost(ctx, runHost, etcdPortList, c.EtcdHosts, c.SystemImages[AplineImage])
+		})
+	}
+	if err := errgrp.Wait(); err != nil {
+		return err
+	}
+	// workers also need to reach etcd, e.g. Calico talks to etcd directly
+	for _, host := range c.WorkerHosts {
+		runHost := host
+		errgrp.Go(func() error {
+			return checkPlaneTCPPortsFromHost(ctx, runHost, etcdPortList, c.EtcdHosts, c.SystemImages[AplineImage])
+		})
+	}
+	if err := errgrp.Wait(); err != nil {
+		return err
+	}
+	// check control plane -> workers
+	log.Infof(ctx, "[network] Running control plane -> workers port checks")
+	workerPortList := []string{
+		KubeletPort,
+	}
+	for _, host := range c.ControlPlaneHosts {
+		runHost := host
+		errgrp.Go(func() error {
+			return checkPlaneTCPPortsFromHost(ctx, runHost, workerPortList, c.WorkerHosts, c.SystemImages[AplineImage])
+		})
+	}
+	if err := errgrp.Wait(); err != nil {
+		return err
+	}
+	// check workers -> control plane
+	log.Infof(ctx, "[network] Running workers -> control plane port checks")
+	controlPlanePortList := []string{
+		KubeAPIPort,
+	}
+	for _, host := range c.WorkerHosts {
+		runHost := host
+		errgrp.Go(func() error {
+			return checkPlaneTCPPortsFromHost(ctx, runHost, controlPlanePortList, c.ControlPlaneHosts, c.SystemImages[AplineImage])
+		})
+	}
+	return errgrp.Wait()
+}
+
+func checkPlaneTCPPortsFromHost(ctx context.Context, host *hosts.Host, portList []string, planeHosts []*hosts.Host, image string) error {
+	// collect the target plane addresses; named to avoid shadowing the hosts package import
+	planeAddresses := []string{}
+	for _, planeHost := range planeHosts {
+		planeAddresses = append(planeAddresses, planeHost.Address)
+	}
+	imageCfg := &container.Config{
+		Image: image,
+		Tty:   true,
+		Env: []string{
+			fmt.Sprintf("HOSTS=%s", strings.Join(planeAddresses, " ")),
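+			// HOSTS and PORTS feed the probe loop in Cmd below: one nc -z per host/port pair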
fmt.Sprintf("PORTS=%s", strings.Join(portList, " ")), + }, + Cmd: []string{ + "sh", + "-c", + "for host in $HOSTS; do for port in $PORTS ; do nc -z $host $port > /dev/null || echo $host $port ; done; done", + }, + } + if err := docker.DoRemoveContainer(ctx, host.DClient, PortCheckContainer, host.Address); err != nil { + return err + } + if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, nil, PortCheckContainer, host.Address, "network"); err != nil { + return err + } + if err := docker.WaitForContainer(ctx, host.DClient, PortCheckContainer); err != nil { + return err + } + logs, err := docker.ReadContainerLogs(ctx, host.DClient, PortCheckContainer) + if err != nil { + return err + } + defer logs.Close() + if err := docker.RemoveContainer(ctx, host.DClient, host.Address, PortCheckContainer); err != nil { + return err + } + portCheckLogs, err := getPortCheckLogs(logs) + if err != nil { + return err + } + if len(portCheckLogs) > 0 { + + return fmt.Errorf("[netwok] Port check for ports: [%s] failed on host: [%s]", strings.Join(portCheckLogs, ", "), host.Address) + + } + return nil +} + +func getPortCheckLogs(reader io.ReadCloser) ([]string, error) { + logLines := bufio.NewScanner(reader) + hostPortLines := []string{} + for logLines.Scan() { + logLine := strings.Split(logLines.Text(), " ") + hostPortLines = append(hostPortLines, fmt.Sprintf("%s:%s", logLine[0], logLine[1])) + } + if err := logLines.Err(); err != nil { + return nil, err + } + return hostPortLines, nil +} diff --git a/cmd/up.go b/cmd/up.go index de0123f2..40188b38 100644 --- a/cmd/up.go +++ b/cmd/up.go @@ -54,6 +54,10 @@ func ClusterUp( return APIURL, caCrt, clientCert, clientKey, err } + if err = kubeCluster.CheckClusterPorts(ctx); err != nil { + return APIURL, caCrt, clientCert, clientKey, err + } + currentCluster, err := kubeCluster.GetClusterState(ctx) if err != nil { return APIURL, caCrt, clientCert, clientKey, err diff --git a/docker/docker.go b/docker/docker.go index ab1788d9..00d26468 100644 --- a/docker/docker.go +++ b/docker/docker.go @@ -289,3 +289,8 @@ func ReadFileFromContainer(ctx context.Context, dClient *client.Client, hostname } return string(file), nil } + +func ReadContainerLogs(ctx context.Context, dClient *client.Client, containerName string) (io.ReadCloser, error) { + return dClient.ContainerLogs(ctx, containerName, types.ContainerLogsOptions{ShowStdout: true}) + +}