Proxy: Avoid log error. Simplify locking.

Don't log an error when Accept failed because the interface (portal)
was just removed.

Don't pass around a pointer to a serviceInfo since another thread
deletes those.  Instead, just check if service name is still in the
service map.

Delete the locking on the serviceInfo object since it is only used
by the "main" proxier thread.
This commit is contained in:
Eric Tune 2014-12-01 15:16:53 -08:00
parent 5c31fedfc8
commit 909f82f463

View File

@ -38,23 +38,8 @@ type serviceInfo struct {
proxyPort int proxyPort int
socket proxySocket socket proxySocket
timeout time.Duration timeout time.Duration
mu sync.Mutex // protects active // TODO: make this an net.IP address
active bool publicIP []string
publicIP []string // TODO: make this a []net.IP
}
func (si *serviceInfo) isActive() bool {
si.mu.Lock()
defer si.mu.Unlock()
return si.active
}
func (si *serviceInfo) setActive(val bool) bool {
si.mu.Lock()
defer si.mu.Unlock()
tmp := si.active
si.active = val
return tmp
} }
// How long we wait for a connection to a backend in seconds // How long we wait for a connection to a backend in seconds
@ -68,7 +53,7 @@ type proxySocket interface {
// on the impact of calling Close while sessions are active. // on the impact of calling Close while sessions are active.
Close() error Close() error
// ProxyLoop proxies incoming connections for the specified service to the service endpoints. // ProxyLoop proxies incoming connections for the specified service to the service endpoints.
ProxyLoop(service string, info *serviceInfo, proxier *Proxier) ProxyLoop(service string, proxier *Proxier)
} }
// tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called, // tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called,
@ -97,15 +82,21 @@ func tryConnect(service string, srcAddr net.Addr, protocol string, proxier *Prox
return nil, fmt.Errorf("failed to connect to an endpoint.") return nil, fmt.Errorf("failed to connect to an endpoint.")
} }
func (tcp *tcpProxySocket) ProxyLoop(service string, info *serviceInfo, proxier *Proxier) { func (tcp *tcpProxySocket) ProxyLoop(service string, proxier *Proxier) {
for { for {
if !info.isActive() { _, exists := proxier.getServiceInfo(service)
if !exists {
break break
} }
// Block until a connection is made. // Block until a connection is made.
inConn, err := tcp.Accept() inConn, err := tcp.Accept()
if err != nil { if err != nil {
_, exists := proxier.getServiceInfo(service)
if !exists {
// Then the service port was just closed so the accept failure is to be expected.
return
}
glog.Errorf("Accept failed: %v", err) glog.Errorf("Accept failed: %v", err)
continue continue
} }
@ -167,11 +158,12 @@ func newClientCache() *clientCache {
return &clientCache{clients: map[string]net.Conn{}} return &clientCache{clients: map[string]net.Conn{}}
} }
func (udp *udpProxySocket) ProxyLoop(service string, info *serviceInfo, proxier *Proxier) { func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) {
activeClients := newClientCache() activeClients := newClientCache()
var buffer [4096]byte // 4KiB should be enough for most whole-packets var buffer [4096]byte // 4KiB should be enough for most whole-packets
for { for {
if !info.isActive() { info, exists := proxier.getServiceInfo(service)
if !exists {
break break
} }
@ -375,9 +367,6 @@ func (proxier *Proxier) stopProxy(service string, info *serviceInfo) error {
// This assumes proxier.mu is locked. // This assumes proxier.mu is locked.
func (proxier *Proxier) stopProxyInternal(service string, info *serviceInfo) error { func (proxier *Proxier) stopProxyInternal(service string, info *serviceInfo) error {
if !info.setActive(false) {
return nil
}
delete(proxier.serviceMap, service) delete(proxier.serviceMap, service)
return info.socket.Close() return info.socket.Close()
} }
@ -416,17 +405,16 @@ func (proxier *Proxier) addServiceOnPort(service string, protocol api.Protocol,
si := &serviceInfo{ si := &serviceInfo{
proxyPort: portNum, proxyPort: portNum,
protocol: protocol, protocol: protocol,
active: true,
socket: sock, socket: sock,
timeout: timeout, timeout: timeout,
} }
proxier.setServiceInfo(service, si) proxier.setServiceInfo(service, si)
glog.V(1).Infof("Proxying for service %q on %s port %d", service, protocol, portNum) glog.V(1).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
go func(service string, info *serviceInfo, proxier *Proxier) { go func(service string, proxier *Proxier) {
defer util.HandleCrash() defer util.HandleCrash()
sock.ProxyLoop(service, info, proxier) sock.ProxyLoop(service, proxier)
}(service, si, proxier) }(service, proxier)
return si, nil return si, nil
} }
@ -445,7 +433,7 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
info, exists := proxier.getServiceInfo(service.Name) info, exists := proxier.getServiceInfo(service.Name)
serviceIP := net.ParseIP(service.Spec.PortalIP) serviceIP := net.ParseIP(service.Spec.PortalIP)
// TODO: check health of the socket? What if ProxyLoop exited? // TODO: check health of the socket? What if ProxyLoop exited?
if exists && info.isActive() && info.portalPort == service.Spec.Port && info.portalIP.Equal(serviceIP) { if exists && info.portalPort == service.Spec.Port && info.portalIP.Equal(serviceIP) {
continue continue
} }
if exists && (info.portalPort != service.Spec.Port || !info.portalIP.Equal(serviceIP) || !ipsEqual(service.Spec.PublicIPs, info.publicIP)) { if exists && (info.portalPort != service.Spec.Port || !info.portalIP.Equal(serviceIP) || !ipsEqual(service.Spec.PublicIPs, info.publicIP)) {