proxy/userspace: use waitgroups instead of sketchy atomic ops in testcases

Instead of relying on atomic ops to increment/decrement at the right time
just use waitgroups to provide hard synchronization points.
This commit is contained in:
Dan Williams 2020-08-13 15:34:57 -05:00
parent 90723ebbae
commit 1372bd94fe
2 changed files with 402 additions and 421 deletions

View File

@ -68,6 +68,29 @@ type ServiceInfo struct {
stickyMaxAgeSeconds int
// Deprecated, but required for back-compat (including e2e)
externalIPs []string
// isStartedAtomic is set to non-zero when the service's socket begins
// accepting requests. Used in testcases. Only access this with atomic ops.
isStartedAtomic int32
// isFinishedAtomic is set to non-zero when the service's socket shuts
// down. Used in testcases. Only access this with atomic ops.
isFinishedAtomic int32
}
func (info *ServiceInfo) setStarted() {
atomic.StoreInt32(&info.isStartedAtomic, 1)
}
func (info *ServiceInfo) IsStarted() bool {
return atomic.LoadInt32(&info.isStartedAtomic) != 0
}
func (info *ServiceInfo) setFinished() {
atomic.StoreInt32(&info.isFinishedAtomic, 1)
}
func (info *ServiceInfo) IsFinished() bool {
return atomic.LoadInt32(&info.isFinishedAtomic) != 0
}
func (info *ServiceInfo) setAlive(b bool) {
@ -124,7 +147,6 @@ type Proxier struct {
udpIdleTimeout time.Duration
portMapMutex sync.Mutex
portMap map[portMapKey]*portMapValue
numProxyLoops int32 // use atomic ops to access this; mostly for testing
listenIP net.IP
iptables iptables.Interface
hostIP net.IP
@ -427,14 +449,6 @@ func (proxier *Proxier) getServiceInfo(service proxy.ServicePortName) (*ServiceI
return info, ok
}
// addServiceOnPort lockes the proxy before calling addServiceOnPortInternal.
// Used from testcases.
func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol v1.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) {
proxier.mu.Lock()
defer proxier.mu.Unlock()
return proxier.addServiceOnPortInternal(service, protocol, proxyPort, timeout)
}
// addServiceOnPortInternal starts listening for a new service, returning the ServiceInfo.
// Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
// connections, for now.
@ -465,12 +479,10 @@ func (proxier *Proxier) addServiceOnPortInternal(service proxy.ServicePortName,
proxier.serviceMap[service] = si
klog.V(2).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
go func(service proxy.ServicePortName, proxier *Proxier) {
go func() {
defer runtime.HandleCrash()
atomic.AddInt32(&proxier.numProxyLoops, 1)
sock.ProxyLoop(service, si, proxier.loadBalancer)
atomic.AddInt32(&proxier.numProxyLoops, -1)
}(service, proxier)
}()
return si, nil
}
@ -509,6 +521,7 @@ func (proxier *Proxier) mergeService(service *v1.Service) sets.String {
if err := proxier.cleanupPortalAndProxy(serviceName, info); err != nil {
klog.Error(err)
}
info.setFinished()
}
proxyPort, err := proxier.proxyPorts.AllocateNext()
if err != nil {
@ -541,6 +554,8 @@ func (proxier *Proxier) mergeService(service *v1.Service) sets.String {
klog.Errorf("Failed to open portal for %q: %v", serviceName, err)
}
proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeSeconds)
info.setStarted()
}
return existingPorts
@ -578,6 +593,7 @@ func (proxier *Proxier) unmergeService(service *v1.Service, existingPorts sets.S
klog.Error(err)
}
proxier.loadBalancer.DeleteService(serviceName)
info.setFinished()
}
for _, svcIP := range staleUDPServices.UnsortedList() {
if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {

File diff suppressed because it is too large Load Diff