diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index ac49fb2093e..b577c9178ce 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -48,6 +48,7 @@ go_test( "//pkg/util/iptables:go_default_library", "//pkg/util/iptables/testing:go_default_library", "//vendor/github.com/davecgh/go-spew/spew:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index aba1b5d58ee..e2a492b8a20 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/davecgh/go-spew/spew" + "github.com/golang/glog" "fmt" "net" @@ -89,7 +90,7 @@ func TestReadLinesFromByteBuffer(t *testing.T) { } func TestGetChainLines(t *testing.T) { - iptables_save := `# Generated by iptables-save v1.4.7 on Wed Oct 29 14:56:01 2014 + iptablesSave := `# Generated by iptables-save v1.4.7 on Wed Oct 29 14:56:01 2014 *nat :PREROUTING ACCEPT [2136997:197881818] :POSTROUTING ACCEPT [4284525:258542680] @@ -102,11 +103,11 @@ func TestGetChainLines(t *testing.T) { utiliptables.ChainPostrouting: ":POSTROUTING ACCEPT [4284525:258542680]", utiliptables.ChainOutput: ":OUTPUT ACCEPT [5901660:357267963]", } - checkAllLines(t, utiliptables.TableNAT, []byte(iptables_save), expected) + checkAllLines(t, utiliptables.TableNAT, []byte(iptablesSave), expected) } func TestGetChainLinesMultipleTables(t *testing.T) { - iptables_save := `# Generated by iptables-save v1.4.21 on Fri Aug 7 14:47:37 2015 + iptablesSave := `# Generated by iptables-save v1.4.21 on Fri Aug 7 14:47:37 2015 *nat :PREROUTING ACCEPT [2:138] :INPUT ACCEPT [0:0] @@ -174,7 +175,7 @@ func TestGetChainLinesMultipleTables(t *testing.T) { utiliptables.Chain("KUBE-SVC-5555555555555555"): ":KUBE-SVC-5555555555555555 - [0:0]", utiliptables.Chain("KUBE-SVC-6666666666666666"): ":KUBE-SVC-6666666666666666 - [0:0]", } - checkAllLines(t, utiliptables.TableNAT, []byte(iptables_save), expected) + checkAllLines(t, utiliptables.TableNAT, []byte(iptablesSave), expected) } func newFakeServiceInfo(service proxy.ServicePortName, ip net.IP, port int, protocol api.Protocol, onlyNodeLocalEndpoints bool) *serviceInfo { @@ -189,67 +190,153 @@ func newFakeServiceInfo(service proxy.ServicePortName, ip net.IP, port int, prot } func TestDeleteEndpointConnections(t *testing.T) { - fcmd := fakeexec.FakeCmd{ - CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ - func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, - func() ([]byte, error) { - return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.") - }, + const ( + UDP = api.ProtocolUDP + TCP = api.ProtocolTCP + ) + testCases := []struct { + description string + svcName string + svcIP string + svcPort int + protocol api.Protocol + endpoint string // IP:port endpoint + epSvcPair endpointServicePair // Will be generated by test + simulatedErr string + }{ + { + description: "V4 UDP", + svcName: "v4-udp", + svcIP: "10.96.1.1", + svcPort: 80, + protocol: UDP, + endpoint: "10.240.0.3:80", + }, { + description: "V4 TCP", + svcName: "v4-tcp", + svcIP: "10.96.2.2", + svcPort: 80, + protocol: TCP, + endpoint: "10.240.0.4:80", + }, { + description: "V4 UDP, nothing to delete, benign error", + svcName: "v4-udp-nothing-to-delete", + svcIP: "10.96.1.1", + svcPort: 80, + protocol: UDP, + endpoint: "10.240.0.3:80", + simulatedErr: utilproxy.NoConnectionToDelete, + }, { + description: "V4 UDP, unexpected error, should be glogged", + svcName: "v4-udp-simulated-error", + svcIP: "10.96.1.1", + svcPort: 80, + protocol: UDP, + endpoint: "10.240.0.3:80", + simulatedErr: "simulated error", + }, { + description: "V6 UDP", + svcName: "v6-udp", + svcIP: "fd00:1234::20", + svcPort: 80, + protocol: UDP, + endpoint: "[2001:db8::2]:80", + }, { + description: "V6 TCP", + svcName: "v6-tcp", + svcIP: "fd00:1234::30", + svcPort: 80, + protocol: TCP, + endpoint: "[2001:db8::3]:80", }, } + + // Create a service map that has service info entries for all test cases + // and generate an endpoint service pair for each test case + serviceMap := make(map[proxy.ServicePortName]*serviceInfo) + for i, tc := range testCases { + svc := proxy.ServicePortName{ + NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName}, + Port: "p80", + } + serviceMap[svc] = newFakeServiceInfo(svc, net.ParseIP(tc.svcIP), 80, tc.protocol, false) + testCases[i].epSvcPair = endpointServicePair{ + endpoint: tc.endpoint, + servicePortName: svc, + } + } + + // Create a fake executor for the conntrack utility. This should only be + // invoked for UDP connections, since no conntrack cleanup is needed for TCP + fcmd := fakeexec.FakeCmd{} fexec := fakeexec.FakeExec{ - CommandScript: []fakeexec.FakeCommandAction{ - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - }, LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, } - - serviceMap := make(map[proxy.ServicePortName]*serviceInfo) - svc1 := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: "svc1"}, Port: "p80"} - svc2 := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: "svc2"}, Port: "p80"} - serviceMap[svc1] = newFakeServiceInfo(svc1, net.IPv4(10, 20, 30, 40), 80, api.ProtocolUDP, false) - serviceMap[svc2] = newFakeServiceInfo(svc1, net.IPv4(10, 20, 30, 41), 80, api.ProtocolTCP, false) - - fakeProxier := Proxier{exec: &fexec, serviceMap: serviceMap} - - testCases := []endpointServicePair{ - { - endpoint: "10.240.0.3:80", - servicePortName: svc1, - }, - { - endpoint: "10.240.0.4:80", - servicePortName: svc1, - }, - { - endpoint: "10.240.0.5:80", - servicePortName: svc2, - }, - { - endpoint: "[fd00:1::5]:8080", - servicePortName: svc2, - }, + execFunc := func(cmd string, args ...string) exec.Cmd { + return fakeexec.InitFakeCmd(&fcmd, cmd, args...) + } + for _, tc := range testCases { + if tc.protocol == UDP { + var cmdOutput string + var simErr error + if tc.simulatedErr == "" { + cmdOutput = "1 flow entries have been deleted" + } else { + simErr = fmt.Errorf(tc.simulatedErr) + } + cmdFunc := func() ([]byte, error) { return []byte(cmdOutput), simErr } + fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc) + fexec.CommandScript = append(fexec.CommandScript, execFunc) + } } - expectCommandExecCount := 0 - for i := range testCases { - input := map[endpointServicePair]bool{testCases[i]: true} + // Create a proxier using the fake conntrack executor and service map + fakeProxier := Proxier{exec: &fexec, serviceMap: serviceMap} + + // Run the test cases + for _, tc := range testCases { + priorExecs := fexec.CommandCalls + priorGlogErrs := glog.Stats.Error.Lines() + + input := map[endpointServicePair]bool{tc.epSvcPair: true} fakeProxier.deleteEndpointConnections(input) - svcInfo := fakeProxier.serviceMap[testCases[i].servicePortName] - if svcInfo.protocol == api.ProtocolUDP { - svcIp := svcInfo.clusterIP.String() - endpointIp := utilproxy.IPPart(testCases[i].endpoint) - expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", svcIp, endpointIp) - execCommand := strings.Join(fcmd.CombinedOutputLog[expectCommandExecCount], " ") - if expectCommand != execCommand { - t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand) + + // For UDP connections, check the executed conntrack command + var expExecs int + if tc.protocol == UDP { + isIPv6 := func(ip string) bool { + netIP := net.ParseIP(ip) + if netIP.To4() == nil { + return true + } + return false } - expectCommandExecCount += 1 + endpointIP := utilproxy.IPPart(tc.endpoint) + expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP) + if isIPv6(endpointIP) { + expectCommand += " -f ipv6" + } + actualCommand := strings.Join(fcmd.CombinedOutputLog[fexec.CommandCalls-1], " ") + if actualCommand != expectCommand { + t.Errorf("%s: Expected command: %s, but executed %s", tc.description, expectCommand, actualCommand) + } + expExecs = 1 } - if expectCommandExecCount != fexec.CommandCalls { - t.Errorf("Exepect comand executed %d times, but got %d", expectCommandExecCount, fexec.CommandCalls) + // Check the number of times conntrack was executed + execs := fexec.CommandCalls - priorExecs + if execs != expExecs { + t.Errorf("%s: Expected conntrack to be executed %d times, but got %d", tc.description, expExecs, execs) + } + + // Check the number of new glog errors + var expGlogErrs int64 + if tc.simulatedErr != "" && tc.simulatedErr != utilproxy.NoConnectionToDelete { + expGlogErrs = 1 + } + glogErrs := glog.Stats.Error.Lines() - priorGlogErrs + if glogErrs != expGlogErrs { + t.Errorf("%s: Expected %d glogged errors, but got %d", tc.description, expGlogErrs, glogErrs) } } } diff --git a/pkg/proxy/util/conntrack.go b/pkg/proxy/util/conntrack.go index 504085a5ae2..57e4fba32f5 100644 --- a/pkg/proxy/util/conntrack.go +++ b/pkg/proxy/util/conntrack.go @@ -27,7 +27,7 @@ import ( // Utilities for dealing with conntrack -const noConnectionToDelete = "0 flow entries have been deleted" +const NoConnectionToDelete = "0 flow entries have been deleted" func isIPv6(ip string) bool { netIP := net.ParseIP(ip) @@ -46,7 +46,7 @@ func parametersWithFamily(isIPv6 bool, parameters ...string) []string { func ClearUDPConntrackForIP(execer exec.Interface, ip string) error { parameters := parametersWithFamily(isIPv6(ip), "-D", "--orig-dst", ip, "-p", "udp") err := ExecConntrackTool(execer, parameters...) - if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { + if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it // is expensive to baby-sit all udp connections to kubernetes services. @@ -80,7 +80,7 @@ func ClearUDPConntrackForPort(execer exec.Interface, port int, isIPv6 bool) erro } parameters := parametersWithFamily(isIPv6, "-D", "-p", "udp", "--dport", strconv.Itoa(port)) err := ExecConntrackTool(execer, parameters...) - if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { + if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err) } return nil @@ -91,7 +91,7 @@ func ClearUDPConntrackForPort(execer exec.Interface, port int, isIPv6 bool) erro func ClearUDPConntrackForPeers(execer exec.Interface, origin, dest string) error { parameters := parametersWithFamily(isIPv6(origin), "-D", "--orig-dst", origin, "--dst-nat", dest, "-p", "udp") err := ExecConntrackTool(execer, parameters...) - if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { + if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it // is expensive to baby sit all udp connections to kubernetes services.