kube-proxy: clear conntrack entries after rules are in place

Clear conntrack entries for UDP NodePorts,
this has to be done AFTER the iptables rules are programmed.
It can happen that traffic to the NodePort hits the host before
the iptables rules are programmed this will create an stale entry
in conntrack that will blackhole the traffic, so we need to
clear it ONLY when the service has endpoints.
This commit is contained in:
Antonio Ojea 2021-02-02 09:46:00 +01:00
parent d3fce91fdc
commit ed21a0e16c
3 changed files with 109 additions and 17 deletions

View File

@ -25,6 +25,7 @@ go_library(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",

View File

@ -36,6 +36,7 @@ import (
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
@ -835,14 +836,23 @@ func (proxier *Proxier) syncProxyRules() {
serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
staleServices := serviceUpdateResult.UDPStaleClusterIP
// We need to detect stale connections to UDP Services so we
// can clean dangling conntrack entries that can blackhole traffic.
conntrackCleanupServiceIPs := serviceUpdateResult.UDPStaleClusterIP
conntrackCleanupServiceNodePorts := sets.NewInt()
// merge stale services gathered from updateEndpointsMap
// an UDP service that changes from 0 to non-0 endpoints is considered stale.
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "svcPortName", svcPortName.String(), "clusterIP", svcInfo.ClusterIP().String())
staleServices.Insert(svcInfo.ClusterIP().String())
conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() {
staleServices.Insert(extIP)
conntrackCleanupServiceIPs.Insert(extIP)
}
nodePort := svcInfo.NodePort()
if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 {
klog.V(2).Infof("Stale %s service NodePort %v -> %d", strings.ToLower(string(svcInfo.Protocol())), svcPortName, nodePort)
conntrackCleanupServiceNodePorts.Insert(nodePort)
}
}
}
@ -1278,16 +1288,6 @@ func (proxier *Proxier) syncProxyRules() {
klog.ErrorS(err, "can't open port, skipping this nodePort", "port", lp.String())
continue
}
if lp.Protocol == "udp" {
// TODO: We might have multiple services using the same port, and this will clear conntrack for all of them.
// This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services.
// This only affects UDP connections, which are not common.
// See issue: https://github.com/kubernetes/kubernetes/issues/49881
err := conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to clear udp conntrack", "port", lp.Port)
}
}
replacementPortsMap[lp] = socket
}
}
@ -1646,13 +1646,21 @@ func (proxier *Proxier) syncProxyRules() {
}
// Finish housekeeping.
// Clear stale conntrack entries for UDP Services, this has to be done AFTER the iptables rules are programmed.
// TODO: these could be made more consistent.
klog.V(4).InfoS("Deleting stale services", "ips", staleServices.UnsortedList())
for _, svcIP := range staleServices.UnsortedList() {
klog.V(4).InfoS("Deleting conntrack stale entries for Services", "ips", conntrackCleanupServiceIPs.UnsortedList())
for _, svcIP := range conntrackCleanupServiceIPs.UnsortedList() {
if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
klog.ErrorS(err, "Failed to delete stale service connections", "ip", svcIP)
}
}
klog.V(4).InfoS("Deleting conntrack stale entries for Services", "nodeports", conntrackCleanupServiceNodePorts.UnsortedList())
for _, nodePort := range conntrackCleanupServiceNodePorts.UnsortedList() {
err := conntrack.ClearEntriesForPort(proxier.exec, nodePort, isIPv6, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to clear udp conntrack", "port", nodePort)
}
}
klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.StaleEndpoints)
proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
}

View File

@ -26,14 +26,13 @@ import (
"testing"
"time"
"k8s.io/klog/v2"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
@ -2848,4 +2847,88 @@ COMMIT
assert.NotEqual(t, expectedIPTables, fp.iptablesData.String())
}
func TestProxierDeleteNodePortStaleUDP(t *testing.T) {
fcmd := fakeexec.FakeCmd{}
fexec := fakeexec.FakeExec{
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
execFunc := func(cmd string, args ...string) exec.Cmd {
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
}
cmdOutput := "1 flow entries have been deleted"
cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, nil }
// Delete ClusterIP entries
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
// Delete NodePort entries
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt, false)
fp.exec = &fexec
svcIP := "10.20.30.41"
svcPort := 80
nodePort := 31201
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
Protocol: v1.ProtocolUDP,
}
makeServiceMap(fp,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.ClusterIP = svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolUDP,
NodePort: int32(nodePort),
}}
}),
)
makeEndpointsMap(fp)
fp.syncProxyRules()
if fexec.CommandCalls != 0 {
t.Fatalf("Created service without endpoints must not clear conntrack entries")
}
epIP := "10.180.0.1"
makeEndpointsMap(fp,
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIP,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolUDP,
}},
}}
}),
)
fp.syncProxyRules()
if fexec.CommandCalls != 2 {
t.Fatalf("Updated UDP service with new endpoints must clear UDP entries")
}
// Delete ClusterIP Conntrack entries
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p %s", svcIP, strings.ToLower(string((v1.ProtocolUDP))))
actualCommand := strings.Join(fcmd.CombinedOutputLog[0], " ")
if actualCommand != expectCommand {
t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand)
}
// Delete NodePort Conntrack entrie
expectCommand = fmt.Sprintf("conntrack -D -p %s --dport %d", strings.ToLower(string((v1.ProtocolUDP))), nodePort)
actualCommand = strings.Join(fcmd.CombinedOutputLog[1], " ")
if actualCommand != expectCommand {
t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand)
}
}
// TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.