mirror of
https://github.com/rancher/rke.git
synced 2025-08-01 15:19:09 +00:00
Add open port checks
This commit is contained in:
parent
d351594098
commit
c815ef5751
@ -298,3 +298,20 @@ func (c *Cluster) ApplyAuthzResources(ctx context.Context) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cluster) getUniqueHostList() []*hosts.Host {
|
||||
hostList := []*hosts.Host{}
|
||||
hostList = append(hostList, c.EtcdHosts...)
|
||||
hostList = append(hostList, c.ControlPlaneHosts...)
|
||||
hostList = append(hostList, c.WorkerHosts...)
|
||||
// little trick to get a unique host list
|
||||
uniqHostMap := make(map[*hosts.Host]bool)
|
||||
for _, host := range hostList {
|
||||
uniqHostMap[host] = true
|
||||
}
|
||||
uniqHostList := []*hosts.Host{}
|
||||
for host := range uniqHostMap {
|
||||
uniqHostList = append(uniqHostList, host)
|
||||
}
|
||||
return uniqHostList
|
||||
}
|
||||
|
@ -1,18 +1,40 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"strings"
|
||||
|
||||
"github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/go-connections/nat"
|
||||
"github.com/rancher/rke/docker"
|
||||
"github.com/rancher/rke/hosts"
|
||||
"github.com/rancher/rke/log"
|
||||
"github.com/rancher/rke/pki"
|
||||
"github.com/rancher/rke/services"
|
||||
"github.com/rancher/rke/templates"
|
||||
"github.com/sirupsen/logrus"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
const (
|
||||
NetworkPluginResourceName = "rke-network-plugin"
|
||||
|
||||
PortCheckContainer = "rke-port-checker"
|
||||
PortListenContainer = "rke-port-listener"
|
||||
|
||||
KubeAPIPort = "6443"
|
||||
EtcdPort1 = "2379"
|
||||
EtcdPort2 = "2380"
|
||||
ScedulerPort = "10251"
|
||||
ControllerPort = "10252"
|
||||
KubeletPort = "10250"
|
||||
KubeProxyPort = "10256"
|
||||
FlannetVXLANPortUDP = "8472"
|
||||
|
||||
FlannelNetworkPlugin = "flannel"
|
||||
FlannelImage = "flannel_image"
|
||||
FlannelCNIImage = "flannel_cni_image"
|
||||
@ -209,3 +231,231 @@ func (c *Cluster) getNetworkPluginManifest(pluginConfig map[string]string) (stri
|
||||
return "", fmt.Errorf("[network] Unsupported network plugin: %s", c.Network.Plugin)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cluster) CheckClusterPorts(ctx context.Context) error {
|
||||
if err := c.deployTCPPortListeners(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.runServicePortChecks(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.checkKubeAPIPort(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
return c.removeTCPPortListeners(ctx)
|
||||
}
|
||||
|
||||
func (c *Cluster) checkKubeAPIPort(ctx context.Context) error {
|
||||
log.Infof(ctx, "[network] Checking KubeAPI port Control Plane hosts")
|
||||
for _, host := range c.ControlPlaneHosts {
|
||||
logrus.Debugf("[network] Checking KubeAPI port [%s] on host: %s", KubeAPIPort, host.Address)
|
||||
address := fmt.Sprintf("%s:%s", host.Address, KubeAPIPort)
|
||||
conn, err := net.Dial("tcp", address)
|
||||
if err != nil {
|
||||
return fmt.Errorf("[network] Can't access KubeAPI port [%s] on Control Plane host: %s", KubeAPIPort, host.Address)
|
||||
}
|
||||
conn.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cluster) deployTCPPortListeners(ctx context.Context) error {
|
||||
log.Infof(ctx, "[network] Deploying port listener containers")
|
||||
var errgrp errgroup.Group
|
||||
|
||||
portList := []string{
|
||||
KubeAPIPort,
|
||||
EtcdPort1,
|
||||
EtcdPort2,
|
||||
ScedulerPort,
|
||||
ControllerPort,
|
||||
KubeletPort,
|
||||
KubeProxyPort,
|
||||
}
|
||||
portBindingList := []nat.PortBinding{}
|
||||
for _, portNumber := range portList {
|
||||
rawPort := fmt.Sprintf("0.0.0.0:%s:1337/tcp", portNumber)
|
||||
portMapping, _ := nat.ParsePortSpec(rawPort)
|
||||
portBindingList = append(portBindingList, portMapping[0].Binding)
|
||||
}
|
||||
|
||||
imageCfg := &container.Config{
|
||||
Image: c.SystemImages[AplineImage],
|
||||
Cmd: []string{
|
||||
"nc",
|
||||
"-kl",
|
||||
"-p",
|
||||
"1337",
|
||||
"-e",
|
||||
"echo",
|
||||
},
|
||||
ExposedPorts: nat.PortSet{
|
||||
"1337/tcp": {},
|
||||
},
|
||||
}
|
||||
|
||||
hostCfg := &container.HostConfig{
|
||||
PortBindings: nat.PortMap{
|
||||
"1337/tcp": portBindingList,
|
||||
},
|
||||
}
|
||||
|
||||
uniqHosts := c.getUniqueHostList()
|
||||
for _, host := range uniqHosts {
|
||||
runHost := host
|
||||
errgrp.Go(func() error {
|
||||
return docker.DoRunContainer(ctx, runHost.DClient, imageCfg, hostCfg, PortListenContainer, runHost.Address, "network")
|
||||
})
|
||||
}
|
||||
if err := errgrp.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof(ctx, "[network] Port listener containers deployed successfully")
|
||||
return nil
|
||||
}
|
||||
func (c *Cluster) removeTCPPortListeners(ctx context.Context) error {
|
||||
log.Infof(ctx, "[network] Removing port listener containers")
|
||||
var errgrp errgroup.Group
|
||||
|
||||
uniqHosts := c.getUniqueHostList()
|
||||
for _, host := range uniqHosts {
|
||||
runHost := host
|
||||
errgrp.Go(func() error {
|
||||
return docker.DoRemoveContainer(ctx, runHost.DClient, PortListenContainer, runHost.Address)
|
||||
})
|
||||
}
|
||||
if err := errgrp.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof(ctx, "[network] Port listener containers removed successfully")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cluster) runServicePortChecks(ctx context.Context) error {
|
||||
var errgrp errgroup.Group
|
||||
// check etcd <-> etcd
|
||||
etcdPortList := []string{
|
||||
EtcdPort1,
|
||||
EtcdPort2,
|
||||
}
|
||||
// one etcd host is a pass
|
||||
if len(c.EtcdHosts) > 1 {
|
||||
log.Infof(ctx, "[network] Running etcd <-> etcd port checks")
|
||||
for _, host := range c.EtcdHosts {
|
||||
runHost := host
|
||||
errgrp.Go(func() error {
|
||||
return checkPlaneTCPPortsFromHost(ctx, runHost, etcdPortList, c.EtcdHosts, c.SystemImages[AplineImage])
|
||||
})
|
||||
}
|
||||
if err := errgrp.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// check all -> etcd connectivity
|
||||
log.Infof(ctx, "[network] Running all -> etcd port checks")
|
||||
for _, host := range c.ControlPlaneHosts {
|
||||
runHost := host
|
||||
errgrp.Go(func() error {
|
||||
return checkPlaneTCPPortsFromHost(ctx, runHost, etcdPortList, c.EtcdHosts, c.SystemImages[AplineImage])
|
||||
})
|
||||
}
|
||||
if err := errgrp.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
// Workers need to talk to etcd for calico
|
||||
for _, host := range c.WorkerHosts {
|
||||
runHost := host
|
||||
errgrp.Go(func() error {
|
||||
return checkPlaneTCPPortsFromHost(ctx, runHost, etcdPortList, c.EtcdHosts, c.SystemImages[AplineImage])
|
||||
})
|
||||
}
|
||||
if err := errgrp.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
// check controle plane -> Workers
|
||||
log.Infof(ctx, "[network] Running control plane -> etcd port checks")
|
||||
workerPortList := []string{
|
||||
KubeletPort,
|
||||
}
|
||||
for _, host := range c.ControlPlaneHosts {
|
||||
runHost := host
|
||||
errgrp.Go(func() error {
|
||||
return checkPlaneTCPPortsFromHost(ctx, runHost, workerPortList, c.WorkerHosts, c.SystemImages[AplineImage])
|
||||
})
|
||||
}
|
||||
if err := errgrp.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
// check workers -> control plane
|
||||
log.Infof(ctx, "[network] Running workers -> control plane port checks")
|
||||
controlPlanePortList := []string{
|
||||
KubeAPIPort,
|
||||
}
|
||||
for _, host := range c.WorkerHosts {
|
||||
runHost := host
|
||||
errgrp.Go(func() error {
|
||||
return checkPlaneTCPPortsFromHost(ctx, runHost, controlPlanePortList, c.ControlPlaneHosts, c.SystemImages[AplineImage])
|
||||
})
|
||||
}
|
||||
return errgrp.Wait()
|
||||
}
|
||||
|
||||
func checkPlaneTCPPortsFromHost(ctx context.Context, host *hosts.Host, portList []string, planeHosts []*hosts.Host, image string) error {
|
||||
hosts := []string{}
|
||||
for _, host := range planeHosts {
|
||||
hosts = append(hosts, host.Address)
|
||||
}
|
||||
imageCfg := &container.Config{
|
||||
Image: image,
|
||||
Tty: true,
|
||||
Env: []string{
|
||||
fmt.Sprintf("HOSTS=%s", strings.Join(hosts, " ")),
|
||||
fmt.Sprintf("PORTS=%s", strings.Join(portList, " ")),
|
||||
},
|
||||
Cmd: []string{
|
||||
"sh",
|
||||
"-c",
|
||||
"for host in $HOSTS; do for port in $PORTS ; do nc -z $host $port > /dev/null || echo $host $port ; done; done",
|
||||
},
|
||||
}
|
||||
if err := docker.DoRemoveContainer(ctx, host.DClient, PortCheckContainer, host.Address); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, nil, PortCheckContainer, host.Address, "network"); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := docker.WaitForContainer(ctx, host.DClient, PortCheckContainer); err != nil {
|
||||
return err
|
||||
}
|
||||
logs, err := docker.ReadContainerLogs(ctx, host.DClient, PortCheckContainer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer logs.Close()
|
||||
if err := docker.RemoveContainer(ctx, host.DClient, host.Address, PortCheckContainer); err != nil {
|
||||
return err
|
||||
}
|
||||
portCheckLogs, err := getPortCheckLogs(logs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(portCheckLogs) > 0 {
|
||||
|
||||
return fmt.Errorf("[netwok] Port check for ports: [%s] failed on host: [%s]", strings.Join(portCheckLogs, ", "), host.Address)
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getPortCheckLogs(reader io.ReadCloser) ([]string, error) {
|
||||
logLines := bufio.NewScanner(reader)
|
||||
hostPortLines := []string{}
|
||||
for logLines.Scan() {
|
||||
logLine := strings.Split(logLines.Text(), " ")
|
||||
hostPortLines = append(hostPortLines, fmt.Sprintf("%s:%s", logLine[0], logLine[1]))
|
||||
}
|
||||
if err := logLines.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return hostPortLines, nil
|
||||
}
|
||||
|
@ -54,6 +54,10 @@ func ClusterUp(
|
||||
return APIURL, caCrt, clientCert, clientKey, err
|
||||
}
|
||||
|
||||
if err = kubeCluster.CheckClusterPorts(ctx); err != nil {
|
||||
return APIURL, caCrt, clientCert, clientKey, err
|
||||
}
|
||||
|
||||
currentCluster, err := kubeCluster.GetClusterState(ctx)
|
||||
if err != nil {
|
||||
return APIURL, caCrt, clientCert, clientKey, err
|
||||
|
@ -289,3 +289,8 @@ func ReadFileFromContainer(ctx context.Context, dClient *client.Client, hostname
|
||||
}
|
||||
return string(file), nil
|
||||
}
|
||||
|
||||
func ReadContainerLogs(ctx context.Context, dClient *client.Client, containerName string) (io.ReadCloser, error) {
|
||||
return dClient.ContainerLogs(ctx, containerName, types.ContainerLogsOptions{ShowStdout: true})
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user