From 502d14afd42e407b85729e18e1e3cd988771cb25 Mon Sep 17 00:00:00 2001 From: Dane LeBlanc Date: Wed, 6 Sep 2017 10:30:32 -0400 Subject: [PATCH] Add required family flag for conntrack IPv6 operation This change causes kube-proxy to supply the required "-f ipv6" family flag whenever the conntrack utility is executed and the associated service is using IPv6. This change is required for IPv6-only operation. Note that unit test coverage for the 2-line changes in pkg/proxy/iptables/proxier.go and /pkg/proxy/ipvs/proxier.go will need to be added after support for IPv6 service addresses is added to these files. For pkg/proxy/iptables/proxier.go, this coverage will be added either with PR #48551. fixes #52027 --- pkg/proxy/iptables/proxier.go | 3 +- pkg/proxy/ipvs/proxier.go | 3 +- pkg/proxy/util/conntrack.go | 26 +++++-- pkg/proxy/util/conntrack_test.go | 122 ++++++++++++++++++------------- 4 files changed, 96 insertions(+), 58 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 6d3bcc3606f..565cbed33d7 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -1342,7 +1342,8 @@ func (proxier *Proxier) syncProxyRules() { // This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services. // This only affects UDP connections, which are not common. // See issue: https://github.com/kubernetes/kubernetes/issues/49881 - err := utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.Port) + isIPv6 := svcInfo.clusterIP.To4() != nil + err := utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.Port, isIPv6) if err != nil { glog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.Port, err) } diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 6c4ad357240..39416a1c3e2 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1126,7 +1126,8 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { continue } if lp.Protocol == "udp" { - utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.Port) + isIPv6 := svcInfo.clusterIP.To4() != nil + utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.Port, isIPv6) } replacementPortsMap[lp] = socket } // We're holding the port, so it's OK to install ipvs rules. diff --git a/pkg/proxy/util/conntrack.go b/pkg/proxy/util/conntrack.go index 6d7e4703ec0..504085a5ae2 100644 --- a/pkg/proxy/util/conntrack.go +++ b/pkg/proxy/util/conntrack.go @@ -18,6 +18,7 @@ package util import ( "fmt" + "net" "strconv" "strings" @@ -28,10 +29,23 @@ import ( const noConnectionToDelete = "0 flow entries have been deleted" -// DeleteServiceConnections uses the conntrack tool to delete the conntrack entries +func isIPv6(ip string) bool { + netIP := net.ParseIP(ip) + return netIP != nil && netIP.To4() == nil +} + +func parametersWithFamily(isIPv6 bool, parameters ...string) []string { + if isIPv6 { + parameters = append(parameters, "-f", "ipv6") + } + return parameters +} + +// ClearUDPConntrackForIP uses the conntrack tool to delete the conntrack entries // for the UDP connections specified by the given service IP func ClearUDPConntrackForIP(execer exec.Interface, ip string) error { - err := ExecConntrackTool(execer, "-D", "--orig-dst", ip, "-p", "udp") + parameters := parametersWithFamily(isIPv6(ip), "-D", "--orig-dst", ip, "-p", "udp") + err := ExecConntrackTool(execer, parameters...) if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it @@ -60,11 +74,12 @@ func ExecConntrackTool(execer exec.Interface, parameters ...string) error { // The solution is clearing the conntrack. Known issues: // https://github.com/docker/docker/issues/8795 // https://github.com/kubernetes/kubernetes/issues/31983 -func ClearUDPConntrackForPort(execer exec.Interface, port int) error { +func ClearUDPConntrackForPort(execer exec.Interface, port int, isIPv6 bool) error { if port <= 0 { return fmt.Errorf("Wrong port number. The port number must be greater than zero") } - err := ExecConntrackTool(execer, "-D", "-p", "udp", "--dport", strconv.Itoa(port)) + parameters := parametersWithFamily(isIPv6, "-D", "-p", "udp", "--dport", strconv.Itoa(port)) + err := ExecConntrackTool(execer, parameters...) if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err) } @@ -74,7 +89,8 @@ func ClearUDPConntrackForPort(execer exec.Interface, port int) error { // ClearUDPConntrackForPeers uses the conntrack tool to delete the conntrack entries // for the UDP connections specified by the {origin, dest} IP pair. func ClearUDPConntrackForPeers(execer exec.Interface, origin, dest string) error { - err := ExecConntrackTool(execer, "-D", "--orig-dst", origin, "--dst-nat", dest, "-p", "udp") + parameters := parametersWithFamily(isIPv6(origin), "-D", "--orig-dst", origin, "--dst-nat", dest, "-p", "udp") + err := ExecConntrackTool(execer, parameters...) if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it diff --git a/pkg/proxy/util/conntrack_test.go b/pkg/proxy/util/conntrack_test.go index 6bd3ecedd6f..6c0db300029 100644 --- a/pkg/proxy/util/conntrack_test.go +++ b/pkg/proxy/util/conntrack_test.go @@ -18,7 +18,6 @@ package util import ( "fmt" - "strconv" "strings" "testing" @@ -26,13 +25,20 @@ import ( fakeexec "k8s.io/utils/exec/testing" ) +func familyParamStr(isIPv6 bool) string { + if isIPv6 { + return " -f ipv6" + } + return "" +} + func TestExecConntrackTool(t *testing.T) { fcmd := fakeexec.FakeCmd{ CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, func() ([]byte, error) { - return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.") + return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted") }, }, } @@ -81,8 +87,9 @@ func TestClearUDPConntrackForIP(t *testing.T) { func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, func() ([]byte, error) { - return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.") + return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted") }, + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, }, } fexec := fakeexec.FakeExec{ @@ -90,36 +97,35 @@ func TestClearUDPConntrackForIP(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, } - testCases := [][]string{ - { - "10.240.0.3", - "10.240.0.5", - }, - { - "10.240.0.4", - }, + testCases := []struct { + name string + ip string + }{ + {"IPv4 success", "10.240.0.3"}, + {"IPv4 success", "10.240.0.5"}, + {"IPv4 simulated error", "10.240.0.4"}, + {"IPv6 success", "2001:db8::10"}, } svcCount := 0 - for i := range testCases { - for _, ip := range testCases[i] { - if err := ClearUDPConntrackForIP(&fexec, ip); err != nil { - t.Errorf("Unexepected error: %v", err) - } - expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", ip) - execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ") - if expectCommand != execCommand { - t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand) - } - svcCount += 1 + for _, tc := range testCases { + if err := ClearUDPConntrackForIP(&fexec, tc.ip); err != nil { + t.Errorf("%s test case:, Unexpected error: %v", tc.name, err) } - if svcCount != fexec.CommandCalls { - t.Errorf("Exepect comand executed %d times, but got %d", svcCount, fexec.CommandCalls) + expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", tc.ip) + familyParamStr(isIPv6(tc.ip)) + execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ") + if expectCommand != execCommand { + t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand) } + svcCount++ + } + if svcCount != fexec.CommandCalls { + t.Errorf("Expect command executed %d times, but got %d", svcCount, fexec.CommandCalls) } } @@ -128,39 +134,44 @@ func TestClearUDPConntrackForPort(t *testing.T) { CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, func() ([]byte, error) { - return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.") + return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted") }, + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, }, } fexec := fakeexec.FakeExec{ CommandScript: []fakeexec.FakeCommandAction{ func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, } - testCases := []string{ - "8080", - "9090", + testCases := []struct { + name string + port int + isIPv6 bool + }{ + {"IPv4, no error", 8080, false}, + {"IPv4, simulated error", 9090, false}, + {"IPv6, no error", 6666, true}, } svcCount := 0 - for i := range testCases { - portNum, _ := strconv.Atoi(testCases[i]) - err := ClearUDPConntrackForPort(&fexec, portNum) + for _, tc := range testCases { + err := ClearUDPConntrackForPort(&fexec, tc.port, tc.isIPv6) if err != nil { - t.Errorf("Unexpected error: %v", err) + t.Errorf("%s test case: Unexpected error: %v", tc.name, err) } - expectCommand := fmt.Sprintf("conntrack -D -p udp --dport %s", testCases[i]) + expectCommand := fmt.Sprintf("conntrack -D -p udp --dport %d", tc.port) + familyParamStr(tc.isIPv6) execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ") if expectCommand != execCommand { - t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand) - } - svcCount += 1 - - if svcCount != fexec.CommandCalls { - t.Errorf("Exepect comand executed %d times, but got %d", svcCount, fexec.CommandCalls) + t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand) } + svcCount++ + } + if svcCount != fexec.CommandCalls { + t.Errorf("Expect command executed %d times, but got %d", svcCount, fexec.CommandCalls) } } @@ -169,46 +180,55 @@ func TestDeleteUDPConnections(t *testing.T) { CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, func() ([]byte, error) { - return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.") + return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted") }, + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, }, } fexec := fakeexec.FakeExec{ CommandScript: []fakeexec.FakeCommandAction{ func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, } testCases := []struct { + name string origin string dest string }{ { + name: "IPv4 success", origin: "1.2.3.4", dest: "10.20.30.40", }, { + name: "IPv4 simulated failure", origin: "2.3.4.5", dest: "20.30.40.50", }, + { + name: "IPv6 success", + origin: "fd00::600d:f00d", + dest: "2001:db8::5", + }, } svcCount := 0 - for i := range testCases { - err := ClearUDPConntrackForPeers(&fexec, testCases[i].origin, testCases[i].dest) + for i, tc := range testCases { + err := ClearUDPConntrackForPeers(&fexec, tc.origin, tc.dest) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Errorf("%s test case: unexpected error: %v", tc.name, err) } - expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", testCases[i].origin, testCases[i].dest) - execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ") + expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.origin, tc.dest) + familyParamStr(isIPv6(tc.origin)) + execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ") if expectCommand != execCommand { - t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand) - } - svcCount += 1 - - if svcCount != fexec.CommandCalls { - t.Errorf("Exepect comand executed %d times, but got %d", svcCount, fexec.CommandCalls) + t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand) } + svcCount++ + } + if svcCount != fexec.CommandCalls { + t.Errorf("Expect command executed %d times, but got %d", svcCount, fexec.CommandCalls) } }