Merge pull request #13480 from kubernetes/revert-13461-revert-13345-proxy-contention

Revert "Revert "Don't take the proxy mutex in the traffic path""
This commit is contained in:
Daniel Smith 2015-09-01 18:06:36 -07:00
commit a918044edd
3 changed files with 24 additions and 3 deletions

View File

@ -41,6 +41,7 @@ type portal struct {
}
type serviceInfo struct {
isAliveAtomic int32 // Only access this with atomic ops
portal portal
protocol api.Protocol
proxyPort int
@ -55,6 +56,18 @@ type serviceInfo struct {
externalIPs []string
}
func (info *serviceInfo) setAlive(b bool) {
var i int32
if b {
i = 1
}
atomic.StoreInt32(&info.isAliveAtomic, i)
}
func (info *serviceInfo) isAlive() bool {
return atomic.LoadInt32(&info.isAliveAtomic) != 0
}
func logTimeout(err error) bool {
if e, ok := err.(net.Error); ok {
if e.Timeout() {
@ -256,6 +269,7 @@ func (proxier *Proxier) stopProxy(service proxy.ServicePortName, info *serviceIn
// This assumes proxier.mu is locked.
func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *serviceInfo) error {
delete(proxier.serviceMap, service)
info.setAlive(false)
err := info.socket.Close()
port := info.socket.ListenPort()
proxier.proxyPorts.Release(port)
@ -294,6 +308,7 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol
return nil, err
}
si := &serviceInfo{
isAliveAtomic: 1,
proxyPort: portNum,
protocol: protocol,
socket: sock,

View File

@ -429,6 +429,9 @@ func TestTCPProxyStop(t *testing.T) {
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
if !svcInfo.isAlive() {
t.Fatalf("wrong value for isAlive(): expected true")
}
conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
@ -437,6 +440,9 @@ func TestTCPProxyStop(t *testing.T) {
waitForNumProxyLoops(t, p, 1)
stopProxyByName(p, service)
if svcInfo.isAlive() {
t.Fatalf("wrong value for isAlive(): expected false")
}
// Wait for the port to really close.
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())

View File

@ -111,7 +111,7 @@ func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string
func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) {
for {
if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo {
if !myInfo.isAlive() {
// The service port was closed or replaced.
return
}
@ -125,7 +125,7 @@ func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv
if isClosedError(err) {
return
}
if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo {
if !myInfo.isAlive() {
// Then the service port was just closed so the accept failure is to be expected.
return
}
@ -198,7 +198,7 @@ func newClientCache() *clientCache {
func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) {
var buffer [4096]byte // 4KiB should be enough for most whole-packets
for {
if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo {
if !myInfo.isAlive() {
// The service port was closed or replaced.
break
}