From fa332f7e075b8df62ee6bcaaea60ecadb4b74410 Mon Sep 17 00:00:00 2001 From: galal-hussein Date: Mon, 28 Jan 2019 20:57:25 +0200 Subject: [PATCH] Revert "revert to skip network plugin port checks of udp port" This reverts commit ea4b16b1161e01f7810b928ddb6ec30b3d5d3ff0. Revert "Add port checks for network plugins" This reverts commit c73a58d45c2273b9a14b3cd650dcc8966fc3b8c3. --- cluster/network.go | 113 +++++++-------------------------------------- 1 file changed, 17 insertions(+), 96 deletions(-) diff --git a/cluster/network.go b/cluster/network.go index 6a3fd637..622c0938 100644 --- a/cluster/network.go +++ b/cluster/network.go @@ -27,20 +27,14 @@ const ( CPPortListenContainer = "rke-cp-port-listener" WorkerPortListenContainer = "rke-worker-port-listener" - KubeAPIPort = "6443" - EtcdPort1 = "2379" - EtcdPort2 = "2380" - ScedulerPort = "10251" - ControllerPort = "10252" - KubeletPort = "10250" - KubeProxyPort = "10256" - + KubeAPIPort = "6443" + EtcdPort1 = "2379" + EtcdPort2 = "2380" + ScedulerPort = "10251" + ControllerPort = "10252" + KubeletPort = "10250" + KubeProxyPort = "10256" FlannetVXLANPortUDP = "8472" - CanalVXLANPortUDP = "8472" - CalicoBGPPortTCP = "179" - WeaveMetricsPortTCP = "6781-6782" - WeaveNetPortTCP = "6783" - WeaveNetPortUDP = "6783-6784" ProtocolTCP = "TCP" ProtocolUDP = "UDP" @@ -119,27 +113,6 @@ var EtcdClientPortList = []string{ EtcdPort1, } -var FlannelUDPPortList = []string{ - FlannetVXLANPortUDP, -} - -var CanalUDPPortList = []string{ - CanalVXLANPortUDP, -} - -var CalicoTCPPortList = []string{ - CalicoBGPPortTCP, -} - -var WeaveTCPPortList = []string{ - WeaveMetricsPortTCP, - WeaveNetPortTCP, -} - -var WeaveUDPPortList = []string{ - WeaveNetPortUDP, -} - func (c *Cluster) deployNetworkPlugin(ctx context.Context) error { log.Infof(ctx, "[network] Setting up network plugin: %s", c.Network.Plugin) switch c.Network.Plugin { @@ -404,33 +377,6 @@ func removeListenerFromPlane(ctx context.Context, hostPlane []*hosts.Host, conta return errgrp.Wait() } -func (c *Cluster) runNetworkPluginPortChecks(ctx context.Context, host interface{}) error { - log.Infof(ctx, "[network] Invoking port checks of network plugin: %s", c.Network.Plugin) - switch c.Network.Plugin { - case FlannelNetworkPlugin: - return checkPlanePortsFromHost(ctx, host.(*hosts.Host), FlannelUDPPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap, false) - case CalicoNetworkPlugin: - return checkPlanePortsFromHost(ctx, host.(*hosts.Host), CalicoTCPPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap, true) - case CanalNetworkPlugin: - return checkPlanePortsFromHost(ctx, host.(*hosts.Host), CanalUDPPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap, false) - case WeaveNetworkPlugin: - err := checkPlanePortsFromHost(ctx, host.(*hosts.Host), WeaveTCPPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap, true) - if err != nil { - return err - } - err = checkPlanePortsFromHost(ctx, host.(*hosts.Host), WeaveUDPPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap, false) - if err != nil { - return err - } - case NoNetworkPlugin: - log.Infof(ctx, "[network] Not deploying a cluster network, expecting custom CNI") - return nil - default: - return fmt.Errorf("[network] Unsupported network plugin: %s", c.Network.Plugin) - } - return nil -} - func (c *Cluster) runServicePortChecks(ctx context.Context) error { var errgrp errgroup.Group // check etcd <-> etcd @@ -442,11 +388,7 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error { errgrp.Go(func() error { var errList []error for host := range hostsQueue { - err := checkPlanePortsFromHost(ctx, host.(*hosts.Host), EtcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap, true) - if err != nil { - errList = append(errList, err) - } - err = c.runNetworkPluginPortChecks(ctx, host) + err := checkPlaneTCPPortsFromHost(ctx, host.(*hosts.Host), EtcdPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap) if err != nil { errList = append(errList, err) } @@ -465,11 +407,7 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error { errgrp.Go(func() error { var errList []error for host := range hostsQueue { - err := checkPlanePortsFromHost(ctx, host.(*hosts.Host), EtcdClientPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap, true) - if err != nil { - errList = append(errList, err) - } - err = c.runNetworkPluginPortChecks(ctx, host) + err := checkPlaneTCPPortsFromHost(ctx, host.(*hosts.Host), EtcdClientPortList, c.EtcdHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap) if err != nil { errList = append(errList, err) } @@ -487,11 +425,7 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error { errgrp.Go(func() error { var errList []error for host := range hostsQueue { - err := checkPlanePortsFromHost(ctx, host.(*hosts.Host), WorkerPortList, c.WorkerHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap, true) - if err != nil { - errList = append(errList, err) - } - err = c.runNetworkPluginPortChecks(ctx, host) + err := checkPlaneTCPPortsFromHost(ctx, host.(*hosts.Host), WorkerPortList, c.WorkerHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap) if err != nil { errList = append(errList, err) } @@ -509,11 +443,7 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error { errgrp.Go(func() error { var errList []error for host := range hostsQueue { - err := checkPlanePortsFromHost(ctx, host.(*hosts.Host), ControlPlanePortList, c.ControlPlaneHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap, true) - if err != nil { - errList = append(errList, err) - } - err = c.runNetworkPluginPortChecks(ctx, host) + err := checkPlaneTCPPortsFromHost(ctx, host.(*hosts.Host), ControlPlanePortList, c.ControlPlaneHosts, c.SystemImages.Alpine, c.PrivateRegistriesMap) if err != nil { errList = append(errList, err) } @@ -524,22 +454,9 @@ func (c *Cluster) runServicePortChecks(ctx context.Context) error { return errgrp.Wait() } -func checkPlanePortsFromHost(ctx context.Context, host *hosts.Host, portList []string, planeHosts []*hosts.Host, image string, prsMap map[string]v3.PrivateRegistry, tcp bool) error { +func checkPlaneTCPPortsFromHost(ctx context.Context, host *hosts.Host, portList []string, planeHosts []*hosts.Host, image string, prsMap map[string]v3.PrivateRegistry) error { var hosts []string - cmd := []string{ - "sh", - "-c", - } - if tcp { - cmd = append(cmd, "for host in $HOSTS; do for port in $PORTS ; do echo \"Checking host ${host} on port ${port}\" >&1 & nc -w5 -z $host $port > /dev/null || echo \"${host}:${port}\" >&2 & done; wait; done") - } else { - // TODO: add proper UDP port checks, and because UDP is not reliable so it has no acknowledgment, retransmission, or timeout. - // Also the k8s layer 3 network like flannel will filtering the host port like 8472 once is installed, so commands like `nc -w5 -uzv $host $port` will always return the same message regardless of the udp port is opened or not. - // More details on: https://github.com/rancher/rke/issues/1102 - return nil - } - for _, host := range planeHosts { hosts = append(hosts, host.InternalAddress) } @@ -549,7 +466,11 @@ func checkPlanePortsFromHost(ctx context.Context, host *hosts.Host, portList []s fmt.Sprintf("HOSTS=%s", strings.Join(hosts, " ")), fmt.Sprintf("PORTS=%s", strings.Join(portList, " ")), }, - Cmd: cmd, + Cmd: []string{ + "sh", + "-c", + "for host in $HOSTS; do for port in $PORTS ; do echo \"Checking host ${host} on port ${port}\" >&1 & nc -w 5 -z $host $port > /dev/null || echo \"${host}:${port}\" >&2 & done; wait; done", + }, } hostCfg := &container.HostConfig{ NetworkMode: "host",