diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index 280685b3451..a9c4ddc88fe 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -20,34 +20,22 @@ import ( "fmt" "io" "net" - "strconv" - "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/golang/glog" ) -type serviceInfo struct { - port int - active bool - listener net.Listener - lock sync.Mutex -} - // Proxier is a simple proxy for tcp connections between a localhost:lport and services that provide // the actual implementations. type Proxier struct { loadBalancer LoadBalancer - serviceMap map[string]*serviceInfo - // protects 'serviceMap' - serviceLock sync.Mutex + serviceMap map[string]int } // NewProxier returns a newly created and correctly initialized instance of Proxier. func NewProxier(loadBalancer LoadBalancer) *Proxier { - return &Proxier{loadBalancer: loadBalancer, serviceMap: make(map[string]*serviceInfo)} + return &Proxier{loadBalancer: loadBalancer, serviceMap: make(map[string]int)} } func copyBytes(in, out *net.TCPConn) { @@ -71,52 +59,10 @@ func proxyConnection(in, out *net.TCPConn) { go copyBytes(out, in) } -// StopProxy stops a proxy for the named service. It stops the proxy loop and closes the socket. -func (proxier *Proxier) StopProxy(service string) error { - // TODO: delete from map here? - info, found := proxier.getServiceInfo(service) - if !found { - return fmt.Errorf("unknown service: %s", service) - } - info.lock.Lock() - defer info.lock.Unlock() - return proxier.stopProxyInternal(info) -} - -// Requires that info.lock be held before calling. -func (proxier *Proxier) stopProxyInternal(info *serviceInfo) error { - info.active = false - return info.listener.Close() -} - -func (proxier *Proxier) getServiceInfo(service string) (*serviceInfo, bool) { - proxier.serviceLock.Lock() - defer proxier.serviceLock.Unlock() - info, ok := proxier.serviceMap[service] - return info, ok -} - -func (proxier *Proxier) setServiceInfo(service string, info *serviceInfo) { - proxier.serviceLock.Lock() - defer proxier.serviceLock.Unlock() - proxier.serviceMap[service] = info -} - // AcceptHandler begins accepting incoming connections from listener and proxying the connections to the load-balanced endpoints. // It never returns. -func (proxier *Proxier) AcceptHandler(service string, listener net.Listener) { - info, found := proxier.getServiceInfo(service) - if !found { - glog.Errorf("Failed to find service: %s", service) - return - } +func (proxier Proxier) AcceptHandler(service string, listener net.Listener) { for { - info.lock.Lock() - if !info.active { - info.lock.Unlock() - break - } - info.lock.Unlock() inConn, err := listener.Accept() if err != nil { glog.Errorf("Accept failed: %v", err) @@ -146,42 +92,30 @@ func (proxier *Proxier) AcceptHandler(service string, listener net.Listener) { } // addService starts listening for a new service on a given port. -func (proxier *Proxier) addService(service string, port int) (net.Listener, error) { +func (proxier Proxier) addService(service string, port int) error { // Make sure we can start listening on the port before saying all's well. l, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { - return nil, err + return err } proxier.addServiceCommon(service, l) - return l, nil + return nil } // addService starts listening for a new service, returning the port it's using. // For testing on a system with unknown ports used. -func (proxier *Proxier) addServiceOnUnusedPort(service string) (string, error) { +func (proxier Proxier) addServiceOnUnusedPort(service string) (string, error) { // Make sure we can start listening on the port before saying all's well. l, err := net.Listen("tcp", ":0") if err != nil { return "", err } - _, port, err := net.SplitHostPort(l.Addr().String()) - if err != nil { - return "", err - } - portNum, err := strconv.Atoi(port) - if err != nil { - return "", err - } - proxier.setServiceInfo(service, &serviceInfo{ - port: portNum, - active: true, - listener: l, - }) proxier.addServiceCommon(service, l) + _, port, err := net.SplitHostPort(l.Addr().String()) return port, nil } -func (proxier *Proxier) addServiceCommon(service string, l net.Listener) { +func (proxier Proxier) addServiceCommon(service string, l net.Listener) { glog.Infof("Listening for %s on %s", service, l.Addr().String()) // If that succeeds, start the accepting loop. go proxier.AcceptHandler(service, l) @@ -189,41 +123,19 @@ func (proxier *Proxier) addServiceCommon(service string, l net.Listener) { // OnUpdate receives update notices for the updated services and start listening newly added services. // It implements "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config".ServiceConfigHandler.OnUpdate. -func (proxier *Proxier) OnUpdate(services []api.Service) { +func (proxier Proxier) OnUpdate(services []api.Service) { glog.Infof("Received update notice: %+v", services) - serviceNames := util.StringSet{} - for _, service := range services { - serviceNames.Insert(service.ID) - info, exists := proxier.getServiceInfo(service.ID) - if exists && info.port == service.Port { + port, exists := proxier.serviceMap[service.ID] + if exists && port == service.Port { continue } - if exists { - // Stop the old proxier. - proxier.StopProxy(service.ID) - } glog.Infof("Adding a new service %s on port %d", service.ID, service.Port) - listener, err := proxier.addService(service.ID, service.Port) + err := proxier.addService(service.ID, service.Port) if err != nil { glog.Infof("Failed to start listening for %s on %d", service.ID, service.Port) continue } - proxier.setServiceInfo(service.ID, &serviceInfo{ - port: service.Port, - active: true, - listener: listener, - }) - } - - proxier.serviceLock.Lock() - defer proxier.serviceLock.Unlock() - for name, info := range proxier.serviceMap { - info.lock.Lock() - if !serviceNames.Has(name) && info.active { - glog.Infof("Removing service: %s", name) - proxier.stopProxyInternal(info) - } - info.lock.Unlock() + proxier.serviceMap[service.ID] = service.Port } } diff --git a/pkg/proxy/proxier_test.go b/pkg/proxy/proxier_test.go index 1cb8c7b8af2..8489d8d358f 100644 --- a/pkg/proxy/proxier_test.go +++ b/pkg/proxy/proxier_test.go @@ -20,9 +20,7 @@ import ( "fmt" "io" "net" - "strconv" "testing" - "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" ) @@ -47,24 +45,6 @@ func echoServer(t *testing.T, addr string) (string, error) { return port, err } -func testEchoConnection(t *testing.T, address, port string) { - conn, err := net.Dial("tcp", net.JoinHostPort(address, port)) - if err != nil { - t.Fatalf("error connecting to proxy: %v", err) - } - magic := "aaaaa" - if _, err := conn.Write([]byte(magic)); err != nil { - t.Fatalf("error writing to proxy: %v", err) - } - buf := make([]byte, 5) - if _, err := conn.Read(buf); err != nil { - t.Fatalf("error reading from proxy: %v", err) - } - if string(buf) != magic { - t.Fatalf("bad echo from proxy: got: %q, expected %q", string(buf), magic) - } -} - func TestProxy(t *testing.T) { port, err := echoServer(t, "127.0.0.1:0") if err != nil { @@ -77,24 +57,6 @@ func TestProxy(t *testing.T) { p := NewProxier(lb) - proxyPort, err := p.addServiceOnUnusedPort("echo") - if err != nil { - t.Fatalf("error adding new service: %#v", err) - } - testEchoConnection(t, "127.0.0.1", proxyPort) -} - -func TestProxyStop(t *testing.T) { - port, err := echoServer(t, "127.0.0.1:0") - if err != nil { - t.Fatal(err) - } - - lb := NewLoadBalancerRR() - lb.OnUpdate([]api.Endpoints{{"echo", []string{net.JoinHostPort("127.0.0.1", port)}}}) - - p := NewProxier(lb) - proxyPort, err := p.addServiceOnUnusedPort("echo") if err != nil { t.Fatalf("error adding new service: %#v", err) @@ -103,79 +65,15 @@ func TestProxyStop(t *testing.T) { if err != nil { t.Fatalf("error connecting to proxy: %v", err) } - conn.Close() - - p.StopProxy("echo") - time.Sleep(2 * time.Second) - _, err = net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort)) - if err == nil { - t.Fatalf("Unexpected non-error.") + magic := "aaaaa" + if _, err := conn.Write([]byte(magic)); err != nil { + t.Fatalf("error writing to proxy: %v", err) + } + buf := make([]byte, 5) + if _, err := conn.Read(buf); err != nil { + t.Fatalf("error reading from proxy: %v", err) + } + if string(buf) != magic { + t.Fatalf("bad echo from proxy: got: %q, expected %q", string(buf), magic) } } - -func TestProxyUpdateDelete(t *testing.T) { - port, err := echoServer(t, "127.0.0.1:0") - if err != nil { - t.Fatal(err) - } - - lb := NewLoadBalancerRR() - lb.OnUpdate([]api.Endpoints{{"echo", []string{net.JoinHostPort("127.0.0.1", port)}}}) - - p := NewProxier(lb) - - proxyPort, err := p.addServiceOnUnusedPort("echo") - if err != nil { - t.Fatalf("error adding new service: %#v", err) - } - conn, err := net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort)) - if err != nil { - t.Fatalf("error connecting to proxy: %v", err) - } - conn.Close() - - p.OnUpdate([]api.Service{}) - time.Sleep(2 * time.Second) - _, err = net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort)) - if err == nil { - t.Fatalf("Unexpected non-error.") - } -} - -func TestProxyUpdatePort(t *testing.T) { - port, err := echoServer(t, "127.0.0.1:0") - if err != nil { - t.Fatal(err) - } - - lb := NewLoadBalancerRR() - lb.OnUpdate([]api.Endpoints{{"echo", []string{net.JoinHostPort("127.0.0.1", port)}}}) - - p := NewProxier(lb) - - proxyPort, err := p.addServiceOnUnusedPort("echo") - if err != nil { - t.Fatalf("error adding new service: %#v", err) - } - conn, err := net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort)) - if err != nil { - t.Fatalf("error connecting to proxy: %v", err) - } - conn.Close() - - // add a new dummy listener in order to get a port that is free - l, _ := net.Listen("tcp", ":0") - _, port, _ = net.SplitHostPort(l.Addr().String()) - portNum, _ := strconv.Atoi(port) - l.Close() - - p.OnUpdate([]api.Service{ - {JSONBase: api.JSONBase{ID: "echo"}, Port: portNum}, - }) - time.Sleep(2 * time.Second) - _, err = net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort)) - if err == nil { - t.Fatalf("Unexpected non-error.") - } - testEchoConnection(t, "127.0.0.1", port) -}