Merge pull request #4080 from thockin/proxy-close-race

Fix a race in kube-proxy causing runaways
This commit is contained in:
Tim Hockin 2015-02-03 17:09:10 -08:00
commit bffb418985
2 changed files with 72 additions and 22 deletions

View File

@ -23,6 +23,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -52,11 +53,12 @@ var endpointDialTimeout = []time.Duration{1, 2, 4, 8}
type proxySocket interface { type proxySocket interface {
// Addr gets the net.Addr for a proxySocket. // Addr gets the net.Addr for a proxySocket.
Addr() net.Addr Addr() net.Addr
// Close stops the proxySocket from accepting incoming connections. Each implementation should comment // Close stops the proxySocket from accepting incoming connections.
// on the impact of calling Close while sessions are active. // Each implementation should comment on the impact of calling Close
// while sessions are active.
Close() error Close() error
// ProxyLoop proxies incoming connections for the specified service to the service endpoints. // ProxyLoop proxies incoming connections for the specified service to the service endpoints.
ProxyLoop(service string, proxier *Proxier) ProxyLoop(service string, info *serviceInfo, proxier *Proxier)
} }
// tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called, // tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called,
@ -85,20 +87,19 @@ func tryConnect(service string, srcAddr net.Addr, protocol string, proxier *Prox
return nil, fmt.Errorf("failed to connect to an endpoint.") return nil, fmt.Errorf("failed to connect to an endpoint.")
} }
func (tcp *tcpProxySocket) ProxyLoop(service string, proxier *Proxier) { func (tcp *tcpProxySocket) ProxyLoop(service string, myInfo *serviceInfo, proxier *Proxier) {
for { for {
_, exists := proxier.getServiceInfo(service) if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo {
if !exists { // The service port was closed or replaced.
break break
} }
// Block until a connection is made. // Block until a connection is made.
inConn, err := tcp.Accept() inConn, err := tcp.Accept()
if err != nil { if err != nil {
_, exists := proxier.getServiceInfo(service) if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo {
if !exists {
// Then the service port was just closed so the accept failure is to be expected. // Then the service port was just closed so the accept failure is to be expected.
return break
} }
glog.Errorf("Accept failed: %v", err) glog.Errorf("Accept failed: %v", err)
continue continue
@ -161,12 +162,12 @@ func newClientCache() *clientCache {
return &clientCache{clients: map[string]net.Conn{}} return &clientCache{clients: map[string]net.Conn{}}
} }
func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) { func (udp *udpProxySocket) ProxyLoop(service string, myInfo *serviceInfo, proxier *Proxier) {
activeClients := newClientCache() activeClients := newClientCache()
var buffer [4096]byte // 4KiB should be enough for most whole-packets var buffer [4096]byte // 4KiB should be enough for most whole-packets
for { for {
info, exists := proxier.getServiceInfo(service) if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo {
if !exists { // The service port was closed or replaced.
break break
} }
@ -184,7 +185,7 @@ func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) {
break break
} }
// If this is a client we know already, reuse the connection and goroutine. // If this is a client we know already, reuse the connection and goroutine.
svrConn, err := udp.getBackendConn(activeClients, cliAddr, proxier, service, info.timeout) svrConn, err := udp.getBackendConn(activeClients, cliAddr, proxier, service, myInfo.timeout)
if err != nil { if err != nil {
continue continue
} }
@ -198,7 +199,7 @@ func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) {
} }
continue continue
} }
err = svrConn.SetDeadline(time.Now().Add(info.timeout)) err = svrConn.SetDeadline(time.Now().Add(myInfo.timeout))
if err != nil { if err != nil {
glog.Errorf("SetDeadline failed: %v", err) glog.Errorf("SetDeadline failed: %v", err)
continue continue
@ -267,7 +268,7 @@ func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activ
func logTimeout(err error) bool { func logTimeout(err error) bool {
if e, ok := err.(net.Error); ok { if e, ok := err.(net.Error); ok {
if e.Timeout() { if e.Timeout() {
glog.V(1).Infof("connection to endpoint closed due to inactivity") glog.V(3).Infof("connection to endpoint closed due to inactivity")
return true return true
} }
} }
@ -300,12 +301,13 @@ func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, er
// Proxier is a simple proxy for TCP connections between a localhost:lport // Proxier is a simple proxy for TCP connections between a localhost:lport
// and services that provide the actual implementations. // and services that provide the actual implementations.
type Proxier struct { type Proxier struct {
loadBalancer LoadBalancer loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap mu sync.Mutex // protects serviceMap
serviceMap map[string]*serviceInfo serviceMap map[string]*serviceInfo
listenIP net.IP numProxyLoops int32 // use atomic ops to access this; mostly for testing
iptables iptables.Interface listenIP net.IP
hostIP net.IP iptables iptables.Interface
hostIP net.IP
} }
// NewProxier returns a new Proxier given a LoadBalancer and an address on // NewProxier returns a new Proxier given a LoadBalancer and an address on
@ -443,7 +445,9 @@ func (proxier *Proxier) addServiceOnPort(service string, protocol api.Protocol,
glog.V(1).Infof("Proxying for service %q on %s port %d", service, protocol, portNum) glog.V(1).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
go func(service string, proxier *Proxier) { go func(service string, proxier *Proxier) {
defer util.HandleCrash() defer util.HandleCrash()
sock.ProxyLoop(service, proxier) atomic.AddInt32(&proxier.numProxyLoops, 1)
sock.ProxyLoop(service, si, proxier)
atomic.AddInt32(&proxier.numProxyLoops, -1)
}(service, proxier) }(service, proxier)
return si, nil return si, nil

View File

@ -24,6 +24,7 @@ import (
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
"strconv" "strconv"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -171,6 +172,18 @@ func testEchoUDP(t *testing.T, address string, port int) {
} }
} }
// waitForNumProxyLoops polls p.numProxyLoops until it equals want, checking
// up to 4 times with a 500ms pause between consecutive checks. If the counter
// never matches, it records a test error (via t.Errorf, so the calling test
// keeps running) that includes the last value observed.
func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) {
	const (
		attempts = 4
		interval = 500 * time.Millisecond
	)
	var got int32
	for i := 0; i < attempts; i++ {
		if i > 0 {
			// Pause only between polls, so a failure is reported right after
			// the final check rather than after one more idle interval.
			time.Sleep(interval)
		}
		// Read the counter atomically, matching how it is accessed elsewhere.
		got = atomic.LoadInt32(&p.numProxyLoops)
		if got == want {
			return
		}
	}
	t.Errorf("expected %d ProxyLoops running, got %d", want, got)
}
func TestTCPProxy(t *testing.T) { func TestTCPProxy(t *testing.T) {
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
lb.OnUpdate([]api.Endpoints{ lb.OnUpdate([]api.Endpoints{
@ -181,12 +194,14 @@ func TestTCPProxy(t *testing.T) {
}) })
p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}) p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
if err != nil { if err != nil {
t.Fatalf("error adding new service: %#v", err) t.Fatalf("error adding new service: %#v", err)
} }
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)
} }
func TestUDPProxy(t *testing.T) { func TestUDPProxy(t *testing.T) {
@ -199,12 +214,14 @@ func TestUDPProxy(t *testing.T) {
}) })
p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}) p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
if err != nil { if err != nil {
t.Fatalf("error adding new service: %#v", err) t.Fatalf("error adding new service: %#v", err)
} }
testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)
} }
// Helper: Stops the proxy for the named service. // Helper: Stops the proxy for the named service.
@ -226,6 +243,7 @@ func TestTCPProxyStop(t *testing.T) {
}) })
p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}) p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
if err != nil { if err != nil {
@ -236,12 +254,14 @@ func TestTCPProxyStop(t *testing.T) {
t.Fatalf("error connecting to proxy: %v", err) t.Fatalf("error connecting to proxy: %v", err)
} }
conn.Close() conn.Close()
waitForNumProxyLoops(t, p, 1)
stopProxyByName(p, "echo") stopProxyByName(p, "echo")
// Wait for the port to really close. // Wait for the port to really close.
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
waitForNumProxyLoops(t, p, 0)
} }
func TestUDPProxyStop(t *testing.T) { func TestUDPProxyStop(t *testing.T) {
@ -254,6 +274,7 @@ func TestUDPProxyStop(t *testing.T) {
}) })
p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}) p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
if err != nil { if err != nil {
@ -264,12 +285,14 @@ func TestUDPProxyStop(t *testing.T) {
t.Fatalf("error connecting to proxy: %v", err) t.Fatalf("error connecting to proxy: %v", err)
} }
conn.Close() conn.Close()
waitForNumProxyLoops(t, p, 1)
stopProxyByName(p, "echo") stopProxyByName(p, "echo")
// Wait for the port to really close. // Wait for the port to really close.
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
waitForNumProxyLoops(t, p, 0)
} }
func TestTCPProxyUpdateDelete(t *testing.T) { func TestTCPProxyUpdateDelete(t *testing.T) {
@ -282,6 +305,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
}) })
p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}) p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
if err != nil { if err != nil {
@ -292,11 +316,13 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
t.Fatalf("error connecting to proxy: %v", err) t.Fatalf("error connecting to proxy: %v", err)
} }
conn.Close() conn.Close()
waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{}) p.OnUpdate([]api.Service{})
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
waitForNumProxyLoops(t, p, 0)
} }
func TestUDPProxyUpdateDelete(t *testing.T) { func TestUDPProxyUpdateDelete(t *testing.T) {
@ -309,6 +335,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
}) })
p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}) p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
if err != nil { if err != nil {
@ -319,11 +346,13 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
t.Fatalf("error connecting to proxy: %v", err) t.Fatalf("error connecting to proxy: %v", err)
} }
conn.Close() conn.Close()
waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{}) p.OnUpdate([]api.Service{})
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
waitForNumProxyLoops(t, p, 0)
} }
func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
@ -336,6 +365,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
}) })
p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}) p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
if err != nil { if err != nil {
@ -346,15 +376,18 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
t.Fatalf("error connecting to proxy: %v", err) t.Fatalf("error connecting to proxy: %v", err)
} }
conn.Close() conn.Close()
waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{}) p.OnUpdate([]api.Service{})
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
waitForNumProxyLoops(t, p, 0)
p.OnUpdate([]api.Service{ p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "TCP", ProxyPort: svcInfo.proxyPort}, Status: api.ServiceStatus{}}, {ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "TCP", ProxyPort: svcInfo.proxyPort}, Status: api.ServiceStatus{}},
}) })
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)
} }
func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
@ -367,6 +400,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
}) })
p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}) p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
if err != nil { if err != nil {
@ -377,15 +411,18 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
t.Fatalf("error connecting to proxy: %v", err) t.Fatalf("error connecting to proxy: %v", err)
} }
conn.Close() conn.Close()
waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{}) p.OnUpdate([]api.Service{})
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
waitForNumProxyLoops(t, p, 0)
p.OnUpdate([]api.Service{ p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "UDP", ProxyPort: svcInfo.proxyPort}, Status: api.ServiceStatus{}}, {ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "UDP", ProxyPort: svcInfo.proxyPort}, Status: api.ServiceStatus{}},
}) })
testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)
} }
func TestTCPProxyUpdatePort(t *testing.T) { func TestTCPProxyUpdatePort(t *testing.T) {
@ -398,11 +435,14 @@ func TestTCPProxyUpdatePort(t *testing.T) {
}) })
p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}) p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
if err != nil { if err != nil {
t.Fatalf("error adding new service: %#v", err) t.Fatalf("error adding new service: %#v", err)
} }
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)
// add a new dummy listener in order to get a port that is free // add a new dummy listener in order to get a port that is free
l, _ := net.Listen("tcp", ":0") l, _ := net.Listen("tcp", ":0")
@ -424,6 +464,9 @@ func TestTCPProxyUpdatePort(t *testing.T) {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
testEchoTCP(t, "127.0.0.1", newPort) testEchoTCP(t, "127.0.0.1", newPort)
// This is a bit async, but this should be sufficient.
time.Sleep(500 * time.Millisecond)
waitForNumProxyLoops(t, p, 1)
// Ensure the old port is released and re-usable. // Ensure the old port is released and re-usable.
l, err = net.Listen("tcp", joinHostPort("", svcInfo.proxyPort)) l, err = net.Listen("tcp", joinHostPort("", svcInfo.proxyPort))
@ -443,11 +486,13 @@ func TestUDPProxyUpdatePort(t *testing.T) {
}) })
p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}) p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
if err != nil { if err != nil {
t.Fatalf("error adding new service: %#v", err) t.Fatalf("error adding new service: %#v", err)
} }
waitForNumProxyLoops(t, p, 1)
// add a new dummy listener in order to get a port that is free // add a new dummy listener in order to get a port that is free
pc, _ := net.ListenPacket("udp", ":0") pc, _ := net.ListenPacket("udp", ":0")
@ -469,6 +514,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
testEchoUDP(t, "127.0.0.1", newPort) testEchoUDP(t, "127.0.0.1", newPort)
waitForNumProxyLoops(t, p, 1)
// Ensure the old port is released and re-usable. // Ensure the old port is released and re-usable.
pc, err = net.ListenPacket("udp", joinHostPort("", svcInfo.proxyPort)) pc, err = net.ListenPacket("udp", joinHostPort("", svcInfo.proxyPort))