From 25ac521f88da270f8ac2443f8974963b45b471bc Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Wed, 5 Jul 2017 17:08:58 -0700 Subject: [PATCH] flush conntrack entry for udp service when # of backend changes from 0 to non-0 --- pkg/proxy/iptables/proxier.go | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 8e6c444e822..57446aeae81 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -755,16 +755,16 @@ func (proxier *Proxier) OnEndpointsSynced() { func updateEndpointsMap( endpointsMap proxyEndpointsMap, changes *endpointsChangeMap, - hostname string) (hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) { - staleSet = make(map[endpointServicePair]bool) - + hostname string) (hcEndpoints map[types.NamespacedName]int, staleEndpoints map[endpointServicePair]bool, staleServiceNames map[proxy.ServicePortName]bool) { + staleEndpoints = make(map[endpointServicePair]bool) + staleServiceNames = make(map[proxy.ServicePortName]bool) func() { changes.lock.Lock() defer changes.lock.Unlock() for _, change := range changes.items { endpointsMap.unmerge(change.previous) endpointsMap.merge(change.current) - detectStaleConnections(change.previous, change.current, staleSet) + detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames) } changes.items = make(map[types.NamespacedName]*endpointsChange) }() @@ -781,12 +781,11 @@ func updateEndpointsMap( hcEndpoints[nsn] = len(ips) } - return hcEndpoints, staleSet + return hcEndpoints, staleEndpoints, staleServiceNames } -// are modified by this function with detected stale -// connections. -func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool) { +// and are modified by this function with detected stale connections. +func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool, staleServiceNames map[proxy.ServicePortName]bool) { for svcPortName, epList := range oldEndpointsMap { for _, ep := range epList { stale := true @@ -802,6 +801,13 @@ func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, } } } + + for svcPortName, epList := range newEndpointsMap { + // For udp service, if its backend changes from 0 to non-0. There may exist a conntrack entry that could blackhole traffic to the service. + if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 { + staleServiceNames[svcPortName] = true + } + } } func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String { @@ -980,14 +986,23 @@ func (proxier *Proxier) syncProxyRules() { return } + var staleServices sets.String // We assume that if this was called, we really want to sync them, // even if nothing changed in the meantime. In other words, callers are // responsible for detecting no-op changes and not calling this function. hcServices, staleServices := updateServiceMap( proxier.serviceMap, &proxier.serviceChanges) - hcEndpoints, staleEndpoints := updateEndpointsMap( + hcEndpoints, staleEndpoints, staleServiceNames := updateEndpointsMap( proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname) + // merge stale services gathered from updateEndpointsMap + for svcPortName := range staleServiceNames { + if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.protocol == api.ProtocolUDP { + glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.clusterIP.String()) + staleServices.Insert(svcInfo.clusterIP.String()) + } + } + glog.V(3).Infof("Syncing iptables rules") // Create and link the kube services chain.