From 448a4b7d69e85267f6949e3c79b7a6e1c44ba36e Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Wed, 30 Jul 2014 06:52:03 -0700 Subject: [PATCH 1/3] Revert "Revert "Add support for stopping a proxier."" This reverts commit 19beaf71b576b557f65960ec35086b9580a91221. --- pkg/proxy/proxier.go | 116 +++++++++++++++++++++++++++++++----- pkg/proxy/proxier_test.go | 122 ++++++++++++++++++++++++++++++++++---- 2 files changed, 214 insertions(+), 24 deletions(-) diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index a9c4ddc88fe..280685b3451 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -20,22 +20,34 @@ import ( "fmt" "io" "net" + "strconv" + "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/golang/glog" ) +type serviceInfo struct { + port int + active bool + listener net.Listener + lock sync.Mutex +} + // Proxier is a simple proxy for tcp connections between a localhost:lport and services that provide // the actual implementations. type Proxier struct { loadBalancer LoadBalancer - serviceMap map[string]int + serviceMap map[string]*serviceInfo + // protects 'serviceMap' + serviceLock sync.Mutex } // NewProxier returns a newly created and correctly initialized instance of Proxier. func NewProxier(loadBalancer LoadBalancer) *Proxier { - return &Proxier{loadBalancer: loadBalancer, serviceMap: make(map[string]int)} + return &Proxier{loadBalancer: loadBalancer, serviceMap: make(map[string]*serviceInfo)} } func copyBytes(in, out *net.TCPConn) { @@ -59,10 +71,52 @@ func proxyConnection(in, out *net.TCPConn) { go copyBytes(out, in) } +// StopProxy stops a proxy for the named service. It stops the proxy loop and closes the socket. +func (proxier *Proxier) StopProxy(service string) error { + // TODO: delete from map here? + info, found := proxier.getServiceInfo(service) + if !found { + return fmt.Errorf("unknown service: %s", service) + } + info.lock.Lock() + defer info.lock.Unlock() + return proxier.stopProxyInternal(info) +} + +// Requires that info.lock be held before calling. +func (proxier *Proxier) stopProxyInternal(info *serviceInfo) error { + info.active = false + return info.listener.Close() +} + +func (proxier *Proxier) getServiceInfo(service string) (*serviceInfo, bool) { + proxier.serviceLock.Lock() + defer proxier.serviceLock.Unlock() + info, ok := proxier.serviceMap[service] + return info, ok +} + +func (proxier *Proxier) setServiceInfo(service string, info *serviceInfo) { + proxier.serviceLock.Lock() + defer proxier.serviceLock.Unlock() + proxier.serviceMap[service] = info +} + // AcceptHandler begins accepting incoming connections from listener and proxying the connections to the load-balanced endpoints. // It never returns. -func (proxier Proxier) AcceptHandler(service string, listener net.Listener) { +func (proxier *Proxier) AcceptHandler(service string, listener net.Listener) { + info, found := proxier.getServiceInfo(service) + if !found { + glog.Errorf("Failed to find service: %s", service) + return + } for { + info.lock.Lock() + if !info.active { + info.lock.Unlock() + break + } + info.lock.Unlock() inConn, err := listener.Accept() if err != nil { glog.Errorf("Accept failed: %v", err) @@ -92,30 +146,42 @@ func (proxier Proxier) AcceptHandler(service string, listener net.Listener) { } // addService starts listening for a new service on a given port. -func (proxier Proxier) addService(service string, port int) error { +func (proxier *Proxier) addService(service string, port int) (net.Listener, error) { // Make sure we can start listening on the port before saying all's well. l, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { - return err + return nil, err } proxier.addServiceCommon(service, l) - return nil + return l, nil } // addService starts listening for a new service, returning the port it's using. // For testing on a system with unknown ports used. -func (proxier Proxier) addServiceOnUnusedPort(service string) (string, error) { +func (proxier *Proxier) addServiceOnUnusedPort(service string) (string, error) { // Make sure we can start listening on the port before saying all's well. l, err := net.Listen("tcp", ":0") if err != nil { return "", err } - proxier.addServiceCommon(service, l) _, port, err := net.SplitHostPort(l.Addr().String()) + if err != nil { + return "", err + } + portNum, err := strconv.Atoi(port) + if err != nil { + return "", err + } + proxier.setServiceInfo(service, &serviceInfo{ + port: portNum, + active: true, + listener: l, + }) + proxier.addServiceCommon(service, l) return port, nil } -func (proxier Proxier) addServiceCommon(service string, l net.Listener) { +func (proxier *Proxier) addServiceCommon(service string, l net.Listener) { glog.Infof("Listening for %s on %s", service, l.Addr().String()) // If that succeeds, start the accepting loop. go proxier.AcceptHandler(service, l) @@ -123,19 +189,41 @@ func (proxier Proxier) addServiceCommon(service string, l net.Listener) { // OnUpdate receives update notices for the updated services and start listening newly added services. // It implements "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config".ServiceConfigHandler.OnUpdate. -func (proxier Proxier) OnUpdate(services []api.Service) { +func (proxier *Proxier) OnUpdate(services []api.Service) { glog.Infof("Received update notice: %+v", services) + serviceNames := util.StringSet{} + for _, service := range services { - port, exists := proxier.serviceMap[service.ID] - if exists && port == service.Port { + serviceNames.Insert(service.ID) + info, exists := proxier.getServiceInfo(service.ID) + if exists && info.port == service.Port { continue } + if exists { + // Stop the old proxier. + proxier.StopProxy(service.ID) + } glog.Infof("Adding a new service %s on port %d", service.ID, service.Port) - err := proxier.addService(service.ID, service.Port) + listener, err := proxier.addService(service.ID, service.Port) if err != nil { glog.Infof("Failed to start listening for %s on %d", service.ID, service.Port) continue } - proxier.serviceMap[service.ID] = service.Port + proxier.setServiceInfo(service.ID, &serviceInfo{ + port: service.Port, + active: true, + listener: listener, + }) + } + + proxier.serviceLock.Lock() + defer proxier.serviceLock.Unlock() + for name, info := range proxier.serviceMap { + info.lock.Lock() + if !serviceNames.Has(name) && info.active { + glog.Infof("Removing service: %s", name) + proxier.stopProxyInternal(info) + } + info.lock.Unlock() } } diff --git a/pkg/proxy/proxier_test.go b/pkg/proxy/proxier_test.go index 8489d8d358f..1cb8c7b8af2 100644 --- a/pkg/proxy/proxier_test.go +++ b/pkg/proxy/proxier_test.go @@ -20,7 +20,9 @@ import ( "fmt" "io" "net" + "strconv" "testing" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" ) @@ -45,6 +47,24 @@ func echoServer(t *testing.T, addr string) (string, error) { return port, err } +func testEchoConnection(t *testing.T, address, port string) { + conn, err := net.Dial("tcp", net.JoinHostPort(address, port)) + if err != nil { + t.Fatalf("error connecting to proxy: %v", err) + } + magic := "aaaaa" + if _, err := conn.Write([]byte(magic)); err != nil { + t.Fatalf("error writing to proxy: %v", err) + } + buf := make([]byte, 5) + if _, err := conn.Read(buf); err != nil { + t.Fatalf("error reading from proxy: %v", err) + } + if string(buf) != magic { + t.Fatalf("bad echo from proxy: got: %q, expected %q", string(buf), magic) + } +} + func TestProxy(t *testing.T) { port, err := echoServer(t, "127.0.0.1:0") if err != nil { @@ -57,6 +77,24 @@ func TestProxy(t *testing.T) { p := NewProxier(lb) + proxyPort, err := p.addServiceOnUnusedPort("echo") + if err != nil { + t.Fatalf("error adding new service: %#v", err) + } + testEchoConnection(t, "127.0.0.1", proxyPort) +} + +func TestProxyStop(t *testing.T) { + port, err := echoServer(t, "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + + lb := NewLoadBalancerRR() + lb.OnUpdate([]api.Endpoints{{"echo", []string{net.JoinHostPort("127.0.0.1", port)}}}) + + p := NewProxier(lb) + proxyPort, err := p.addServiceOnUnusedPort("echo") if err != nil { t.Fatalf("error adding new service: %#v", err) @@ -65,15 +103,79 @@ func TestProxy(t *testing.T) { if err != nil { t.Fatalf("error connecting to proxy: %v", err) } - magic := "aaaaa" - if _, err := conn.Write([]byte(magic)); err != nil { - t.Fatalf("error writing to proxy: %v", err) - } - buf := make([]byte, 5) - if _, err := conn.Read(buf); err != nil { - t.Fatalf("error reading from proxy: %v", err) - } - if string(buf) != magic { - t.Fatalf("bad echo from proxy: got: %q, expected %q", string(buf), magic) + conn.Close() + + p.StopProxy("echo") + time.Sleep(2 * time.Second) + _, err = net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort)) + if err == nil { + t.Fatalf("Unexpected non-error.") } } + +func TestProxyUpdateDelete(t *testing.T) { + port, err := echoServer(t, "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + + lb := NewLoadBalancerRR() + lb.OnUpdate([]api.Endpoints{{"echo", []string{net.JoinHostPort("127.0.0.1", port)}}}) + + p := NewProxier(lb) + + proxyPort, err := p.addServiceOnUnusedPort("echo") + if err != nil { + t.Fatalf("error adding new service: %#v", err) + } + conn, err := net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort)) + if err != nil { + t.Fatalf("error connecting to proxy: %v", err) + } + conn.Close() + + p.OnUpdate([]api.Service{}) + time.Sleep(2 * time.Second) + _, err = net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort)) + if err == nil { + t.Fatalf("Unexpected non-error.") + } +} + +func TestProxyUpdatePort(t *testing.T) { + port, err := echoServer(t, "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + + lb := NewLoadBalancerRR() + lb.OnUpdate([]api.Endpoints{{"echo", []string{net.JoinHostPort("127.0.0.1", port)}}}) + + p := NewProxier(lb) + + proxyPort, err := p.addServiceOnUnusedPort("echo") + if err != nil { + t.Fatalf("error adding new service: %#v", err) + } + conn, err := net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort)) + if err != nil { + t.Fatalf("error connecting to proxy: %v", err) + } + conn.Close() + + // add a new dummy listener in order to get a port that is free + l, _ := net.Listen("tcp", ":0") + _, port, _ = net.SplitHostPort(l.Addr().String()) + portNum, _ := strconv.Atoi(port) + l.Close() + + p.OnUpdate([]api.Service{ + {JSONBase: api.JSONBase{ID: "echo"}, Port: portNum}, + }) + time.Sleep(2 * time.Second) + _, err = net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort)) + if err == nil { + t.Fatalf("Unexpected non-error.") + } + testEchoConnection(t, "127.0.0.1", port) +} From 9519a8049bf92f4a5f3765cd2d1e52e4ba64180e Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Wed, 30 Jul 2014 06:56:42 -0700 Subject: [PATCH 2/3] Fixed tests. --- pkg/proxy/proxier.go | 5 +++++ pkg/proxy/proxier_test.go | 14 ++++++-------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index 280685b3451..a300fb53105 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -156,9 +156,14 @@ func (proxier *Proxier) addService(service string, port int) (net.Listener, erro return l, nil } +// used to globally lock around unused ports. Only used in testing. +var unusedPortLock sync.Mutex + // addService starts listening for a new service, returning the port it's using. // For testing on a system with unknown ports used. func (proxier *Proxier) addServiceOnUnusedPort(service string) (string, error) { + unusedPortLock.Lock() + defer unusedPortLock.Unlock() // Make sure we can start listening on the port before saying all's well. l, err := net.Listen("tcp", ":0") if err != nil { diff --git a/pkg/proxy/proxier_test.go b/pkg/proxy/proxier_test.go index 1cb8c7b8af2..e62907b0f57 100644 --- a/pkg/proxy/proxier_test.go +++ b/pkg/proxy/proxier_test.go @@ -91,7 +91,7 @@ func TestProxyStop(t *testing.T) { } lb := NewLoadBalancerRR() - lb.OnUpdate([]api.Endpoints{{"echo", []string{net.JoinHostPort("127.0.0.1", port)}}}) + lb.OnUpdate([]api.Endpoints{{JSONBase: api.JSONBase{ID: "echo"}, Endpoints: []string{net.JoinHostPort("127.0.0.1", port)}}}) p := NewProxier(lb) @@ -106,6 +106,7 @@ func TestProxyStop(t *testing.T) { conn.Close() p.StopProxy("echo") + // Wait for the port to really close. time.Sleep(2 * time.Second) _, err = net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort)) if err == nil { @@ -120,7 +121,7 @@ func TestProxyUpdateDelete(t *testing.T) { } lb := NewLoadBalancerRR() - lb.OnUpdate([]api.Endpoints{{"echo", []string{net.JoinHostPort("127.0.0.1", port)}}}) + lb.OnUpdate([]api.Endpoints{{JSONBase: api.JSONBase{ID: "echo"}, Endpoints: []string{net.JoinHostPort("127.0.0.1", port)}}}) p := NewProxier(lb) @@ -149,7 +150,7 @@ func TestProxyUpdatePort(t *testing.T) { } lb := NewLoadBalancerRR() - lb.OnUpdate([]api.Endpoints{{"echo", []string{net.JoinHostPort("127.0.0.1", port)}}}) + lb.OnUpdate([]api.Endpoints{{JSONBase: api.JSONBase{ID: "echo"}, Endpoints: []string{net.JoinHostPort("127.0.0.1", port)}}}) p := NewProxier(lb) @@ -157,11 +158,6 @@ func TestProxyUpdatePort(t *testing.T) { if err != nil { t.Fatalf("error adding new service: %#v", err) } - conn, err := net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort)) - if err != nil { - t.Fatalf("error connecting to proxy: %v", err) - } - conn.Close() // add a new dummy listener in order to get a port that is free l, _ := net.Listen("tcp", ":0") @@ -169,6 +165,8 @@ func TestProxyUpdatePort(t *testing.T) { portNum, _ := strconv.Atoi(port) l.Close() + // Wait for the socket to actually get free. + time.Sleep(2 * time.Second) p.OnUpdate([]api.Service{ {JSONBase: api.JSONBase{ID: "echo"}, Port: portNum}, }) From 2cbe2c18d464eb6c1b96abee79bbc32a1a24c325 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Wed, 30 Jul 2014 15:06:56 -0700 Subject: [PATCH 3/3] Add a comment to tickle travis. --- pkg/proxy/proxier_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/proxy/proxier_test.go b/pkg/proxy/proxier_test.go index e62907b0f57..5a94b623fb3 100644 --- a/pkg/proxy/proxier_test.go +++ b/pkg/proxy/proxier_test.go @@ -136,6 +136,7 @@ func TestProxyUpdateDelete(t *testing.T) { conn.Close() p.OnUpdate([]api.Service{}) + // Wait for the port to close. time.Sleep(2 * time.Second) _, err = net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort)) if err == nil {