Mirror of https://github.com/k3s-io/kubernetes.git
Merge pull request #98305 from aojea/holdports

kube-proxy has to clear NodePort stale UDP entries

Commit: 659b4dc4a8
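Background: when a UDP Service transitions from zero to one or more endpoints, the kernel's conntrack table can still hold entries created while traffic was being dropped, and those entries keep blackholing packets even after kube-proxy programs correct iptables rules. Before this patch, kube-proxy cleared such stale UDP entries keyed by ClusterIP and external IPs, but NodePort entries were only cleared as a side effect of (re)opening the port. This patch collects stale UDP NodePorts during each sync and flushes their conntrack entries after the new rules are in place. The cleanup ultimately shells out to the conntrack(8) CLI; below is a minimal stand-alone sketch of the per-port deletion, illustrative only (kube-proxy itself goes through its conntrack utility package and an injectable exec interface, as the hunks below show), assuming the conntrack binary is on PATH and the process has privileges to delete flow entries:

package main

import (
	"fmt"
	"os/exec"
	"strconv"
)

// clearUDPConntrackByPort deletes every conntrack flow destined to the
// given UDP port. This mirrors the cleanup the PR wires into
// syncProxyRules, but is a sketch, not the kube-proxy implementation.
func clearUDPConntrackByPort(port int) error {
	// Equivalent CLI, asserted verbatim by the test added in this PR:
	//   conntrack -D -p udp --dport <port>
	out, err := exec.Command("conntrack", "-D", "-p", "udp",
		"--dport", strconv.Itoa(port)).CombinedOutput()
	if err != nil {
		// conntrack exits non-zero when no entries matched, so real
		// callers may need to treat "nothing deleted" as a non-error.
		return fmt.Errorf("clearing conntrack entries for UDP port %d: %v (output: %s)", port, err, out)
	}
	return nil
}

func main() {
	if err := clearUDPConntrackByPort(31201); err != nil {
		fmt.Println(err)
	}
}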
@@ -25,6 +25,7 @@ go_library(
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/tools/record:go_default_library",
@@ -36,6 +36,7 @@ import (
     v1 "k8s.io/api/core/v1"
     discovery "k8s.io/api/discovery/v1beta1"
     "k8s.io/apimachinery/pkg/types"
+    "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/util/wait"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
     "k8s.io/client-go/tools/record"
@@ -835,14 +836,23 @@ func (proxier *Proxier) syncProxyRules() {
     serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges)
     endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
 
-    staleServices := serviceUpdateResult.UDPStaleClusterIP
+    // We need to detect stale connections to UDP Services so we
+    // can clean dangling conntrack entries that can blackhole traffic.
+    conntrackCleanupServiceIPs := serviceUpdateResult.UDPStaleClusterIP
+    conntrackCleanupServiceNodePorts := sets.NewInt()
     // merge stale services gathered from updateEndpointsMap
+    // an UDP service that changes from 0 to non-0 endpoints is considered stale.
     for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
         if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
             klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "svcPortName", svcPortName.String(), "clusterIP", svcInfo.ClusterIP().String())
-            staleServices.Insert(svcInfo.ClusterIP().String())
+            conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
             for _, extIP := range svcInfo.ExternalIPStrings() {
-                staleServices.Insert(extIP)
+                conntrackCleanupServiceIPs.Insert(extIP)
+            }
+            nodePort := svcInfo.NodePort()
+            if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 {
+                klog.V(2).Infof("Stale %s service NodePort %v -> %d", strings.ToLower(string(svcInfo.Protocol())), svcPortName, nodePort)
+                conntrackCleanupServiceNodePorts.Insert(nodePort)
             }
         }
     }
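The new conntrackCleanupServiceNodePorts accumulator uses the sets helper imported above. For readers unfamiliar with it, a minimal sketch of the three calls the hunk relies on (a toy program, not kube-proxy code):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// sets.NewInt builds an int set; Insert deduplicates, so if several
	// stale service ports resolve to the same NodePort it is cleaned
	// only once. UnsortedList returns the members as a []int.
	nodePorts := sets.NewInt()
	nodePorts.Insert(31201)
	nodePorts.Insert(31201) // duplicate: the set still holds one element
	fmt.Println(nodePorts.UnsortedList()) // [31201]
}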
@@ -1278,16 +1288,6 @@ func (proxier *Proxier) syncProxyRules() {
                 klog.ErrorS(err, "can't open port, skipping this nodePort", "port", lp.String())
                 continue
             }
-            if lp.Protocol == "udp" {
-                // TODO: We might have multiple services using the same port, and this will clear conntrack for all of them.
-                // This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services.
-                // This only affects UDP connections, which are not common.
-                // See issue: https://github.com/kubernetes/kubernetes/issues/49881
-                err := conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
-                if err != nil {
-                    klog.ErrorS(err, "Failed to clear udp conntrack", "port", lp.Port)
-                }
-            }
             replacementPortsMap[lp] = socket
         }
     }
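Note that the per-port UDP cleanup removed here is not dropped; it reappears at the end of syncProxyRules in the next hunk. The old call ran at socket-open time, before the new iptables rules were programmed, and was tied to opening the port rather than to a Service actually going stale; the relocated version runs after rule programming and only for NodePorts collected by the stale-service detection above.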
@@ -1646,13 +1646,21 @@ func (proxier *Proxier) syncProxyRules() {
     }
 
     // Finish housekeeping.
+    // Clear stale conntrack entries for UDP Services; this has to be done AFTER the iptables rules are programmed.
     // TODO: these could be made more consistent.
-    klog.V(4).InfoS("Deleting stale services", "ips", staleServices.UnsortedList())
-    for _, svcIP := range staleServices.UnsortedList() {
+    klog.V(4).InfoS("Deleting conntrack stale entries for Services", "ips", conntrackCleanupServiceIPs.UnsortedList())
+    for _, svcIP := range conntrackCleanupServiceIPs.UnsortedList() {
         if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
             klog.ErrorS(err, "Failed to delete stale service connections", "ip", svcIP)
         }
     }
+    klog.V(4).InfoS("Deleting conntrack stale entries for Services", "nodeports", conntrackCleanupServiceNodePorts.UnsortedList())
+    for _, nodePort := range conntrackCleanupServiceNodePorts.UnsortedList() {
+        err := conntrack.ClearEntriesForPort(proxier.exec, nodePort, isIPv6, v1.ProtocolUDP)
+        if err != nil {
+            klog.ErrorS(err, "Failed to clear udp conntrack", "port", nodePort)
+        }
+    }
     klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.StaleEndpoints)
     proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
 }
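With the Service used in the new test below (ClusterIP 10.20.30.41, NodePort 31201), the two cleanup loops above reduce to these conntrack invocations, which the test pins verbatim against its fake exec:

conntrack -D --orig-dst 10.20.30.41 -p udp
conntrack -D -p udp --dport 31201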
@@ -26,14 +26,13 @@ import (
     "testing"
     "time"
 
-    "k8s.io/klog/v2"
-
     "github.com/stretchr/testify/assert"
     v1 "k8s.io/api/core/v1"
     discovery "k8s.io/api/discovery/v1beta1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/intstr"
+    "k8s.io/klog/v2"
     "k8s.io/kubernetes/pkg/proxy"
     "k8s.io/kubernetes/pkg/proxy/healthcheck"
     utilproxy "k8s.io/kubernetes/pkg/proxy/util"
@@ -2848,4 +2847,88 @@ COMMIT
     assert.NotEqual(t, expectedIPTables, fp.iptablesData.String())
 }
 
+func TestProxierDeleteNodePortStaleUDP(t *testing.T) {
+    fcmd := fakeexec.FakeCmd{}
+    fexec := fakeexec.FakeExec{
+        LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
+    }
+    execFunc := func(cmd string, args ...string) exec.Cmd {
+        return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
+    }
+    cmdOutput := "1 flow entries have been deleted"
+    cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, nil }
+
+    // Delete ClusterIP entries
+    fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
+    fexec.CommandScript = append(fexec.CommandScript, execFunc)
+    // Delete NodePort entries
+    fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
+    fexec.CommandScript = append(fexec.CommandScript, execFunc)
+
+    ipt := iptablestest.NewFake()
+    fp := NewFakeProxier(ipt, false)
+    fp.exec = &fexec
+
+    svcIP := "10.20.30.41"
+    svcPort := 80
+    nodePort := 31201
+    svcPortName := proxy.ServicePortName{
+        NamespacedName: makeNSN("ns1", "svc1"),
+        Port:           "p80",
+        Protocol:       v1.ProtocolUDP,
+    }
+
+    makeServiceMap(fp,
+        makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
+            svc.Spec.ClusterIP = svcIP
+            svc.Spec.Ports = []v1.ServicePort{{
+                Name:     svcPortName.Port,
+                Port:     int32(svcPort),
+                Protocol: v1.ProtocolUDP,
+                NodePort: int32(nodePort),
+            }}
+        }),
+    )
+    makeEndpointsMap(fp)
+
+    fp.syncProxyRules()
+    if fexec.CommandCalls != 0 {
+        t.Fatalf("Created service without endpoints must not clear conntrack entries")
+    }
+
+    epIP := "10.180.0.1"
+    makeEndpointsMap(fp,
+        makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
+            ept.Subsets = []v1.EndpointSubset{{
+                Addresses: []v1.EndpointAddress{{
+                    IP: epIP,
+                }},
+                Ports: []v1.EndpointPort{{
+                    Name:     svcPortName.Port,
+                    Port:     int32(svcPort),
+                    Protocol: v1.ProtocolUDP,
+                }},
+            }}
+        }),
+    )
+
+    fp.syncProxyRules()
+    if fexec.CommandCalls != 2 {
+        t.Fatalf("Updated UDP service with new endpoints must clear UDP entries")
+    }
+
+    // Delete ClusterIP Conntrack entries
+    expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p %s", svcIP, strings.ToLower(string(v1.ProtocolUDP)))
+    actualCommand := strings.Join(fcmd.CombinedOutputLog[0], " ")
+    if actualCommand != expectCommand {
+        t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand)
+    }
+    // Delete NodePort Conntrack entries
+    expectCommand = fmt.Sprintf("conntrack -D -p %s --dport %d", strings.ToLower(string(v1.ProtocolUDP)), nodePort)
+    actualCommand = strings.Join(fcmd.CombinedOutputLog[1], " ")
+    if actualCommand != expectCommand {
+        t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand)
+    }
+}
+
 // TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.
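To run just the new test, assuming the standard kubernetes repo layout where the iptables proxier and its tests live under pkg/proxy/iptables:

go test ./pkg/proxy/iptables/ -run TestProxierDeleteNodePortStaleUDP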