diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index aa340e96b91..1c497df9673 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -959,8 +959,6 @@ func (esp *endpointServicePair) IPPart() string { return esp.endpoint } -const noConnectionToDelete = "0 flow entries have been deleted" - // After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we // risk sending more traffic to it, all of which will be lost (because UDP). // This assumes the proxier mutex is held @@ -968,13 +966,9 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ for epSvcPair := range connectionMap { if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP { endpointIP := epSvcPair.endpoint[0:strings.Index(epSvcPair.endpoint, ":")] - glog.V(2).Infof("Deleting connection tracking state for service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP) - err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp") - if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { - // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. - // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it - // is expensive to baby sit all udp connections to kubernetes services. 
- glog.Errorf("conntrack return with error: %v", err) + err := utilproxy.ClearUDPConntrackForPeers(proxier.exec, svcInfo.clusterIP.String(), endpointIP) + if err != nil { + glog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.servicePortName.String(), err) } } } @@ -1373,7 +1367,14 @@ func (proxier *Proxier) syncProxyRules() { continue } if lp.protocol == "udp" { - proxier.clearUDPConntrackForPort(lp.port) + // TODO: We might have multiple services using the same port, and this will clear conntrack for all of them. + // This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services. + // This only affects UDP connections, which are not common. + // See issue: https://github.com/kubernetes/kubernetes/issues/49881 + err := utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.port) + if err != nil { + glog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.port, err) + } } replacementPortsMap[lp] = socket } // We're holding the port, so it's OK to install iptables rules. @@ -1642,26 +1643,13 @@ func (proxier *Proxier) syncProxyRules() { } // Finish housekeeping. - // TODO: these and clearUDPConntrackForPort() could be made more consistent. - utilproxy.DeleteServiceConnections(proxier.exec, staleServices.List()) - proxier.deleteEndpointConnections(endpointUpdateResult.staleEndpoints) -} - -// Clear UDP conntrack for port or all conntrack entries when port equal zero. -// When a packet arrives, it will not go through NAT table again, because it is not "the first" packet. -// The solution is clearing the conntrack. 
Known issus: -// https://github.com/docker/docker/issues/8795 -// https://github.com/kubernetes/kubernetes/issues/31983 -func (proxier *Proxier) clearUDPConntrackForPort(port int) { - glog.V(2).Infof("Deleting conntrack entries for udp connections") - if port > 0 { - err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "-p", "udp", "--dport", strconv.Itoa(port)) - if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { - glog.Errorf("conntrack return with error: %v", err) + // TODO: these could be made more consistent. + for _, svcIP := range staleServices.List() { + if err := utilproxy.ClearUDPConntrackForIP(proxier.exec, svcIP); err != nil { + glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err) } - } else { - glog.Errorf("Wrong port number. The port number must be greater than zero") } + proxier.deleteEndpointConnections(endpointUpdateResult.staleEndpoints) } // Join all words with spaces, terminate with newline and write to buf. diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 86f9ff31af5..846915f7fbd 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -500,7 +500,11 @@ func (proxier *Proxier) unmergeService(service *api.Service, existingPorts sets. 
} proxier.loadBalancer.DeleteService(serviceName) } - utilproxy.DeleteServiceConnections(proxier.exec, staleUDPServices.List()) + for _, svcIP := range staleUDPServices.List() { + if err := utilproxy.ClearUDPConntrackForIP(proxier.exec, svcIP); err != nil { + glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err) + } + } } func (proxier *Proxier) OnServiceAdd(service *api.Service) { diff --git a/pkg/proxy/util/BUILD b/pkg/proxy/util/BUILD index 61acf95fca0..01c12f6e8d5 100644 --- a/pkg/proxy/util/BUILD +++ b/pkg/proxy/util/BUILD @@ -1,18 +1,10 @@ -package(default_visibility = ["//visibility:public"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", - "go_test", -) +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = ["conntrack.go"], - deps = [ - "//vendor/github.com/golang/glog:go_default_library", - "//vendor/k8s.io/utils/exec:go_default_library", - ], + visibility = ["//visibility:public"], + deps = ["//vendor/k8s.io/utils/exec:go_default_library"], ) go_test( @@ -36,4 +28,5 @@ filegroup( name = "all-srcs", srcs = [":package-srcs"], tags = ["automanaged"], + visibility = ["//visibility:public"], ) diff --git a/pkg/proxy/util/conntrack.go b/pkg/proxy/util/conntrack.go index 389de1f5556..6d7e4703ec0 100644 --- a/pkg/proxy/util/conntrack.go +++ b/pkg/proxy/util/conntrack.go @@ -18,30 +18,27 @@ package util import ( "fmt" + "strconv" "strings" "k8s.io/utils/exec" - - "github.com/golang/glog" ) // Utilities for dealing with conntrack const noConnectionToDelete = "0 flow entries have been deleted" -// DeleteServiceConnection uses the conntrack tool to delete the conntrack entries -// for the UDP connections specified by the given service IPs -func DeleteServiceConnections(execer exec.Interface, svcIPs []string) { - for _, ip := range svcIPs { - glog.V(2).Infof("Deleting connection tracking state for service IP %s", ip) - err := ExecConntrackTool(execer, "-D", 
"--orig-dst", ip, "-p", "udp") - if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { - // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. - // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it - // is expensive to baby-sit all udp connections to kubernetes services. - glog.Errorf("conntrack returned error: %v", err) - } +// ClearUDPConntrackForIP uses the conntrack tool to delete the conntrack entries +// for the UDP connections specified by the given service IP. +func ClearUDPConntrackForIP(execer exec.Interface, ip string) error { + err := ExecConntrackTool(execer, "-D", "--orig-dst", ip, "-p", "udp") + if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { + // TODO: Better handling for deletion failure. When a failure occurs, stale UDP connections may not get flushed. + // These stale UDP connections will keep black-holing traffic. Making this a best-effort operation for now, since it + // is expensive to baby-sit all UDP connections to Kubernetes services. + return fmt.Errorf("error deleting connection tracking state for UDP service IP: %s, error: %v", ip, err) } + return nil } // ExecConntrackTool executes the conntrack tool using the given parameters @@ -56,3 +53,33 @@ func ExecConntrackTool(execer exec.Interface, parameters ...string) error { } return nil } + +// ClearUDPConntrackForPort uses the conntrack tool to delete the conntrack entries +// for the UDP connections specified by the port. +// When a packet arrives, it will not go through NAT table again, because it is not "the first" packet. +// The solution is clearing the conntrack. Known issues: +// https://github.com/docker/docker/issues/8795 +// https://github.com/kubernetes/kubernetes/issues/31983 +func ClearUDPConntrackForPort(execer exec.Interface, port int) error { + if port <= 0 { + return fmt.Errorf("Wrong port number. 
The port number must be greater than zero") + } + err := ExecConntrackTool(execer, "-D", "-p", "udp", "--dport", strconv.Itoa(port)) + if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { + return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err) + } + return nil +} + +// ClearUDPConntrackForPeers uses the conntrack tool to delete the conntrack entries +// for the UDP connections specified by the {origin, dest} IP pair. +func ClearUDPConntrackForPeers(execer exec.Interface, origin, dest string) error { + err := ExecConntrackTool(execer, "-D", "--orig-dst", origin, "--dst-nat", dest, "-p", "udp") + if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { + // TODO: Better handling for deletion failure. When a failure occurs, stale UDP connections may not get flushed. + // These stale UDP connections will keep black-holing traffic. Making this a best-effort operation for now, since it + // is expensive to baby-sit all UDP connections to Kubernetes services. 
+ return fmt.Errorf("error deleting conntrack entries for UDP peer {%s, %s}, error: %v", origin, dest, err) + } + return nil +} diff --git a/pkg/proxy/util/conntrack_test.go b/pkg/proxy/util/conntrack_test.go index a788f1f67fc..6bd3ecedd6f 100644 --- a/pkg/proxy/util/conntrack_test.go +++ b/pkg/proxy/util/conntrack_test.go @@ -18,6 +18,7 @@ package util import ( "fmt" + "strconv" "strings" "testing" @@ -74,7 +75,7 @@ func TestExecConntrackTool(t *testing.T) { } } -func TestDeleteServiceConnections(t *testing.T) { +func TestClearUDPConntrackForIP(t *testing.T) { fcmd := fakeexec.FakeCmd{ CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, @@ -105,8 +106,10 @@ func TestDeleteServiceConnections(t *testing.T) { svcCount := 0 for i := range testCases { - DeleteServiceConnections(&fexec, testCases[i]) for _, ip := range testCases[i] { + if err := ClearUDPConntrackForIP(&fexec, ip); err != nil { + t.Errorf("Unexepected error: %v", err) + } expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", ip) execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ") if expectCommand != execCommand { @@ -119,3 +122,93 @@ func TestDeleteServiceConnections(t *testing.T) { } } } + +func TestClearUDPConntrackForPort(t *testing.T) { + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, + func() ([]byte, error) { + return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.") + }, + }, + } + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) 
}, + }, + LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, + } + + testCases := []string{ + "8080", + "9090", + } + svcCount := 0 + for i := range testCases { + portNum, _ := strconv.Atoi(testCases[i]) + err := ClearUDPConntrackForPort(&fexec, portNum) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + expectCommand := fmt.Sprintf("conntrack -D -p udp --dport %s", testCases[i]) + execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ") + if expectCommand != execCommand { + t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand) + } + svcCount += 1 + + if svcCount != fexec.CommandCalls { + t.Errorf("Exepect comand executed %d times, but got %d", svcCount, fexec.CommandCalls) + } + } +} + +func TestDeleteUDPConnections(t *testing.T) { + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, + func() ([]byte, error) { + return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.") + }, + }, + } + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) 
}, + }, + LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, + } + + testCases := []struct { + origin string + dest string + }{ + { + origin: "1.2.3.4", + dest: "10.20.30.40", + }, + { + origin: "2.3.4.5", + dest: "20.30.40.50", + }, + } + svcCount := 0 + for i := range testCases { + err := ClearUDPConntrackForPeers(&fexec, testCases[i].origin, testCases[i].dest) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", testCases[i].origin, testCases[i].dest) + execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ") + if expectCommand != execCommand { + t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand) + } + svcCount += 1 + + if svcCount != fexec.CommandCalls { + t.Errorf("Exepect comand executed %d times, but got %d", svcCount, fexec.CommandCalls) + } + } +}