Correctly Clear conntrack entrty on endpoint changes when using nodeport

When using NodePort to connect to an endpoint using UDP, if the endpoint is deleted on
restoration of the endpoint traffic does not flow. This happens because conntrack holds
the state of the connection and the proxy does not correctly clear the conntrack entry
for the stale endpoint.

Introduced a new function to conntrack ClearEntriesForPortNAT that uses the endpointIP
and NodePort to remove the stale conntrack entry and allow traffic to resume when
the endpoint is restored.

Signed-off-by: Jacob Tanenbaum <jtanenba@redhat.com>
This commit is contained in:
Jacob Tanenbaum 2018-11-29 11:46:12 -05:00
parent 527d1c34cc
commit 144280e7a7
5 changed files with 77 additions and 1 deletions

View File

@ -607,7 +607,13 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE
for _, epSvcPair := range connectionMap {
if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.GetProtocol() == v1.ProtocolUDP {
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIPString(), endpointIP, v1.ProtocolUDP)
nodePort := svcInfo.GetNodePort()
var err error
if nodePort != 0 {
err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP)
} else {
err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIPString(), endpointIP, v1.ProtocolUDP)
}
if err != nil {
klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err)
}

View File

@ -74,6 +74,11 @@ func (info *BaseServiceInfo) GetHealthCheckNodePort() int {
return info.HealthCheckNodePort
}
// GetNodePort is part of the ServicePort interface.
func (info *BaseServiceInfo) GetNodePort() int {
return info.NodePort
}
func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo {
onlyNodeLocalEndpoints := false
if apiservice.RequestsOnlyLocalTraffic(service) {

View File

@ -54,6 +54,8 @@ type ServicePort interface {
GetProtocol() v1.Protocol
// GetHealthCheckNodePort returns service health check node port if present. If return 0, it means not present.
GetHealthCheckNodePort() int
// GetNodePort returns a service Node port if present. If return 0, it means not present.
GetNodePort() int
}
// Endpoint in an interface which abstracts information about an endpoint.

View File

@ -107,3 +107,19 @@ func ClearEntriesForNAT(execer exec.Interface, origin, dest string, protocol v1.
}
return nil
}
// ClearEntriesForPortNAT uses the conntrack tool to delete the contrack entries
// for connections specified by the {dest IP, port} pair.
// Known issue:
// https://github.com/kubernetes/kubernetes/issues/59368
func ClearEntriesForPortNAT(execer exec.Interface, dest string, port int, protocol v1.Protocol) error {
if port <= 0 {
return fmt.Errorf("Wrong port number. The port number must be greater then zero")
}
parameters := parametersWithFamily(utilnet.IsIPv6String(dest), "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port), "--dst-nat", dest)
err := Exec(execer, parameters...)
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err)
}
return nil
}

View File

@ -234,3 +234,50 @@ func TestDeleteUDPConnections(t *testing.T) {
t.Errorf("Expect command executed %d times, but got %d", svcCount, fexec.CommandCalls)
}
}
func TestClearUDPConntrackForPortNAT(t *testing.T) {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) {
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted")
},
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
},
}
fexec := fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
testCases := []struct {
name string
port int
dest string
}{
{
name: "IPv4 success",
port: 30211,
dest: "1.2.3.4",
},
}
svcCount := 0
for i, tc := range testCases {
err := ClearEntriesForPortNAT(&fexec, tc.dest, tc.port, v1.ProtocolUDP)
if err != nil {
t.Errorf("%s test case: unexpected error: %v", tc.name, err)
}
expectCommand := fmt.Sprintf("conntrack -D -p udp --dport %d --dst-nat %s", tc.port, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.dest))
execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ")
if expectCommand != execCommand {
t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand)
}
svcCount++
}
if svcCount != fexec.CommandCalls {
t.Errorf("Expect command executed %d times, but got %d", svcCount, fexec.CommandCalls)
}
}