Move UDP backend setup to a function

This commit is contained in:
Tim Hockin 2014-09-11 16:08:25 -07:00
parent 86d12681f2
commit 1e50f118fd

View File

@ -40,6 +40,20 @@ type serviceInfo struct {
active bool
}
func (si *serviceInfo) isActive() bool {
si.mu.Lock()
defer si.mu.Unlock()
return si.active
}
func (si *serviceInfo) setActive(val bool) bool {
si.mu.Lock()
defer si.mu.Unlock()
tmp := si.active
si.active = val
return tmp
}
// How long we wait for a connection to a backend.
const endpointDialTimeout = 5 * time.Second
@ -67,12 +81,9 @@ func (tcp *tcpProxySocket) ProxyLoop(service string, proxier *Proxier) {
return
}
for {
info.mu.Lock()
if !info.active {
info.mu.Unlock()
if !info.isActive() {
break
}
info.mu.Unlock()
// Block until a connection is made.
inConn, err := tcp.Accept()
@ -140,12 +151,9 @@ func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) {
activeClients := newClientCache()
var buffer [4096]byte // 4KiB should be enough for most whole-packets
for {
info.mu.Lock()
if !info.active {
info.mu.Unlock()
if !info.isActive() {
break
}
info.mu.Unlock()
// Block until data arrives.
// TODO: Accumulate a histogram of n or something, to fine tune the buffer size.
@ -161,30 +169,10 @@ func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) {
break
}
// If this is a client we know already, reuse the connection and goroutine.
activeClients.mu.Lock()
svrConn, found := activeClients.clients[cliAddr.String()]
if !found {
// TODO: This could spin up a new goroutine to make the outbound connection,
// and keep accepting inbound traffic.
glog.Infof("New UDP connection from %s", cliAddr)
endpoint, err := proxier.loadBalancer.NextEndpoint(service, cliAddr)
svrConn, err := udp.getBackendConn(activeClients, cliAddr, proxier, service, info.timeout)
if err != nil {
glog.Errorf("Couldn't find an endpoint for %s %v", service, err)
activeClients.mu.Unlock()
continue
}
glog.Infof("Mapped service %s to endpoint %s", service, endpoint)
svrConn, err = net.DialTimeout("udp", endpoint, endpointDialTimeout)
if err != nil {
// TODO: Try another endpoint?
glog.Errorf("Dial failed: %v", err)
activeClients.mu.Unlock()
continue
}
activeClients.clients[cliAddr.String()] = svrConn
go udp.proxyClient(cliAddr, svrConn, activeClients, info.timeout)
}
activeClients.mu.Unlock()
// TODO: It would be nice to let the goroutine handle this write, but we don't
// really want to copy the buffer. We could do a pool of buffers or something.
_, err = svrConn.Write(buffer[0:n])
@ -203,6 +191,33 @@ func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) {
}
}
func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service string, timeout time.Duration) (net.Conn, error) {
activeClients.mu.Lock()
defer activeClients.mu.Unlock()
svrConn, found := activeClients.clients[cliAddr.String()]
if !found {
// TODO: This could spin up a new goroutine to make the outbound connection,
// and keep accepting inbound traffic.
glog.Infof("New UDP connection from %s", cliAddr)
endpoint, err := proxier.loadBalancer.NextEndpoint(service, cliAddr)
if err != nil {
glog.Errorf("Couldn't find an endpoint for %s %v", service, err)
return nil, err
}
glog.Infof("Mapped service %s to endpoint %s", service, endpoint)
svrConn, err = net.DialTimeout("udp", endpoint, endpointDialTimeout)
if err != nil {
// TODO: Try another endpoint?
glog.Errorf("Dial failed: %v", err)
return nil, err
}
activeClients.clients[cliAddr.String()] = svrConn
go udp.proxyClient(cliAddr, svrConn, activeClients, timeout)
}
return svrConn, nil
}
// This function is expected to be called as a goroutine.
func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) {
defer svrConn.Close()
@ -305,9 +320,7 @@ func (proxier *Proxier) StopProxy(service string) error {
}
func (proxier *Proxier) stopProxyInternal(info *serviceInfo) error {
info.mu.Lock()
defer info.mu.Unlock()
if !info.active {
if !info.setActive(false) {
return nil
}
glog.Infof("Removing service: %s", info.name)
@ -378,7 +391,7 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
activeServices.Insert(service.ID)
info, exists := proxier.getServiceInfo(service.ID)
// TODO: check health of the socket? What if ProxyLoop exited?
if exists && info.active && info.port == service.Port {
if exists && info.isActive() && info.port == service.Port {
continue
}
if exists && info.port != service.Port {