diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index a2a4f5763b9..80d293522d9 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -38,23 +38,8 @@ type serviceInfo struct { proxyPort int socket proxySocket timeout time.Duration - mu sync.Mutex // protects active - active bool - publicIP []string // TODO: make this a []net.IP -} - -func (si *serviceInfo) isActive() bool { - si.mu.Lock() - defer si.mu.Unlock() - return si.active -} - -func (si *serviceInfo) setActive(val bool) bool { - si.mu.Lock() - defer si.mu.Unlock() - tmp := si.active - si.active = val - return tmp + // TODO: make this an net.IP address + publicIP []string } // How long we wait for a connection to a backend in seconds @@ -68,7 +53,7 @@ type proxySocket interface { // on the impact of calling Close while sessions are active. Close() error // ProxyLoop proxies incoming connections for the specified service to the service endpoints. - ProxyLoop(service string, info *serviceInfo, proxier *Proxier) + ProxyLoop(service string, proxier *Proxier) } // tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called, @@ -97,15 +82,21 @@ func tryConnect(service string, srcAddr net.Addr, protocol string, proxier *Prox return nil, fmt.Errorf("failed to connect to an endpoint.") } -func (tcp *tcpProxySocket) ProxyLoop(service string, info *serviceInfo, proxier *Proxier) { +func (tcp *tcpProxySocket) ProxyLoop(service string, proxier *Proxier) { for { - if !info.isActive() { + _, exists := proxier.getServiceInfo(service) + if !exists { break } // Block until a connection is made. inConn, err := tcp.Accept() if err != nil { + _, exists := proxier.getServiceInfo(service) + if !exists { + // Then the service port was just closed so the accept failure is to be expected. + return + } glog.Errorf("Accept failed: %v", err) continue } @@ -167,11 +158,12 @@ func newClientCache() *clientCache { return &clientCache{clients: map[string]net.Conn{}} } -func (udp *udpProxySocket) ProxyLoop(service string, info *serviceInfo, proxier *Proxier) { +func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) { activeClients := newClientCache() var buffer [4096]byte // 4KiB should be enough for most whole-packets for { - if !info.isActive() { + info, exists := proxier.getServiceInfo(service) + if !exists { break } @@ -375,9 +367,6 @@ func (proxier *Proxier) stopProxy(service string, info *serviceInfo) error { // This assumes proxier.mu is locked. func (proxier *Proxier) stopProxyInternal(service string, info *serviceInfo) error { - if !info.setActive(false) { - return nil - } delete(proxier.serviceMap, service) return info.socket.Close() } @@ -416,17 +405,16 @@ func (proxier *Proxier) addServiceOnPort(service string, protocol api.Protocol, si := &serviceInfo{ proxyPort: portNum, protocol: protocol, - active: true, socket: sock, timeout: timeout, } proxier.setServiceInfo(service, si) glog.V(1).Infof("Proxying for service %q on %s port %d", service, protocol, portNum) - go func(service string, info *serviceInfo, proxier *Proxier) { + go func(service string, proxier *Proxier) { defer util.HandleCrash() - sock.ProxyLoop(service, info, proxier) - }(service, si, proxier) + sock.ProxyLoop(service, proxier) + }(service, proxier) return si, nil } @@ -445,7 +433,7 @@ func (proxier *Proxier) OnUpdate(services []api.Service) { info, exists := proxier.getServiceInfo(service.Name) serviceIP := net.ParseIP(service.Spec.PortalIP) // TODO: check health of the socket? What if ProxyLoop exited? - if exists && info.isActive() && info.portalPort == service.Spec.Port && info.portalIP.Equal(serviceIP) { + if exists && info.portalPort == service.Spec.Port && info.portalIP.Equal(serviceIP) { continue } if exists && (info.portalPort != service.Spec.Port || !info.portalIP.Equal(serviceIP) || !ipsEqual(service.Spec.PublicIPs, info.publicIP)) {