From 9fc8a79e37c1eb752a29623c88e8c98409ea4315 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 1 Sep 2015 16:40:11 -0700 Subject: [PATCH] Revert "Revert "Don't take the proxy mutex in the traffic path"" --- pkg/proxy/userspace/proxier.go | 15 +++++++++++++++ pkg/proxy/userspace/proxier_test.go | 6 ++++++ pkg/proxy/userspace/proxysocket.go | 6 +++--- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index d9ff1d22b8f..5fecd71d8c0 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -41,6 +41,7 @@ type portal struct { } type serviceInfo struct { + isAliveAtomic int32 // Only access this with atomic ops portal portal protocol api.Protocol proxyPort int @@ -55,6 +56,18 @@ type serviceInfo struct { externalIPs []string } +func (info *serviceInfo) setAlive(b bool) { + var i int32 + if b { + i = 1 + } + atomic.StoreInt32(&info.isAliveAtomic, i) +} + +func (info *serviceInfo) isAlive() bool { + return atomic.LoadInt32(&info.isAliveAtomic) != 0 +} + func logTimeout(err error) bool { if e, ok := err.(net.Error); ok { if e.Timeout() { @@ -256,6 +269,7 @@ func (proxier *Proxier) stopProxy(service proxy.ServicePortName, info *serviceIn // This assumes proxier.mu is locked. func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *serviceInfo) error { delete(proxier.serviceMap, service) + info.setAlive(false) err := info.socket.Close() port := info.socket.ListenPort() proxier.proxyPorts.Release(port) @@ -294,6 +308,7 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol return nil, err } si := &serviceInfo{ + isAliveAtomic: 1, proxyPort: portNum, protocol: protocol, socket: sock, diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index f45102c55b2..3c789e62be5 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -429,6 +429,9 @@ func TestTCPProxyStop(t *testing.T) { if err != nil { t.Fatalf("error adding new service: %#v", err) } + if !svcInfo.isAlive() { + t.Fatalf("wrong value for isAlive(): expected true") + } conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("error connecting to proxy: %v", err) @@ -437,6 +440,9 @@ func TestTCPProxyStop(t *testing.T) { waitForNumProxyLoops(t, p, 1) stopProxyByName(p, service) + if svcInfo.isAlive() { + t.Fatalf("wrong value for isAlive(): expected false") + } // Wait for the port to really close. if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) diff --git a/pkg/proxy/userspace/proxysocket.go b/pkg/proxy/userspace/proxysocket.go index 938e8d20da6..ca7f59ec3d4 100644 --- a/pkg/proxy/userspace/proxysocket.go +++ b/pkg/proxy/userspace/proxysocket.go @@ -111,7 +111,7 @@ func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) { for { - if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo { + if !myInfo.isAlive() { // The service port was closed or replaced. return } @@ -125,7 +125,7 @@ func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv if isClosedError(err) { return } - if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo { + if !myInfo.isAlive() { // Then the service port was just closed so the accept failure is to be expected. return } @@ -198,7 +198,7 @@ func newClientCache() *clientCache { func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) { var buffer [4096]byte // 4KiB should be enough for most whole-packets for { - if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo { + if !myInfo.isAlive() { // The service port was closed or replaced. break }