Fix a race in kube-proxy causing runaways

It was an ABA problem where the proxy loop might see its own service as
"existing" when it had been destroyed and recreated (as in an update).

To prove this I added a counter of running ProxyLoop goroutines and check that
in tests.  If I undo my main change, the tests fail.  This makes the
proxier_test significantly slower (3 seconds vs 0.5 seconds).  Sorry.
This commit is contained in:
Tim Hockin
2015-02-03 14:37:41 -08:00
parent 75ebde3814
commit 3b5ea74a48
2 changed files with 72 additions and 22 deletions

View File

@@ -23,6 +23,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -52,11 +53,12 @@ var endpointDialTimeout = []time.Duration{1, 2, 4, 8}
type proxySocket interface {
// Addr gets the net.Addr for a proxySocket.
Addr() net.Addr
// Close stops the proxySocket from accepting incoming connections. Each implementation should comment
// on the impact of calling Close while sessions are active.
// Close stops the proxySocket from accepting incoming connections.
// Each implementation should comment on the impact of calling Close
// while sessions are active.
Close() error
// ProxyLoop proxies incoming connections for the specified service to the service endpoints.
ProxyLoop(service string, proxier *Proxier)
ProxyLoop(service string, info *serviceInfo, proxier *Proxier)
}
// tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called,
@@ -85,20 +87,19 @@ func tryConnect(service string, srcAddr net.Addr, protocol string, proxier *Prox
return nil, fmt.Errorf("failed to connect to an endpoint.")
}
func (tcp *tcpProxySocket) ProxyLoop(service string, proxier *Proxier) {
func (tcp *tcpProxySocket) ProxyLoop(service string, myInfo *serviceInfo, proxier *Proxier) {
for {
_, exists := proxier.getServiceInfo(service)
if !exists {
if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo {
// The service port was closed or replaced.
break
}
// Block until a connection is made.
inConn, err := tcp.Accept()
if err != nil {
_, exists := proxier.getServiceInfo(service)
if !exists {
if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo {
// Then the service port was just closed so the accept failure is to be expected.
return
break
}
glog.Errorf("Accept failed: %v", err)
continue
@@ -161,12 +162,12 @@ func newClientCache() *clientCache {
return &clientCache{clients: map[string]net.Conn{}}
}
func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) {
func (udp *udpProxySocket) ProxyLoop(service string, myInfo *serviceInfo, proxier *Proxier) {
activeClients := newClientCache()
var buffer [4096]byte // 4KiB should be enough for most whole-packets
for {
info, exists := proxier.getServiceInfo(service)
if !exists {
if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo {
// The service port was closed or replaced.
break
}
@@ -184,7 +185,7 @@ func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) {
break
}
// If this is a client we know already, reuse the connection and goroutine.
svrConn, err := udp.getBackendConn(activeClients, cliAddr, proxier, service, info.timeout)
svrConn, err := udp.getBackendConn(activeClients, cliAddr, proxier, service, myInfo.timeout)
if err != nil {
continue
}
@@ -198,7 +199,7 @@ func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) {
}
continue
}
err = svrConn.SetDeadline(time.Now().Add(info.timeout))
err = svrConn.SetDeadline(time.Now().Add(myInfo.timeout))
if err != nil {
glog.Errorf("SetDeadline failed: %v", err)
continue
@@ -267,7 +268,7 @@ func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activ
func logTimeout(err error) bool {
if e, ok := err.(net.Error); ok {
if e.Timeout() {
glog.V(1).Infof("connection to endpoint closed due to inactivity")
glog.V(3).Infof("connection to endpoint closed due to inactivity")
return true
}
}
@@ -300,12 +301,13 @@ func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, er
// Proxier is a simple proxy for TCP connections between a localhost:lport
// and services that provide the actual implementations.
type Proxier struct {
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap
serviceMap map[string]*serviceInfo
listenIP net.IP
iptables iptables.Interface
hostIP net.IP
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap
serviceMap map[string]*serviceInfo
numProxyLoops int32 // use atomic ops to access this; mostly for testing
listenIP net.IP
iptables iptables.Interface
hostIP net.IP
}
// NewProxier returns a new Proxier given a LoadBalancer and an address on
@@ -443,7 +445,9 @@ func (proxier *Proxier) addServiceOnPort(service string, protocol api.Protocol,
glog.V(1).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
go func(service string, proxier *Proxier) {
defer util.HandleCrash()
sock.ProxyLoop(service, proxier)
atomic.AddInt32(&proxier.numProxyLoops, 1)
sock.ProxyLoop(service, si, proxier)
atomic.AddInt32(&proxier.numProxyLoops, -1)
}(service, proxier)
return si, nil