Merge pull request #49478 from m1093782566/proxy-util-conntrack

Automatic merge from submit-queue (batch tested with PRs 50094, 48966, 49478, 50593, 49140)

[kube-proxy] Move UDP conntrack operations together to pkg/proxy/util/conntrack.go

**What this PR does / why we need it**:

Fixes a TODO in pkg/proxy/iptables/proxier.go; see

https://github.com/kubernetes/kubernetes/blob/master/pkg/proxy/iptables/proxier.go#L1632

Moves the UDP conntrack operations from `pkg/proxy/iptables/proxier.go` to `pkg/proxy/util/conntrack.go` so that they are more consistent, and adds some unit tests.

**Which issue this PR fixes**

Fixes #49477

**Special notes for your reviewer**:
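Not part of the diff itself, but a minimal sketch for reviewers of how the consolidated helpers are intended to be called. The three signatures and the `conntrack` invocations in the comments come straight from `pkg/proxy/util/conntrack.go` below; the `main` wrapper, the `utilproxy` import alias, and the sample IPs/port are illustrative assumptions only:

```go
package main

import (
	"log"

	utilexec "k8s.io/utils/exec"
	utilproxy "k8s.io/kubernetes/pkg/proxy/util"
)

func main() {
	execer := utilexec.New()

	// Runs: conntrack -D --orig-dst 10.96.0.10 -p udp
	// Flushes every UDP conntrack entry whose original destination is the
	// (stale) service IP.
	if err := utilproxy.ClearUDPConntrackForIP(execer, "10.96.0.10"); err != nil {
		log.Printf("clearing conntrack for service IP: %v", err)
	}

	// Runs: conntrack -D -p udp --dport 30053
	// Flushes UDP entries for a node port the proxier has just claimed.
	if err := utilproxy.ClearUDPConntrackForPort(execer, 30053); err != nil {
		log.Printf("clearing conntrack for port: %v", err)
	}

	// Runs: conntrack -D --orig-dst 10.96.0.10 --dst-nat 172.17.0.4 -p udp
	// Flushes entries for one {service IP, endpoint IP} pair after the
	// endpoint has been removed.
	if err := utilproxy.ClearUDPConntrackForPeers(execer, "10.96.0.10", "172.17.0.4"); err != nil {
		log.Printf("clearing conntrack for peers: %v", err)
	}
}
```

All three helpers swallow the "0 flow entries have been deleted" error that `conntrack -D` returns when nothing matches, so a non-nil error here means the tool itself failed.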

```release-note
NONE
```
Kubernetes Submit Queue, 2017-08-14 12:14:59 -07:00 (committed by GitHub)
commit 41784b5c66
5 changed files with 161 additions and 56 deletions

pkg/proxy/iptables/proxier.go

@@ -959,8 +959,6 @@ func (esp *endpointServicePair) IPPart() string {
return esp.endpoint
}
const noConnectionToDelete = "0 flow entries have been deleted"
// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
// risk sending more traffic to it, all of which will be lost (because UDP).
// This assumes the proxier mutex is held
@@ -968,13 +966,9 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ
for epSvcPair := range connectionMap {
if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP {
endpointIP := epSvcPair.endpoint[0:strings.Index(epSvcPair.endpoint, ":")]
glog.V(2).Infof("Deleting connection tracking state for service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP)
err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp")
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
// TODO: Better handling for deletion failure. When failures occur, stale udp connections may not get flushed.
// These stale udp connections will continue to black-hole traffic. Making this a best-effort operation for now,
// since it is expensive to baby-sit all udp connections to kubernetes services.
glog.Errorf("conntrack returned error: %v", err)
err := utilproxy.ClearUDPConntrackForPeers(proxier.exec, svcInfo.clusterIP.String(), endpointIP)
if err != nil {
glog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.servicePortName.String(), err)
}
}
}
@@ -1373,7 +1367,14 @@ func (proxier *Proxier) syncProxyRules() {
continue
}
if lp.protocol == "udp" {
proxier.clearUDPConntrackForPort(lp.port)
// TODO: We might have multiple services using the same port, and this will clear conntrack for all of them.
// This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services.
// This only affects UDP connections, which are not common.
// See issue: https://github.com/kubernetes/kubernetes/issues/49881
err := utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.port)
if err != nil {
glog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.port, err)
}
}
replacementPortsMap[lp] = socket
}
// We're holding the port, so it's OK to install iptables rules.
@@ -1642,26 +1643,13 @@ func (proxier *Proxier) syncProxyRules() {
}
// Finish housekeeping.
// TODO: these and clearUDPConntrackForPort() could be made more consistent.
utilproxy.DeleteServiceConnections(proxier.exec, staleServices.List())
proxier.deleteEndpointConnections(endpointUpdateResult.staleEndpoints)
}
// Clear UDP conntrack for port or all conntrack entries when port equals zero.
// When a packet arrives, it will not go through NAT table again, because it is not "the first" packet.
// The solution is clearing the conntrack. Known issues:
// https://github.com/docker/docker/issues/8795
// https://github.com/kubernetes/kubernetes/issues/31983
func (proxier *Proxier) clearUDPConntrackForPort(port int) {
glog.V(2).Infof("Deleting conntrack entries for udp connections")
if port > 0 {
err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "-p", "udp", "--dport", strconv.Itoa(port))
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
glog.Errorf("conntrack return with error: %v", err)
// TODO: these could be made more consistent.
for _, svcIP := range staleServices.List() {
if err := utilproxy.ClearUDPConntrackForIP(proxier.exec, svcIP); err != nil {
glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
}
} else {
glog.Errorf("Wrong port number. The port number must be greater than zero")
}
proxier.deleteEndpointConnections(endpointUpdateResult.staleEndpoints)
}
// Join all words with spaces, terminate with newline and write to buf.

pkg/proxy/userspace/proxier.go

@@ -500,7 +500,11 @@ func (proxier *Proxier) unmergeService(service *api.Service, existingPorts sets.
}
proxier.loadBalancer.DeleteService(serviceName)
}
utilproxy.DeleteServiceConnections(proxier.exec, staleUDPServices.List())
for _, svcIP := range staleUDPServices.List() {
if err := utilproxy.ClearUDPConntrackForIP(proxier.exec, svcIP); err != nil {
glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
}
}
}
func (proxier *Proxier) OnServiceAdd(service *api.Service) {

pkg/proxy/util/BUILD

@@ -1,18 +1,10 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["conntrack.go"],
deps = [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
],
visibility = ["//visibility:public"],
deps = ["//vendor/k8s.io/utils/exec:go_default_library"],
)
go_test(
@@ -36,4 +28,5 @@ filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

pkg/proxy/util/conntrack.go

@@ -18,30 +18,27 @@ package util
import (
"fmt"
"strconv"
"strings"
"k8s.io/utils/exec"
"github.com/golang/glog"
)
// Utilities for dealing with conntrack
const noConnectionToDelete = "0 flow entries have been deleted"
// DeleteServiceConnections uses the conntrack tool to delete the conntrack entries
// for the UDP connections specified by the given service IPs
func DeleteServiceConnections(execer exec.Interface, svcIPs []string) {
for _, ip := range svcIPs {
glog.V(2).Infof("Deleting connection tracking state for service IP %s", ip)
err := ExecConntrackTool(execer, "-D", "--orig-dst", ip, "-p", "udp")
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
// TODO: Better handling for deletion failure. When failures occur, stale udp connections may not get flushed.
// These stale udp connections will continue to black-hole traffic. Making this a best-effort operation for now,
// since it is expensive to baby-sit all udp connections to kubernetes services.
glog.Errorf("conntrack returned error: %v", err)
}
// ClearUDPConntrackForIP uses the conntrack tool to delete the conntrack entries
// for the UDP connections specified by the given service IP
func ClearUDPConntrackForIP(execer exec.Interface, ip string) error {
err := ExecConntrackTool(execer, "-D", "--orig-dst", ip, "-p", "udp")
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
// TODO: Better handling for deletion failure. When failures occur, stale udp connections may not get flushed.
// These stale udp connections will continue to black-hole traffic. Making this a best-effort operation for now,
// since it is expensive to baby-sit all udp connections to kubernetes services.
return fmt.Errorf("error deleting connection tracking state for UDP service IP: %s, error: %v", ip, err)
}
return nil
}
// ExecConntrackTool executes the conntrack tool using the given parameters
@@ -56,3 +53,33 @@ func ExecConntrackTool(execer exec.Interface, parameters ...string) error {
}
return nil
}
// ClearUDPConntrackForPort uses the conntrack tool to delete the conntrack entries
// for the UDP connections specified by the port.
// When a packet arrives, it will not go through NAT table again, because it is not "the first" packet.
// The solution is clearing the conntrack. Known issues:
// https://github.com/docker/docker/issues/8795
// https://github.com/kubernetes/kubernetes/issues/31983
func ClearUDPConntrackForPort(execer exec.Interface, port int) error {
if port <= 0 {
return fmt.Errorf("Wrong port number. The port number must be greater than zero")
}
err := ExecConntrackTool(execer, "-D", "-p", "udp", "--dport", strconv.Itoa(port))
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err)
}
return nil
}
// ClearUDPConntrackForPeers uses the conntrack tool to delete the conntrack entries
// for the UDP connections specified by the {origin, dest} IP pair.
func ClearUDPConntrackForPeers(execer exec.Interface, origin, dest string) error {
err := ExecConntrackTool(execer, "-D", "--orig-dst", origin, "--dst-nat", dest, "-p", "udp")
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
// TODO: Better handling for deletion failure. When failures occur, stale udp connections may not get flushed.
// These stale udp connections will continue to black-hole traffic. Making this a best-effort operation for now,
// since it is expensive to baby-sit all udp connections to kubernetes services.
return fmt.Errorf("error deleting conntrack entries for UDP peer {%s, %s}, error: %v", origin, dest, err)
}
return nil
}
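One design note on the file above: `conntrack -D` exits non-zero when no entries matched, so all three helpers deliberately treat output containing `noConnectionToDelete` as success and only surface other failures. A minimal, self-contained sketch of just that best-effort check (the `bestEffort` name is mine, not from this PR):

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

const noConnectionToDelete = "0 flow entries have been deleted"

// bestEffort mirrors the pattern used by the helpers above: an error that
// merely says "nothing matched" is swallowed, anything else is propagated.
func bestEffort(err error) error {
	if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
		return fmt.Errorf("conntrack failed: %v", err)
	}
	return nil
}

func main() {
	// Nothing matched: conntrack exits non-zero, but this is not a failure.
	fmt.Println(bestEffort(errors.New("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted."))) // <nil>
	// A genuine failure is surfaced to the caller.
	fmt.Println(bestEffort(errors.New("conntrack: command not found"))) // conntrack failed: ...
}
```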

pkg/proxy/util/conntrack_test.go

@@ -18,6 +18,7 @@ package util
import (
"fmt"
"strconv"
"strings"
"testing"
@@ -74,7 +75,7 @@ func TestExecConntrackTool(t *testing.T) {
}
}
func TestDeleteServiceConnections(t *testing.T) {
func TestClearUDPConntrackForIP(t *testing.T) {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
@@ -105,8 +106,10 @@ func TestDeleteServiceConnections(t *testing.T) {
svcCount := 0
for i := range testCases {
DeleteServiceConnections(&fexec, testCases[i])
for _, ip := range testCases[i] {
if err := ClearUDPConntrackForIP(&fexec, ip); err != nil {
t.Errorf("Unexepected error: %v", err)
}
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", ip)
execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ")
if expectCommand != execCommand {
@@ -119,3 +122,93 @@ func TestDeleteServiceConnections(t *testing.T) {
}
}
}
func TestClearUDPConntrackForPort(t *testing.T) {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) {
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.")
},
},
}
fexec := fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
testCases := []string{
"8080",
"9090",
}
svcCount := 0
for i := range testCases {
portNum, _ := strconv.Atoi(testCases[i])
err := ClearUDPConntrackForPort(&fexec, portNum)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
expectCommand := fmt.Sprintf("conntrack -D -p udp --dport %s", testCases[i])
execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ")
if expectCommand != execCommand {
t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand)
}
svcCount += 1
if svcCount != fexec.CommandCalls {
t.Errorf("Exepect comand executed %d times, but got %d", svcCount, fexec.CommandCalls)
}
}
}
func TestClearUDPConntrackForPeers(t *testing.T) {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) {
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.")
},
},
}
fexec := fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
testCases := []struct {
origin string
dest string
}{
{
origin: "1.2.3.4",
dest: "10.20.30.40",
},
{
origin: "2.3.4.5",
dest: "20.30.40.50",
},
}
svcCount := 0
for i := range testCases {
err := ClearUDPConntrackForPeers(&fexec, testCases[i].origin, testCases[i].dest)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", testCases[i].origin, testCases[i].dest)
execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ")
if expectCommand != execCommand {
t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand)
}
svcCount += 1
if svcCount != fexec.CommandCalls {
t.Errorf("Exepect comand executed %d times, but got %d", svcCount, fexec.CommandCalls)
}
}
}