Merge pull request #1037 from thockin/proxy

Fix a race in proxy.
This commit is contained in:
Daniel Smith 2014-08-25 22:10:54 -07:00
commit aeaa3f99fb

View File

@ -144,23 +144,11 @@ func (proxier *Proxier) AcceptHandler(service string, listener net.Listener) {
} }
} }
// addService creates and registers a service proxy for the given service on
// the specified port.
// It returns the net.Listener of the service proxy.
func (proxier *Proxier) addService(service string, port int) (net.Listener, error) {
l, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
return nil, err
}
proxier.addServiceCommon(service, l)
return l, nil
}
// used to globally lock around unused ports. Only used in testing. // used to globally lock around unused ports. Only used in testing.
var unusedPortLock sync.Mutex var unusedPortLock sync.Mutex
// addService starts listening for a new service, returning the port it's using. // addServiceOnUnusedPort starts listening for a new service, returning the
// For testing on a system with unknown ports used. // port it's using. For testing on a system with unknown ports used.
func (proxier *Proxier) addServiceOnUnusedPort(service string) (string, error) { func (proxier *Proxier) addServiceOnUnusedPort(service string) (string, error) {
unusedPortLock.Lock() unusedPortLock.Lock()
defer unusedPortLock.Unlock() defer unusedPortLock.Unlock()
@ -181,11 +169,11 @@ func (proxier *Proxier) addServiceOnUnusedPort(service string) (string, error) {
active: true, active: true,
listener: l, listener: l,
}) })
proxier.addServiceCommon(service, l) proxier.startAccepting(service, l)
return port, nil return port, nil
} }
func (proxier *Proxier) addServiceCommon(service string, l net.Listener) { func (proxier *Proxier) startAccepting(service string, l net.Listener) {
glog.Infof("Listening for %s on %s", service, l.Addr().String()) glog.Infof("Listening for %s on %s", service, l.Addr().String())
go proxier.AcceptHandler(service, l) go proxier.AcceptHandler(service, l)
} }
@ -206,7 +194,7 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
proxier.StopProxy(service.ID) proxier.StopProxy(service.ID)
} }
glog.Infof("Adding a new service %s on port %d", service.ID, service.Port) glog.Infof("Adding a new service %s on port %d", service.ID, service.Port)
listener, err := proxier.addService(service.ID, service.Port) listener, err := net.Listen("tcp", fmt.Sprintf(":%d", service.Port))
if err != nil { if err != nil {
glog.Infof("Failed to start listening for %s on %d", service.ID, service.Port) glog.Infof("Failed to start listening for %s on %d", service.ID, service.Port)
continue continue
@ -216,6 +204,7 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
active: true, active: true,
listener: listener, listener: listener,
}) })
proxier.startAccepting(service.ID, listener)
} }
proxier.mu.Lock() proxier.mu.Lock()
defer proxier.mu.Unlock() defer proxier.mu.Unlock()