From de25d6cb9577b00b24ca2baf0187b34f483a0d6c Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Mon, 18 Feb 2019 23:52:24 -0800 Subject: [PATCH] Kube-proxy: REJECT LB IPs with no endpoints We REJECT every other case. Close this FIXME. To get this to work in all cases, we have to process service in filter.INPUT, since LB IPS might be manged as local addresses. --- pkg/proxy/iptables/proxier.go | 23 +++++++++++--- test/e2e/framework/service_util.go | 49 ++++++++++++++++++++++++++++++ test/e2e/network/service.go | 40 ++++++++++++++++++++++-- 3 files changed, 105 insertions(+), 7 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index a4f8dc8ef76..7633c6c9dcd 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -369,6 +369,7 @@ var iptablesJumpChains = []iptablesJumpChain{ {utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainInput, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, {utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainForward, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, {utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, + {utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainInput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, {utiliptables.TableFilter, kubeForwardChain, utiliptables.ChainForward, "kubernetes forwarding rules", nil}, {utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", nil}, {utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainPrerouting, "kubernetes service portals", nil}, @@ -847,6 +848,7 @@ func (proxier *Proxier) syncProxyRules() { } writeLine(proxier.natRules, append(args, "-j", string(svcChain))...) } else { + // No endpoints. writeLine(proxier.filterRules, "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), @@ -917,6 +919,7 @@ func (proxier *Proxier) syncProxyRules() { // This covers cases like GCE load-balancers which get added to the local routing table. writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...) } else { + // No endpoints. writeLine(proxier.filterRules, "-A", string(kubeExternalServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), @@ -929,10 +932,10 @@ func (proxier *Proxier) syncProxyRules() { } // Capture load-balancer ingress. - if hasEndpoints { - fwChain := svcInfo.serviceFirewallChainName - for _, ingress := range svcInfo.LoadBalancerStatus.Ingress { - if ingress.IP != "" { + fwChain := svcInfo.serviceFirewallChainName + for _, ingress := range svcInfo.LoadBalancerStatus.Ingress { + if ingress.IP != "" { + if hasEndpoints { // create service firewall chain if chain, ok := existingNATChains[fwChain]; ok { writeBytesLine(proxier.natChains, chain) @@ -993,10 +996,19 @@ func (proxier *Proxier) syncProxyRules() { // If the packet was able to reach the end of firewall chain, then it did not get DNATed. // It means the packet cannot go thru the firewall, then mark it for DROP writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...) + } else { + // No endpoints. + writeLine(proxier.filterRules, + "-A", string(kubeServicesChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), + "-m", protocol, "-p", protocol, + "-d", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), + "--dport", strconv.Itoa(svcInfo.Port), + "-j", "REJECT", + ) } } } - // FIXME: do we need REJECT rules for load-balancer ingress if !hasEndpoints? // Capture nodeports. If we had more than 2 rules it might be // worthwhile to make a new per-service chain for nodeport rules, but @@ -1078,6 +1090,7 @@ func (proxier *Proxier) syncProxyRules() { writeLine(proxier.natRules, append(args, "-j", string(svcXlbChain))...) } } else { + // No endpoints. writeLine(proxier.filterRules, "-A", string(kubeExternalServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), diff --git a/test/e2e/framework/service_util.go b/test/e2e/framework/service_util.go index 172f52bd89a..b98ddba72e8 100644 --- a/test/e2e/framework/service_util.go +++ b/test/e2e/framework/service_util.go @@ -738,6 +738,28 @@ func (j *ServiceTestJig) RunOrFail(namespace string, tweak func(rc *v1.Replicati return result } +func (j *ServiceTestJig) Scale(namespace string, replicas int) { + rc := j.Name + scale, err := j.Client.CoreV1().ReplicationControllers(namespace).GetScale(rc, metav1.GetOptions{}) + if err != nil { + Failf("Failed to get scale for RC %q: %v", rc, err) + } + + scale.Spec.Replicas = int32(replicas) + _, err = j.Client.CoreV1().ReplicationControllers(namespace).UpdateScale(rc, scale) + if err != nil { + Failf("Failed to scale RC %q: %v", rc, err) + } + pods, err := j.waitForPodsCreated(namespace, replicas) + if err != nil { + Failf("Failed waiting for pods: %v", err) + } + if err := j.waitForPodsReady(namespace, pods); err != nil { + Failf("Failed waiting for pods to be running: %v", err) + } + return +} + func (j *ServiceTestJig) waitForPdbReady(namespace string) error { timeout := 2 * time.Minute for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) { @@ -911,6 +933,20 @@ func (j *ServiceTestJig) TestNotReachableHTTP(host string, port int, timeout tim } } +func (j *ServiceTestJig) TestRejectedHTTP(host string, port int, timeout time.Duration) { + pollfn := func() (bool, error) { + result := PokeHTTP(host, port, "/", nil) + if result.Status == HTTPRefused { + return true, nil + } + return false, nil // caller can retry + } + + if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil { + Failf("HTTP service %v:%v not rejected: %v", host, port, err) + } +} + func (j *ServiceTestJig) TestReachableUDP(host string, port int, timeout time.Duration) { pollfn := func() (bool, error) { result := PokeUDP(host, port, "echo hello", &UDPPokeParams{ @@ -941,6 +977,19 @@ func (j *ServiceTestJig) TestNotReachableUDP(host string, port int, timeout time } } +func (j *ServiceTestJig) TestRejectedUDP(host string, port int, timeout time.Duration) { + pollfn := func() (bool, error) { + result := PokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second}) + if result.Status == UDPRefused { + return true, nil + } + return false, nil // caller can retry + } + if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil { + Failf("UDP service %v:%v not rejected: %v", host, port, err) + } +} + func (j *ServiceTestJig) GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer { var body bytes.Buffer if pollErr := wait.PollImmediate(Poll, timeout, func() (bool, error) { diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index a60fdd41388..55f743992da 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -791,11 +791,47 @@ var _ = SIGDescribe("Services", func() { jig.TestReachableUDP(nodeIP, udpNodePort, framework.KubeProxyLagTimeout) By("hitting the TCP service's LoadBalancer") - jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout) // this may actually recreate the LB + jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout) if loadBalancerSupportsUDP { By("hitting the UDP service's LoadBalancer") - jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) // this may actually recreate the LB) + jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) + } + + By("Scaling the pods to 0") + jig.Scale(ns1, 0) + jig.Scale(ns2, 0) + + By("looking for ICMP REJECT on the TCP service's NodePort") + jig.TestRejectedHTTP(nodeIP, tcpNodePort, framework.KubeProxyLagTimeout) + + By("looking for ICMP REJECT on the UDP service's NodePort") + jig.TestRejectedUDP(nodeIP, udpNodePort, framework.KubeProxyLagTimeout) + + By("looking for ICMP REJECT on the TCP service's LoadBalancer") + jig.TestRejectedHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout) + + if loadBalancerSupportsUDP { + By("looking for ICMP REJECT on the UDP service's LoadBalancer") + jig.TestRejectedUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) + } + + By("Scaling the pods to 1") + jig.Scale(ns1, 1) + jig.Scale(ns2, 1) + + By("hitting the TCP service's NodePort") + jig.TestReachableHTTP(nodeIP, tcpNodePort, framework.KubeProxyLagTimeout) + + By("hitting the UDP service's NodePort") + jig.TestReachableUDP(nodeIP, udpNodePort, framework.KubeProxyLagTimeout) + + By("hitting the TCP service's LoadBalancer") + jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout) + + if loadBalancerSupportsUDP { + By("hitting the UDP service's LoadBalancer") + jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) } // Change the services back to ClusterIP.