diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index 2bd97cf9eef..d4795d93b69 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -25,6 +25,7 @@ go_library( "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/discovery/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index c298a1b695d..be9804148e6 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -36,6 +36,7 @@ import ( v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1beta1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" @@ -835,14 +836,23 @@ func (proxier *Proxier) syncProxyRules() { serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges) endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) - staleServices := serviceUpdateResult.UDPStaleClusterIP + // We need to detect stale connections to UDP Services so we + // can clean dangling conntrack entries that can blackhole traffic. + conntrackCleanupServiceIPs := serviceUpdateResult.UDPStaleClusterIP + conntrackCleanupServiceNodePorts := sets.NewInt() // merge stale services gathered from updateEndpointsMap + // an UDP service that changes from 0 to non-0 endpoints is considered stale. for _, svcPortName := range endpointUpdateResult.StaleServiceNames { if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "svcPortName", svcPortName.String(), "clusterIP", svcInfo.ClusterIP().String()) - staleServices.Insert(svcInfo.ClusterIP().String()) + conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String()) for _, extIP := range svcInfo.ExternalIPStrings() { - staleServices.Insert(extIP) + conntrackCleanupServiceIPs.Insert(extIP) + } + nodePort := svcInfo.NodePort() + if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 { + klog.V(2).Infof("Stale %s service NodePort %v -> %d", strings.ToLower(string(svcInfo.Protocol())), svcPortName, nodePort) + conntrackCleanupServiceNodePorts.Insert(nodePort) } } } @@ -1278,16 +1288,6 @@ func (proxier *Proxier) syncProxyRules() { klog.ErrorS(err, "can't open port, skipping this nodePort", "port", lp.String()) continue } - if lp.Protocol == "udp" { - // TODO: We might have multiple services using the same port, and this will clear conntrack for all of them. - // This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services. - // This only affects UDP connections, which are not common. - // See issue: https://github.com/kubernetes/kubernetes/issues/49881 - err := conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP) - if err != nil { - klog.ErrorS(err, "Failed to clear udp conntrack", "port", lp.Port) - } - } replacementPortsMap[lp] = socket } } @@ -1646,13 +1646,21 @@ func (proxier *Proxier) syncProxyRules() { } // Finish housekeeping. + // Clear stale conntrack entries for UDP Services, this has to be done AFTER the iptables rules are programmed. // TODO: these could be made more consistent. - klog.V(4).InfoS("Deleting stale services", "ips", staleServices.UnsortedList()) - for _, svcIP := range staleServices.UnsortedList() { + klog.V(4).InfoS("Deleting conntrack stale entries for Services", "ips", conntrackCleanupServiceIPs.UnsortedList()) + for _, svcIP := range conntrackCleanupServiceIPs.UnsortedList() { if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil { klog.ErrorS(err, "Failed to delete stale service connections", "ip", svcIP) } } + klog.V(4).InfoS("Deleting conntrack stale entries for Services", "nodeports", conntrackCleanupServiceNodePorts.UnsortedList()) + for _, nodePort := range conntrackCleanupServiceNodePorts.UnsortedList() { + err := conntrack.ClearEntriesForPort(proxier.exec, nodePort, isIPv6, v1.ProtocolUDP) + if err != nil { + klog.ErrorS(err, "Failed to clear udp conntrack", "port", nodePort) + } + } klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.StaleEndpoints) proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index a5097cfbaf9..cb3a96dccba 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -26,14 +26,13 @@ import ( "testing" "time" - "k8s.io/klog/v2" - "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" utilproxy "k8s.io/kubernetes/pkg/proxy/util" @@ -2848,4 +2847,88 @@ COMMIT assert.NotEqual(t, expectedIPTables, fp.iptablesData.String()) } +func TestProxierDeleteNodePortStaleUDP(t *testing.T) { + fcmd := fakeexec.FakeCmd{} + fexec := fakeexec.FakeExec{ + LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, + } + execFunc := func(cmd string, args ...string) exec.Cmd { + return fakeexec.InitFakeCmd(&fcmd, cmd, args...) + } + cmdOutput := "1 flow entries have been deleted" + cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, nil } + + // Delete ClusterIP entries + fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc) + fexec.CommandScript = append(fexec.CommandScript, execFunc) + // Delete NodePort entries + fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc) + fexec.CommandScript = append(fexec.CommandScript, execFunc) + + ipt := iptablestest.NewFake() + fp := NewFakeProxier(ipt, false) + fp.exec = &fexec + + svcIP := "10.20.30.41" + svcPort := 80 + nodePort := 31201 + svcPortName := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc1"), + Port: "p80", + Protocol: v1.ProtocolUDP, + } + + makeServiceMap(fp, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { + svc.Spec.ClusterIP = svcIP + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolUDP, + NodePort: int32(nodePort), + }} + }), + ) + makeEndpointsMap(fp) + + fp.syncProxyRules() + if fexec.CommandCalls != 0 { + t.Fatalf("Created service without endpoints must not clear conntrack entries") + } + + epIP := "10.180.0.1" + makeEndpointsMap(fp, + makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) { + ept.Subsets = []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{ + IP: epIP, + }}, + Ports: []v1.EndpointPort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolUDP, + }}, + }} + }), + ) + + fp.syncProxyRules() + if fexec.CommandCalls != 2 { + t.Fatalf("Updated UDP service with new endpoints must clear UDP entries") + } + + // Delete ClusterIP Conntrack entries + expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p %s", svcIP, strings.ToLower(string((v1.ProtocolUDP)))) + actualCommand := strings.Join(fcmd.CombinedOutputLog[0], " ") + if actualCommand != expectCommand { + t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand) + } + // Delete NodePort Conntrack entrie + expectCommand = fmt.Sprintf("conntrack -D -p %s --dport %d", strings.ToLower(string((v1.ProtocolUDP))), nodePort) + actualCommand = strings.Join(fcmd.CombinedOutputLog[1], " ") + if actualCommand != expectCommand { + t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand) + } +} + // TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.