Merge pull request #106163 from aojea/conntrack_readiness
kube-proxy: consider endpoint readiness when deleting stale UDP conntrack entries
Commit 0940dd6fc4
@@ -121,7 +121,9 @@ func (info *BaseEndpointInfo) Port() (int, error) {
 
 // Equal is part of proxy.Endpoint interface.
 func (info *BaseEndpointInfo) Equal(other Endpoint) bool {
-    return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal()
+    return info.String() == other.String() &&
+        info.GetIsLocal() == other.GetIsLocal() &&
+        info.IsReady() == other.IsReady()
 }
 
 // GetNodeName returns the NodeName for this endpoint.
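For illustration, here is a minimal, self-contained Go sketch of what including readiness in the comparison changes (the fakeEndpoint type and its fields are hypothetical, not the real proxy package): the same address seen as ready in the old map and unready in the new map no longer compares equal, which is what allows the stale-connection detection below to treat a ready-to-unready transition as a change.

package main

import "fmt"

// fakeEndpoint is a stand-in for the proxy.Endpoint interface, used only to
// illustrate the comparison; the field names here are hypothetical.
type fakeEndpoint struct {
    addr    string // what String() returns in the real interface, e.g. "IP:port"
    isLocal bool
    ready   bool
}

// equal mirrors the shape of BaseEndpointInfo.Equal after this change:
// endpoint identity now includes readiness, not just address and locality.
func (e fakeEndpoint) equal(other fakeEndpoint) bool {
    return e.addr == other.addr &&
        e.isLocal == other.isLocal &&
        e.ready == other.ready
}

func main() {
    oldEp := fakeEndpoint{addr: "10.180.0.1:80", isLocal: false, ready: true}
    newEp := fakeEndpoint{addr: "10.180.0.1:80", isLocal: false, ready: false}

    // Before this change the two would have compared equal (same address, same
    // locality); with readiness included, the ready-to-unready transition makes
    // them differ, so the old endpoint is flagged as stale.
    fmt.Println("equal:", oldEp.equal(newEp)) // prints: equal: false
}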
@@ -536,13 +538,22 @@ func (em EndpointsMap) getLocalReadyEndpointIPs() map[types.NamespacedName]sets.
 // detectStaleConnections modifies <staleEndpoints> and <staleServices> with detected stale connections. <staleServiceNames>
 // is used to store stale udp service in order to clear udp conntrack later.
 func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) {
+    // Detect stale endpoints: an endpoint can have stale conntrack entries if it was receiving traffic
+    // and then goes unready or changes its IP address.
     for svcPortName, epList := range oldEndpointsMap {
         if svcPortName.Protocol != v1.ProtocolUDP {
             continue
         }
 
         for _, ep := range epList {
+            // If the old endpoint wasn't ready, it cannot have stale entries,
+            // since no traffic was sent to it.
+            if !ep.IsReady() {
+                continue
+            }
             stale := true
+            // Check if the endpoint has changed, including whether it went from ready to not ready.
+            // If it did change, stale entries for the old endpoint have to be cleared.
             for i := range newEndpointsMap[svcPortName] {
                 if newEndpointsMap[svcPortName][i].Equal(ep) {
                     stale = false
@@ -556,13 +567,29 @@ func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, stale
         }
     }
 
+    // Detect stale services.
+    // For a UDP service, if its backends change from 0 to non-0 ready endpoints,
+    // there may exist a conntrack entry that could blackhole traffic to the service.
     for svcPortName, epList := range newEndpointsMap {
         if svcPortName.Protocol != v1.ProtocolUDP {
             continue
         }
 
-        // For udp service, if its backend changes from 0 to non-0. There may exist a conntrack entry that could blackhole traffic to the service.
-        if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 {
+        epReady := 0
+        for _, ep := range epList {
+            if ep.IsReady() {
+                epReady++
+            }
+        }
+
+        oldEpReady := 0
+        for _, ep := range oldEndpointsMap[svcPortName] {
+            if ep.IsReady() {
+                oldEpReady++
+            }
+        }
+
+        if epReady > 0 && oldEpReady == 0 {
             *staleServiceNames = append(*staleServiceNames, svcPortName)
         }
     }
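The stale-service check above reduces to counting ready endpoints on both sides of the update: only a transition from zero ready endpoints to at least one warrants flushing UDP conntrack entries for the service IP, because only then can an old blackhole entry linger. A minimal sketch of that decision, using a hypothetical countReady helper over plain booleans rather than the real EndpointsMap types:

package main

import "fmt"

// countReady counts how many endpoints in a list are ready; the []bool is a
// stand-in for a real endpoint list, keeping the sketch self-contained.
func countReady(readiness []bool) int {
    n := 0
    for _, ready := range readiness {
        if ready {
            n++
        }
    }
    return n
}

// serviceBecameReachable mirrors the condition used above: epReady > 0 &&
// oldEpReady == 0, i.e. the service just gained its first ready endpoint.
func serviceBecameReachable(oldEndpoints, newEndpoints []bool) bool {
    return countReady(newEndpoints) > 0 && countReady(oldEndpoints) == 0
}

func main() {
    // The endpoint slice exists but the backend is still running its init
    // container, so the endpoint is present yet unready.
    before := []bool{}
    after := []bool{false}
    fmt.Println(serviceBecameReachable(before, after)) // false: still no ready endpoint

    // The backend finished its init container and became ready.
    before = []bool{false}
    after = []bool{true}
    fmt.Println(serviceBecameReachable(before, after)) // true: flush stale UDP entries
}

With the pre-change check (len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0), the second update would have gone undetected, since an unready endpoint already existed in the old map.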
@@ -164,7 +164,8 @@ func (e *endpointsInfo) Equal(other proxy.Endpoint) bool {
     return e.Endpoint == o.Endpoint &&
         e.IsLocal == o.IsLocal &&
         e.protocol == o.protocol &&
-        e.chainName == o.chainName
+        e.chainName == o.chainName &&
+        e.Ready == o.Ready
 }
 
 // Returns the endpoint chain name for a given endpointsInfo.
@@ -3898,6 +3898,32 @@ func TestProxierDeleteNodePortStaleUDP(t *testing.T) {
             eps.AddressType = discovery.AddressTypeIPv4
             eps.Endpoints = []discovery.Endpoint{{
                 Addresses: []string{epIP},
+                Conditions: discovery.EndpointConditions{
+                    Ready: utilpointer.Bool(false),
+                },
+            }}
+            eps.Ports = []discovery.EndpointPort{{
+                Name:     utilpointer.StringPtr(svcPortName.Port),
+                Port:     utilpointer.Int32(int32(svcPort)),
+                Protocol: &udpProtocol,
+            }}
+        }),
+    )
+
+    fp.syncProxyRules()
+
+    if fexec.CommandCalls != 0 {
+        t.Fatalf("Updated UDP service with not ready endpoints must not clear UDP entries")
+    }
+
+    populateEndpointSlices(fp,
+        makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
+            eps.AddressType = discovery.AddressTypeIPv4
+            eps.Endpoints = []discovery.Endpoint{{
+                Addresses: []string{epIP},
+                Conditions: discovery.EndpointConditions{
+                    Ready: utilpointer.Bool(true),
+                },
             }}
             eps.Ports = []discovery.EndpointPort{{
                 Name:     utilpointer.StringPtr(svcPortName.Port),
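The fexec.CommandCalls assertions in this test count executions of the conntrack binary, which is what kube-proxy ultimately runs to flush stale UDP entries. As a rough sketch of that kind of cleanup (the clearUDPEntries helper is hypothetical and the flags are assumed from the conntrack-tools CLI, not taken from kube-proxy's own conntrack utility), deleting UDP entries whose original destination is a service IP could look like:

package main

import (
    "fmt"
    "os/exec"
)

// clearUDPEntries deletes conntrack entries whose original destination is the
// given service IP. This is a simplified sketch of the kind of cleanup
// kube-proxy triggers when a UDP service gains its first ready endpoint; a
// real implementation would also tolerate the "no entries found" exit status.
func clearUDPEntries(serviceIP string) error {
    // conntrack -D deletes matching entries; -p udp restricts the match to UDP
    // and --orig-dst matches on the original destination address.
    cmd := exec.Command("conntrack", "-D", "-p", "udp", "--orig-dst", serviceIP)
    out, err := cmd.CombinedOutput()
    if err != nil {
        return fmt.Errorf("conntrack -D failed: %v (output: %s)", err, out)
    }
    return nil
}

func main() {
    // Example ClusterIP; running this requires the conntrack binary and root privileges.
    if err := clearUDPEntries("10.96.0.10"); err != nil {
        fmt.Println(err)
    }
}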
@@ -279,6 +279,79 @@ var _ = common.SIGDescribe("Conntrack", func() {
         }
     })
 
+    // Regression test for #105657
+    // 1. Create a UDP Service
+    // 2. Client Pod sends traffic to the UDP service
+    // 3. Create a UDP server associated to the Service created in 1. with an init container that sleeps for some time
+    // The init container keeps the server pod not ready; the endpoint slices are still created, it is just
+    // that the Endpoint condition Ready is false.
+    // If the kube-proxy conntrack logic doesn't check readiness, it will delete the conntrack entries for the UDP
+    // server as soon as the endpoint slice has been created, even though the iptables rules will not be installed
+    // until at least one endpoint is ready. If traffic arrives in the window between kube-proxy clearing the entries
+    // (on seeing the endpoint slice) and installing the corresponding iptables rules (once the endpoint is ready),
+    // a conntrack entry will be generated that blackholes subsequent traffic.
+    ginkgo.It("should be able to preserve UDP traffic when initial unready endpoints get ready", func() {
+
+        // Create a ClusterIP service
+        udpJig := e2eservice.NewTestJig(cs, ns, serviceName)
+        ginkgo.By("creating a UDP service " + serviceName + " with type=ClusterIP in " + ns)
+        udpService, err := udpJig.CreateUDPService(func(svc *v1.Service) {
+            svc.Spec.Type = v1.ServiceTypeClusterIP
+            svc.Spec.Ports = []v1.ServicePort{
+                {Port: 80, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt(80)},
+            }
+        })
+        framework.ExpectNoError(err)
+
+        // Create a pod on one node to generate UDP traffic against the ClusterIP service every 5 seconds
+        ginkgo.By("creating a client pod for probing the service " + serviceName)
+        clientPod := e2epod.NewAgnhostPod(ns, podClient, nil, nil, nil)
+        nodeSelection := e2epod.NodeSelection{Name: clientNodeInfo.name}
+        e2epod.SetNodeSelection(&clientPod.Spec, nodeSelection)
+        cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done`, srcPort, udpService.Spec.ClusterIP, udpService.Spec.Ports[0].Port)
+        clientPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
+        clientPod.Spec.Containers[0].Name = podClient
+        fr.PodClient().CreateSync(clientPod)
+
+        // Read the client pod logs
+        logs, err := e2epod.GetPodLogs(cs, ns, podClient, podClient)
+        framework.ExpectNoError(err)
+        framework.Logf("Pod client logs: %s", logs)
+
+        // Add a backend pod to the service on the other node
+        ginkgo.By("creating a backend pod " + podBackend1 + " for the service " + serviceName)
+        serverPod1 := e2epod.NewAgnhostPod(ns, podBackend1, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80))
+        serverPod1.Labels = udpJig.Labels
+        nodeSelection = e2epod.NodeSelection{Name: serverNodeInfo.name}
+        // Add an init container to delay the pod becoming ready for 15 seconds
+        serverPod1.Spec.InitContainers = []v1.Container{
+            {
+                Name:    "init",
+                Image:   imageutils.GetE2EImage(imageutils.BusyBox),
+                Command: []string{"/bin/sh", "-c", "echo Pausing start. && sleep 15"},
+            },
+        }
+        e2epod.SetNodeSelection(&serverPod1.Spec, nodeSelection)
+        fr.PodClient().CreateSync(serverPod1)
+
+        // Wait until the endpoints are ready
+        validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{podBackend1: {80}})
+
+        // Note that the fact that the Endpoints object already exists does NOT mean
+        // that iptables (or whatever else is used) was already programmed.
+        // Additionally, take into account that the UDP conntrack entry timeout is
+        // 30 seconds by default.
+        // Based on the above, check if the pod receives the traffic.
+        ginkgo.By("checking client pod connected to the backend on Node IP " + serverNodeInfo.nodeIP)
+        if err := wait.PollImmediate(5*time.Second, time.Minute, logContainsFn(podBackend1, podClient)); err != nil {
+            logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient)
+            framework.ExpectNoError(err)
+            framework.Logf("Pod client logs: %s", logs)
+            framework.Failf("Failed to connect to backend pod")
+        }
+
+    })
+
     // Regression test for #74839, where:
     // Packets considered INVALID by conntrack are now dropped. In particular, this fixes
     // a problem where spurious retransmits in a long-running TCP connection to a service