kube-proxy: fix stale detection logic

The logic to detect stale endpoints was not taking endpoint
readiness into account.

We can have stale conntrack entries on UDP services for two reasons:
- an endpoint that was receiving traffic is removed or replaced
- a service that was receiving traffic but not forwarding it (it had
no ready endpoints) starts to forward it.

Add an e2e test to cover the regression
Antonio Ojea 2021-11-05 00:37:45 +01:00
parent cb040e5097
commit 909925b492
4 changed files with 131 additions and 4 deletions
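
To make the two cases above concrete, here is a minimal, self-contained sketch of the detection idea. It uses simplified stand-in types rather than the real proxy.Endpoint/EndpointsMap API, so names like endpoint and detectStale are illustrative only and are not part of kube-proxy.

package main

import "fmt"

// endpoint and endpointsMap are simplified stand-ins for the real kube-proxy
// types; only the fields needed to illustrate the detection logic are kept.
type endpoint struct {
    ip    string
    ready bool
}

type endpointsMap map[string][]endpoint // keyed by "namespace/name:port/UDP"

// detectStale mirrors the idea of the fixed logic: only endpoints that were
// ready can have received traffic, so only those can leave stale conntrack
// entries behind, and a UDP service is "stale" when it goes from zero to at
// least one ready endpoint (a blackhole entry may already exist for it).
func detectStale(oldMap, newMap endpointsMap) (staleEndpoints, staleServices []string) {
    for svc, oldEps := range oldMap {
        for _, ep := range oldEps {
            if !ep.ready {
                continue // never received traffic, nothing to clear
            }
            stale := true
            for _, newEp := range newMap[svc] {
                if newEp.ip == ep.ip && newEp.ready == ep.ready {
                    stale = false
                    break
                }
            }
            if stale {
                staleEndpoints = append(staleEndpoints, svc+" -> "+ep.ip)
            }
        }
    }
    for svc, newEps := range newMap {
        ready, oldReady := 0, 0
        for _, ep := range newEps {
            if ep.ready {
                ready++
            }
        }
        for _, ep := range oldMap[svc] {
            if ep.ready {
                oldReady++
            }
        }
        if ready > 0 && oldReady == 0 {
            staleServices = append(staleServices, svc)
        }
    }
    return staleEndpoints, staleServices
}

func main() {
    oldMap := endpointsMap{
        "ns/svc:80/UDP": {{ip: "10.0.0.1", ready: true}},  // case 1: ready endpoint gets replaced
        "ns/dns:53/UDP": {{ip: "10.0.0.9", ready: false}}, // case 2: only unready backends so far
    }
    newMap := endpointsMap{
        "ns/svc:80/UDP": {{ip: "10.0.0.2", ready: true}},
        "ns/dns:53/UDP": {{ip: "10.0.0.9", ready: true}},
    }
    staleEps, staleSvcs := detectStale(oldMap, newMap)
    fmt.Println("stale endpoints:", staleEps) // [ns/svc:80/UDP -> 10.0.0.1]
    fmt.Println("stale services:", staleSvcs) // [ns/dns:53/UDP]
}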

pkg/proxy/endpoints.go

@@ -121,7 +121,9 @@ func (info *BaseEndpointInfo) Port() (int, error) {
 // Equal is part of proxy.Endpoint interface.
 func (info *BaseEndpointInfo) Equal(other Endpoint) bool {
-    return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal()
+    return info.String() == other.String() &&
+        info.GetIsLocal() == other.GetIsLocal() &&
+        info.IsReady() == other.IsReady()
 }
 
 // GetNodeName returns the NodeName for this endpoint.
@@ -536,13 +538,22 @@ func (em EndpointsMap) getLocalReadyEndpointIPs() map[types.NamespacedName]sets.
 // detectStaleConnections modifies <staleEndpoints> and <staleServices> with detected stale connections. <staleServiceNames>
 // is used to store stale udp service in order to clear udp conntrack later.
 func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) {
+    // Detect stale endpoints: an endpoint can have stale conntrack entries if it was receiving traffic
+    // and then goes unready or changes its IP address.
     for svcPortName, epList := range oldEndpointsMap {
         if svcPortName.Protocol != v1.ProtocolUDP {
             continue
         }
 
         for _, ep := range epList {
+            // If the old endpoint wasn't ready, there can be no stale conntrack entries,
+            // since no traffic was sent to it.
+            if !ep.IsReady() {
+                continue
+            }
             stale := true
+            // Check whether the endpoint has changed, including going from ready to not ready.
+            // If it did change, the stale entries for the old endpoint have to be cleared.
             for i := range newEndpointsMap[svcPortName] {
                 if newEndpointsMap[svcPortName][i].Equal(ep) {
                     stale = false
@@ -556,13 +567,29 @@ func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) {
         }
     }
 
+    // Detect stale services:
+    // for a UDP service, if its backends change from 0 to a non-zero number of ready endpoints,
+    // there may be a conntrack entry that blackholes traffic to the service.
     for svcPortName, epList := range newEndpointsMap {
         if svcPortName.Protocol != v1.ProtocolUDP {
             continue
         }
 
-        // For udp service, if its backend changes from 0 to non-0. There may exist a conntrack entry that could blackhole traffic to the service.
-        if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 {
+        epReady := 0
+        for _, ep := range epList {
+            if ep.IsReady() {
+                epReady++
+            }
+        }
+
+        oldEpReady := 0
+        for _, ep := range oldEndpointsMap[svcPortName] {
+            if ep.IsReady() {
+                oldEpReady++
+            }
+        }
+
+        if epReady > 0 && oldEpReady == 0 {
             *staleServiceNames = append(*staleServiceNames, svcPortName)
         }
     }
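
detectStaleConnections only records which endpoints and ServicePortNames are stale; the conntrack entries themselves are flushed later in the proxier's sync path by shelling out to the conntrack(8) tool. The sketch below is a rough approximation of that cleanup step, not kube-proxy's actual helper: it calls the conntrack binary directly via os/exec, and the exact flags (-D, -p udp, --orig-dst, --dst-nat) are assumed from the conntrack man page.

package main

import (
    "fmt"
    "os/exec"
)

// clearUDPConntrackForService flushes UDP conntrack entries whose original
// destination is the given service IP. This approximates what happens for a
// "stale service" (a UDP service that just gained its first ready endpoint),
// so that an existing blackhole entry stops eating traffic.
func clearUDPConntrackForService(serviceIP string) error {
    out, err := exec.Command("conntrack", "-D", "-p", "udp", "--orig-dst", serviceIP).CombinedOutput()
    if err != nil {
        // conntrack exits non-zero when no entries matched; a real
        // implementation must tolerate that instead of treating it as a failure.
        return fmt.Errorf("conntrack -D failed: %v: %s", err, out)
    }
    return nil
}

// clearUDPConntrackForEndpoint flushes UDP entries that were DNATed from the
// service IP to a specific, now stale, endpoint IP.
func clearUDPConntrackForEndpoint(serviceIP, endpointIP string) error {
    out, err := exec.Command("conntrack", "-D", "-p", "udp",
        "--orig-dst", serviceIP, "--dst-nat", endpointIP).CombinedOutput()
    if err != nil {
        return fmt.Errorf("conntrack -D failed: %v: %s", err, out)
    }
    return nil
}

func main() {
    // Hypothetical IPs, purely for illustration; running this for real needs
    // root privileges and the conntrack binary installed.
    _ = clearUDPConntrackForService("10.96.0.10")
    _ = clearUDPConntrackForEndpoint("10.96.0.10", "10.244.1.5")
}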

pkg/proxy/iptables/proxier.go

@@ -164,7 +164,8 @@ func (e *endpointsInfo) Equal(other proxy.Endpoint) bool {
     return e.Endpoint == o.Endpoint &&
         e.IsLocal == o.IsLocal &&
         e.protocol == o.protocol &&
-        e.chainName == o.chainName
+        e.chainName == o.chainName &&
+        e.Ready == o.Ready
 }
 
 // Returns the endpoint chain name for a given endpointsInfo.

pkg/proxy/iptables/proxier_test.go

@@ -3898,6 +3898,32 @@ func TestProxierDeleteNodePortStaleUDP(t *testing.T) {
             eps.AddressType = discovery.AddressTypeIPv4
             eps.Endpoints = []discovery.Endpoint{{
                 Addresses: []string{epIP},
+                Conditions: discovery.EndpointConditions{
+                    Ready: utilpointer.Bool(false),
+                },
+            }}
+            eps.Ports = []discovery.EndpointPort{{
+                Name:     utilpointer.StringPtr(svcPortName.Port),
+                Port:     utilpointer.Int32(int32(svcPort)),
+                Protocol: &udpProtocol,
+            }}
+        }),
+    )
+
+    fp.syncProxyRules()
+
+    if fexec.CommandCalls != 0 {
+        t.Fatalf("Updated UDP service with not ready endpoints must not clear UDP entries")
+    }
+
+    populateEndpointSlices(fp,
+        makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
+            eps.AddressType = discovery.AddressTypeIPv4
+            eps.Endpoints = []discovery.Endpoint{{
+                Addresses: []string{epIP},
+                Conditions: discovery.EndpointConditions{
+                    Ready: utilpointer.Bool(true),
+                },
             }}
             eps.Ports = []discovery.EndpointPort{{
                 Name:     utilpointer.StringPtr(svcPortName.Port),

test/e2e/network/conntrack.go

@@ -279,6 +279,79 @@ var _ = common.SIGDescribe("Conntrack", func() {
         }
     })
 
+    // Regression test for #105657
+    // 1. Create a UDP Service
+    // 2. Client Pod sending traffic to the UDP service
+    // 3. Create a UDP server associated with the Service created in 1, with an init container that sleeps for some time
+    // The init container keeps the server pod not ready; the endpoint slice is still created, just with the
+    // endpoint's Ready condition set to false.
+    // If the kube-proxy conntrack logic doesn't check readiness, it will delete the conntrack entries for the UDP server
+    // as soon as the endpoint slice is created, even though the iptables rules are not installed until at least one
+    // endpoint is ready. If traffic arrives between the moment kube-proxy clears the entries (it sees the endpoint slice)
+    // and the moment it installs the corresponding iptables rules (the endpoint becomes ready), a conntrack entry will be
+    // created that blackholes subsequent traffic.
+    ginkgo.It("should be able to preserve UDP traffic when initial unready endpoints get ready", func() {
+
+        // Create a ClusterIP service
+        udpJig := e2eservice.NewTestJig(cs, ns, serviceName)
+        ginkgo.By("creating a UDP service " + serviceName + " with type=ClusterIP in " + ns)
+        udpService, err := udpJig.CreateUDPService(func(svc *v1.Service) {
+            svc.Spec.Type = v1.ServiceTypeClusterIP
+            svc.Spec.Ports = []v1.ServicePort{
+                {Port: 80, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt(80)},
+            }
+        })
+        framework.ExpectNoError(err)
+
+        // Create a pod on one node to generate UDP traffic against the ClusterIP service every 5 seconds
+        ginkgo.By("creating a client pod for probing the service " + serviceName)
+        clientPod := e2epod.NewAgnhostPod(ns, podClient, nil, nil, nil)
+        nodeSelection := e2epod.NodeSelection{Name: clientNodeInfo.name}
+        e2epod.SetNodeSelection(&clientPod.Spec, nodeSelection)
+        cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done`, srcPort, udpService.Spec.ClusterIP, udpService.Spec.Ports[0].Port)
+        clientPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
+        clientPod.Spec.Containers[0].Name = podClient
+        fr.PodClient().CreateSync(clientPod)
+
+        // Read the client pod logs
+        logs, err := e2epod.GetPodLogs(cs, ns, podClient, podClient)
+        framework.ExpectNoError(err)
+        framework.Logf("Pod client logs: %s", logs)
+
+        // Add a backend pod to the service on the other node
+        ginkgo.By("creating a backend pod " + podBackend1 + " for the service " + serviceName)
+        serverPod1 := e2epod.NewAgnhostPod(ns, podBackend1, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80))
+        serverPod1.Labels = udpJig.Labels
+        nodeSelection = e2epod.NodeSelection{Name: serverNodeInfo.name}
+        // Add an init container to keep the pod from becoming ready for 15 seconds
+        serverPod1.Spec.InitContainers = []v1.Container{
+            {
+                Name:    "init",
+                Image:   imageutils.GetE2EImage(imageutils.BusyBox),
+                Command: []string{"/bin/sh", "-c", "echo Pausing start. && sleep 15"},
+            },
+        }
+        e2epod.SetNodeSelection(&serverPod1.Spec, nodeSelection)
+        fr.PodClient().CreateSync(serverPod1)
+
+        // Wait until the endpoints are ready
+        validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{podBackend1: {80}})
+
+        // Note that the fact that the Endpoints object already exists does NOT mean
+        // that iptables (or whatever else is used) has already been programmed.
+        // Additionally, take into account that the UDP conntrack entry timeout is
+        // 30 seconds by default.
+        // Based on the above, check that the pod receives the traffic.
+        ginkgo.By("checking client pod connected to the backend on Node IP " + serverNodeInfo.nodeIP)
+        if err := wait.PollImmediate(5*time.Second, time.Minute, logContainsFn(podBackend1, podClient)); err != nil {
+            logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient)
+            framework.ExpectNoError(err)
+            framework.Logf("Pod client logs: %s", logs)
+            framework.Failf("Failed to connect to backend pod")
+        }
+
+    })
+
 // Regression test for #74839, where:
 // Packets considered INVALID by conntrack are now dropped. In particular, this fixes
 // a problem where spurious retransmits in a long-running TCP connection to a service