Merge pull request #13345 from thockin/proxy-contention

Don't take the proxy mutex in the traffic path
This commit is contained in:
Daniel Smith 2015-09-01 12:25:59 -07:00
commit a98a0f52b4
3 changed files with 24 additions and 3 deletions

View File

@ -41,6 +41,7 @@ type portal struct {
} }
type serviceInfo struct { type serviceInfo struct {
isAliveAtomic int32 // Only access this with atomic ops
portal portal portal portal
protocol api.Protocol protocol api.Protocol
proxyPort int proxyPort int
@ -55,6 +56,18 @@ type serviceInfo struct {
externalIPs []string externalIPs []string
} }
func (info *serviceInfo) setAlive(b bool) {
var i int32
if b {
i = 1
}
atomic.StoreInt32(&info.isAliveAtomic, i)
}
func (info *serviceInfo) isAlive() bool {
return atomic.LoadInt32(&info.isAliveAtomic) != 0
}
func logTimeout(err error) bool { func logTimeout(err error) bool {
if e, ok := err.(net.Error); ok { if e, ok := err.(net.Error); ok {
if e.Timeout() { if e.Timeout() {
@ -256,6 +269,7 @@ func (proxier *Proxier) stopProxy(service proxy.ServicePortName, info *serviceIn
// This assumes proxier.mu is locked. // This assumes proxier.mu is locked.
func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *serviceInfo) error { func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *serviceInfo) error {
delete(proxier.serviceMap, service) delete(proxier.serviceMap, service)
info.setAlive(false)
err := info.socket.Close() err := info.socket.Close()
port := info.socket.ListenPort() port := info.socket.ListenPort()
proxier.proxyPorts.Release(port) proxier.proxyPorts.Release(port)
@ -294,6 +308,7 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol
return nil, err return nil, err
} }
si := &serviceInfo{ si := &serviceInfo{
isAliveAtomic: 1,
proxyPort: portNum, proxyPort: portNum,
protocol: protocol, protocol: protocol,
socket: sock, socket: sock,

View File

@ -429,6 +429,9 @@ func TestTCPProxyStop(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("error adding new service: %#v", err) t.Fatalf("error adding new service: %#v", err)
} }
if !svcInfo.isAlive() {
t.Fatalf("wrong value for isAlive(): expected true")
}
conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
if err != nil { if err != nil {
t.Fatalf("error connecting to proxy: %v", err) t.Fatalf("error connecting to proxy: %v", err)
@ -437,6 +440,9 @@ func TestTCPProxyStop(t *testing.T) {
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
stopProxyByName(p, service) stopProxyByName(p, service)
if svcInfo.isAlive() {
t.Fatalf("wrong value for isAlive(): expected false")
}
// Wait for the port to really close. // Wait for the port to really close.
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())

View File

@ -111,7 +111,7 @@ func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string
func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) { func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) {
for { for {
if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo { if !myInfo.isAlive() {
// The service port was closed or replaced. // The service port was closed or replaced.
return return
} }
@ -125,7 +125,7 @@ func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv
if isClosedError(err) { if isClosedError(err) {
return return
} }
if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo { if !myInfo.isAlive() {
// Then the service port was just closed so the accept failure is to be expected. // Then the service port was just closed so the accept failure is to be expected.
return return
} }
@ -198,7 +198,7 @@ func newClientCache() *clientCache {
func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) { func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) {
var buffer [4096]byte // 4KiB should be enough for most whole-packets var buffer [4096]byte // 4KiB should be enough for most whole-packets
for { for {
if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo { if !myInfo.isAlive() {
// The service port was closed or replaced. // The service port was closed or replaced.
break break
} }