diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index 85a5c03c39a..5c0f1d604a9 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -23,6 +23,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -52,11 +53,12 @@ var endpointDialTimeout = []time.Duration{1, 2, 4, 8} type proxySocket interface { // Addr gets the net.Addr for a proxySocket. Addr() net.Addr - // Close stops the proxySocket from accepting incoming connections. Each implementation should comment - // on the impact of calling Close while sessions are active. + // Close stops the proxySocket from accepting incoming connections. + // Each implementation should comment on the impact of calling Close + // while sessions are active. Close() error // ProxyLoop proxies incoming connections for the specified service to the service endpoints. - ProxyLoop(service string, proxier *Proxier) + ProxyLoop(service string, info *serviceInfo, proxier *Proxier) } // tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called, @@ -85,20 +87,19 @@ func tryConnect(service string, srcAddr net.Addr, protocol string, proxier *Prox return nil, fmt.Errorf("failed to connect to an endpoint.") } -func (tcp *tcpProxySocket) ProxyLoop(service string, proxier *Proxier) { +func (tcp *tcpProxySocket) ProxyLoop(service string, myInfo *serviceInfo, proxier *Proxier) { for { - _, exists := proxier.getServiceInfo(service) - if !exists { + if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo { + // The service port was closed or replaced. break } // Block until a connection is made. inConn, err := tcp.Accept() if err != nil { - _, exists := proxier.getServiceInfo(service) - if !exists { + if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo { // Then the service port was just closed so the accept failure is to be expected. - return + break } glog.Errorf("Accept failed: %v", err) continue @@ -161,12 +162,12 @@ func newClientCache() *clientCache { return &clientCache{clients: map[string]net.Conn{}} } -func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) { +func (udp *udpProxySocket) ProxyLoop(service string, myInfo *serviceInfo, proxier *Proxier) { activeClients := newClientCache() var buffer [4096]byte // 4KiB should be enough for most whole-packets for { - info, exists := proxier.getServiceInfo(service) - if !exists { + if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo { + // The service port was closed or replaced. break } @@ -184,7 +185,7 @@ func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) { break } // If this is a client we know already, reuse the connection and goroutine. - svrConn, err := udp.getBackendConn(activeClients, cliAddr, proxier, service, info.timeout) + svrConn, err := udp.getBackendConn(activeClients, cliAddr, proxier, service, myInfo.timeout) if err != nil { continue } @@ -198,7 +199,7 @@ func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) { } continue } - err = svrConn.SetDeadline(time.Now().Add(info.timeout)) + err = svrConn.SetDeadline(time.Now().Add(myInfo.timeout)) if err != nil { glog.Errorf("SetDeadline failed: %v", err) continue @@ -267,7 +268,7 @@ func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activ func logTimeout(err error) bool { if e, ok := err.(net.Error); ok { if e.Timeout() { - glog.V(1).Infof("connection to endpoint closed due to inactivity") + glog.V(3).Infof("connection to endpoint closed due to inactivity") return true } } @@ -300,12 +301,13 @@ func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, er // Proxier is a simple proxy for TCP connections between a localhost:lport // and services that provide the actual implementations. type Proxier struct { - loadBalancer LoadBalancer - mu sync.Mutex // protects serviceMap - serviceMap map[string]*serviceInfo - listenIP net.IP - iptables iptables.Interface - hostIP net.IP + loadBalancer LoadBalancer + mu sync.Mutex // protects serviceMap + serviceMap map[string]*serviceInfo + numProxyLoops int32 // use atomic ops to access this; mostly for testing + listenIP net.IP + iptables iptables.Interface + hostIP net.IP } // NewProxier returns a new Proxier given a LoadBalancer and an address on @@ -443,7 +445,9 @@ func (proxier *Proxier) addServiceOnPort(service string, protocol api.Protocol, glog.V(1).Infof("Proxying for service %q on %s port %d", service, protocol, portNum) go func(service string, proxier *Proxier) { defer util.HandleCrash() - sock.ProxyLoop(service, proxier) + atomic.AddInt32(&proxier.numProxyLoops, 1) + sock.ProxyLoop(service, si, proxier) + atomic.AddInt32(&proxier.numProxyLoops, -1) }(service, proxier) return si, nil diff --git a/pkg/proxy/proxier_test.go b/pkg/proxy/proxier_test.go index 1eec884f55d..81e7345246a 100644 --- a/pkg/proxy/proxier_test.go +++ b/pkg/proxy/proxier_test.go @@ -24,6 +24,7 @@ import ( "net/http/httptest" "net/url" "strconv" + "sync/atomic" "testing" "time" @@ -171,6 +172,18 @@ func testEchoUDP(t *testing.T, address string, port int) { } } +func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) { + var got int32 + for i := 0; i < 4; i++ { + got = atomic.LoadInt32(&p.numProxyLoops) + if got == want { + return + } + time.Sleep(500 * time.Millisecond) + } + t.Errorf("expected %d ProxyLoops running, got %d", want, got) +} + func TestTCPProxy(t *testing.T) { lb := NewLoadBalancerRR() lb.OnUpdate([]api.Endpoints{ @@ -181,12 +194,14 @@ func TestTCPProxy(t *testing.T) { }) p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}) + waitForNumProxyLoops(t, p, 0) svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) + waitForNumProxyLoops(t, p, 1) } func TestUDPProxy(t *testing.T) { @@ -199,12 +214,14 @@ func TestUDPProxy(t *testing.T) { }) p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}) + waitForNumProxyLoops(t, p, 0) svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) + waitForNumProxyLoops(t, p, 1) } // Helper: Stops the proxy for the named service. @@ -226,6 +243,7 @@ func TestTCPProxyStop(t *testing.T) { }) p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}) + waitForNumProxyLoops(t, p, 0) svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) if err != nil { @@ -236,12 +254,14 @@ func TestTCPProxyStop(t *testing.T) { t.Fatalf("error connecting to proxy: %v", err) } conn.Close() + waitForNumProxyLoops(t, p, 1) stopProxyByName(p, "echo") // Wait for the port to really close. if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } + waitForNumProxyLoops(t, p, 0) } func TestUDPProxyStop(t *testing.T) { @@ -254,6 +274,7 @@ func TestUDPProxyStop(t *testing.T) { }) p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}) + waitForNumProxyLoops(t, p, 0) svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) if err != nil { @@ -264,12 +285,14 @@ func TestUDPProxyStop(t *testing.T) { t.Fatalf("error connecting to proxy: %v", err) } conn.Close() + waitForNumProxyLoops(t, p, 1) stopProxyByName(p, "echo") // Wait for the port to really close. if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } + waitForNumProxyLoops(t, p, 0) } func TestTCPProxyUpdateDelete(t *testing.T) { @@ -282,6 +305,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) { }) p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}) + waitForNumProxyLoops(t, p, 0) svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) if err != nil { @@ -292,11 +316,13 @@ func TestTCPProxyUpdateDelete(t *testing.T) { t.Fatalf("error connecting to proxy: %v", err) } conn.Close() + waitForNumProxyLoops(t, p, 1) p.OnUpdate([]api.Service{}) if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } + waitForNumProxyLoops(t, p, 0) } func TestUDPProxyUpdateDelete(t *testing.T) { @@ -309,6 +335,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) { }) p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}) + waitForNumProxyLoops(t, p, 0) svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) if err != nil { @@ -319,11 +346,13 @@ func TestUDPProxyUpdateDelete(t *testing.T) { t.Fatalf("error connecting to proxy: %v", err) } conn.Close() + waitForNumProxyLoops(t, p, 1) p.OnUpdate([]api.Service{}) if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } + waitForNumProxyLoops(t, p, 0) } func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { @@ -336,6 +365,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { }) p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}) + waitForNumProxyLoops(t, p, 0) svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) if err != nil { @@ -346,15 +376,18 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { t.Fatalf("error connecting to proxy: %v", err) } conn.Close() + waitForNumProxyLoops(t, p, 1) p.OnUpdate([]api.Service{}) if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } + waitForNumProxyLoops(t, p, 0) p.OnUpdate([]api.Service{ {ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "TCP", ProxyPort: svcInfo.proxyPort}, Status: api.ServiceStatus{}}, }) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) + waitForNumProxyLoops(t, p, 1) } func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { @@ -367,6 +400,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { }) p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}) + waitForNumProxyLoops(t, p, 0) svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) if err != nil { @@ -377,15 +411,18 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { t.Fatalf("error connecting to proxy: %v", err) } conn.Close() + waitForNumProxyLoops(t, p, 1) p.OnUpdate([]api.Service{}) if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } + waitForNumProxyLoops(t, p, 0) p.OnUpdate([]api.Service{ {ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "UDP", ProxyPort: svcInfo.proxyPort}, Status: api.ServiceStatus{}}, }) testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) + waitForNumProxyLoops(t, p, 1) } func TestTCPProxyUpdatePort(t *testing.T) { @@ -398,11 +435,14 @@ func TestTCPProxyUpdatePort(t *testing.T) { }) p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}) + waitForNumProxyLoops(t, p, 0) svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } + testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) + waitForNumProxyLoops(t, p, 1) // add a new dummy listener in order to get a port that is free l, _ := net.Listen("tcp", ":0") @@ -424,6 +464,9 @@ func TestTCPProxyUpdatePort(t *testing.T) { t.Fatalf(err.Error()) } testEchoTCP(t, "127.0.0.1", newPort) + // This is a bit async, but this should be sufficient. + time.Sleep(500 * time.Millisecond) + waitForNumProxyLoops(t, p, 1) // Ensure the old port is released and re-usable. l, err = net.Listen("tcp", joinHostPort("", svcInfo.proxyPort)) @@ -443,11 +486,13 @@ func TestUDPProxyUpdatePort(t *testing.T) { }) p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}) + waitForNumProxyLoops(t, p, 0) svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } + waitForNumProxyLoops(t, p, 1) // add a new dummy listener in order to get a port that is free pc, _ := net.ListenPacket("udp", ":0") @@ -469,6 +514,7 @@ func TestUDPProxyUpdatePort(t *testing.T) { t.Fatalf(err.Error()) } testEchoUDP(t, "127.0.0.1", newPort) + waitForNumProxyLoops(t, p, 1) // Ensure the old port is released and re-usable. pc, err = net.ListenPacket("udp", joinHostPort("", svcInfo.proxyPort))