Merge pull request #2697 from erictune/no_accept_err

Proxy: Avoid log error.  Simplify locking.
This commit is contained in:
Daniel Smith 2014-12-03 14:06:17 -08:00
commit 6cb26e17a4

View File

@ -38,23 +38,8 @@ type serviceInfo struct {
proxyPort int proxyPort int
socket proxySocket socket proxySocket
timeout time.Duration timeout time.Duration
mu sync.Mutex // protects active // TODO: make this an net.IP address
active bool publicIP []string
publicIP []string // TODO: make this a []net.IP
}
func (si *serviceInfo) isActive() bool {
si.mu.Lock()
defer si.mu.Unlock()
return si.active
}
func (si *serviceInfo) setActive(val bool) bool {
si.mu.Lock()
defer si.mu.Unlock()
tmp := si.active
si.active = val
return tmp
} }
// How long we wait for a connection to a backend in seconds // How long we wait for a connection to a backend in seconds
@ -68,7 +53,7 @@ type proxySocket interface {
// on the impact of calling Close while sessions are active. // on the impact of calling Close while sessions are active.
Close() error Close() error
// ProxyLoop proxies incoming connections for the specified service to the service endpoints. // ProxyLoop proxies incoming connections for the specified service to the service endpoints.
ProxyLoop(service string, info *serviceInfo, proxier *Proxier) ProxyLoop(service string, proxier *Proxier)
} }
// tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called, // tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called,
@ -97,15 +82,21 @@ func tryConnect(service string, srcAddr net.Addr, protocol string, proxier *Prox
return nil, fmt.Errorf("failed to connect to an endpoint.") return nil, fmt.Errorf("failed to connect to an endpoint.")
} }
func (tcp *tcpProxySocket) ProxyLoop(service string, info *serviceInfo, proxier *Proxier) { func (tcp *tcpProxySocket) ProxyLoop(service string, proxier *Proxier) {
for { for {
if !info.isActive() { _, exists := proxier.getServiceInfo(service)
if !exists {
break break
} }
// Block until a connection is made. // Block until a connection is made.
inConn, err := tcp.Accept() inConn, err := tcp.Accept()
if err != nil { if err != nil {
_, exists := proxier.getServiceInfo(service)
if !exists {
// Then the service port was just closed so the accept failure is to be expected.
return
}
glog.Errorf("Accept failed: %v", err) glog.Errorf("Accept failed: %v", err)
continue continue
} }
@ -167,11 +158,12 @@ func newClientCache() *clientCache {
return &clientCache{clients: map[string]net.Conn{}} return &clientCache{clients: map[string]net.Conn{}}
} }
func (udp *udpProxySocket) ProxyLoop(service string, info *serviceInfo, proxier *Proxier) { func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) {
activeClients := newClientCache() activeClients := newClientCache()
var buffer [4096]byte // 4KiB should be enough for most whole-packets var buffer [4096]byte // 4KiB should be enough for most whole-packets
for { for {
if !info.isActive() { info, exists := proxier.getServiceInfo(service)
if !exists {
break break
} }
@ -375,9 +367,6 @@ func (proxier *Proxier) stopProxy(service string, info *serviceInfo) error {
// This assumes proxier.mu is locked. // This assumes proxier.mu is locked.
func (proxier *Proxier) stopProxyInternal(service string, info *serviceInfo) error { func (proxier *Proxier) stopProxyInternal(service string, info *serviceInfo) error {
if !info.setActive(false) {
return nil
}
delete(proxier.serviceMap, service) delete(proxier.serviceMap, service)
return info.socket.Close() return info.socket.Close()
} }
@ -416,17 +405,16 @@ func (proxier *Proxier) addServiceOnPort(service string, protocol api.Protocol,
si := &serviceInfo{ si := &serviceInfo{
proxyPort: portNum, proxyPort: portNum,
protocol: protocol, protocol: protocol,
active: true,
socket: sock, socket: sock,
timeout: timeout, timeout: timeout,
} }
proxier.setServiceInfo(service, si) proxier.setServiceInfo(service, si)
glog.V(1).Infof("Proxying for service %q on %s port %d", service, protocol, portNum) glog.V(1).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
go func(service string, info *serviceInfo, proxier *Proxier) { go func(service string, proxier *Proxier) {
defer util.HandleCrash() defer util.HandleCrash()
sock.ProxyLoop(service, info, proxier) sock.ProxyLoop(service, proxier)
}(service, si, proxier) }(service, proxier)
return si, nil return si, nil
} }
@ -445,7 +433,7 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
info, exists := proxier.getServiceInfo(service.Name) info, exists := proxier.getServiceInfo(service.Name)
serviceIP := net.ParseIP(service.Spec.PortalIP) serviceIP := net.ParseIP(service.Spec.PortalIP)
// TODO: check health of the socket? What if ProxyLoop exited? // TODO: check health of the socket? What if ProxyLoop exited?
if exists && info.isActive() && info.portalPort == service.Spec.Port && info.portalIP.Equal(serviceIP) { if exists && info.portalPort == service.Spec.Port && info.portalIP.Equal(serviceIP) {
continue continue
} }
if exists && (info.portalPort != service.Spec.Port || !info.portalIP.Equal(serviceIP) || !ipsEqual(service.Spec.PublicIPs, info.publicIP)) { if exists && (info.portalPort != service.Spec.Port || !info.portalIP.Equal(serviceIP) || !ipsEqual(service.Spec.PublicIPs, info.publicIP)) {