diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 0e487d5da5b..57bd77cd742 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -42,14 +42,18 @@ type portal struct { isExternal bool } -type serviceInfo struct { +// ServiceInfo contains information and state for a particular proxied service +type ServiceInfo struct { + // Timeout is the read/write timeout (used for UDP connections) + Timeout time.Duration + // ActiveClients is the cache of active UDP clients being proxied by this proxy for this service + ActiveClients *ClientCache + isAliveAtomic int32 // Only access this with atomic ops portal portal protocol api.Protocol proxyPort int - socket proxySocket - timeout time.Duration - activeClients *clientCache + socket ProxySocket nodePort int loadBalancerStatus api.LoadBalancerStatus sessionAffinityType api.ServiceAffinity @@ -58,7 +62,7 @@ type serviceInfo struct { externalIPs []string } -func (info *serviceInfo) setAlive(b bool) { +func (info *ServiceInfo) setAlive(b bool) { var i int32 if b { i = 1 @@ -66,7 +70,7 @@ func (info *serviceInfo) setAlive(b bool) { atomic.StoreInt32(&info.isAliveAtomic, i) } -func (info *serviceInfo) isAlive() bool { +func (info *ServiceInfo) IsAlive() bool { return atomic.LoadInt32(&info.isAliveAtomic) != 0 } @@ -85,7 +89,7 @@ func logTimeout(err error) bool { type Proxier struct { loadBalancer LoadBalancer mu sync.Mutex // protects serviceMap - serviceMap map[proxy.ServicePortName]*serviceInfo + serviceMap map[proxy.ServicePortName]*ServiceInfo syncPeriod time.Duration minSyncPeriod time.Duration // unused atm, but plumbed through udpIdleTimeout time.Duration @@ -177,7 +181,7 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables } return &Proxier{ loadBalancer: loadBalancer, - serviceMap: make(map[proxy.ServicePortName]*ServiceInfo), + serviceMap: make(map[proxy.ServicePortName]*ServiceInfo), portMap: make(map[portMapKey]*portMapValue), 
syncPeriod: syncPeriod, // plumbed through if needed, not used atm. @@ -301,14 +305,14 @@ func (proxier *Proxier) cleanupStaleStickySessions() { } // This assumes proxier.mu is not locked. -func (proxier *Proxier) stopProxy(service proxy.ServicePortName, info *serviceInfo) error { +func (proxier *Proxier) stopProxy(service proxy.ServicePortName, info *ServiceInfo) error { proxier.mu.Lock() defer proxier.mu.Unlock() return proxier.stopProxyInternal(service, info) } // This assumes proxier.mu is locked. -func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *serviceInfo) error { +func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *ServiceInfo) error { delete(proxier.serviceMap, service) info.setAlive(false) err := info.socket.Close() @@ -317,23 +321,23 @@ func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *s return err } -func (proxier *Proxier) getServiceInfo(service proxy.ServicePortName) (*serviceInfo, bool) { +func (proxier *Proxier) getServiceInfo(service proxy.ServicePortName) (*ServiceInfo, bool) { proxier.mu.Lock() defer proxier.mu.Unlock() info, ok := proxier.serviceMap[service] return info, ok } -func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *serviceInfo) { +func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *ServiceInfo) { proxier.mu.Lock() defer proxier.mu.Unlock() proxier.serviceMap[service] = info } -// addServiceOnPort starts listening for a new service, returning the serviceInfo. +// addServiceOnPort starts listening for a new service, returning the ServiceInfo. // Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP // connections, for now. 
-func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) { +func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) { sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort) if err != nil { return nil, err @@ -348,13 +352,14 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol sock.Close() return nil, err } - si := &serviceInfo{ + si := &ServiceInfo{ + Timeout: timeout, + ActiveClients: newClientCache(), + isAliveAtomic: 1, proxyPort: portNum, protocol: protocol, socket: sock, - timeout: timeout, - activeClients: newClientCache(), sessionAffinityType: api.ServiceAffinityNone, // default stickyMaxAgeMinutes: 180, // TODO: parameterize this in the API. } @@ -364,7 +369,7 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol go func(service proxy.ServicePortName, proxier *Proxier) { defer runtime.HandleCrash() atomic.AddInt32(&proxier.numProxyLoops, 1) - sock.ProxyLoop(service, si, proxier) + sock.ProxyLoop(service, si, proxier.loadBalancer) atomic.AddInt32(&proxier.numProxyLoops, -1) }(service, proxier) @@ -455,7 +460,7 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) { } } -func sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool { +func sameConfig(info *ServiceInfo, service *api.Service, port *api.ServicePort) bool { if info.protocol != port.Protocol || info.portal.port != int(port.Port) || info.nodePort != int(port.NodePort) { return false } @@ -486,7 +491,7 @@ func ipsEqual(lhs, rhs []string) bool { return true } -func (proxier *Proxier) openPortal(service proxy.ServicePortName, info *serviceInfo) error { +func (proxier *Proxier) openPortal(service proxy.ServicePortName, info *ServiceInfo) error { err := proxier.openOnePortal(info.portal, info.protocol, 
proxier.listenIP, info.proxyPort, service) if err != nil { return err @@ -668,7 +673,7 @@ func (proxier *Proxier) openNodePort(nodePort int, protocol api.Protocol, proxyI return nil } -func (proxier *Proxier) closePortal(service proxy.ServicePortName, info *serviceInfo) error { +func (proxier *Proxier) closePortal(service proxy.ServicePortName, info *ServiceInfo) error { // Collect errors and report them all at the end. el := proxier.closeOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service) for _, publicIP := range info.externalIPs { diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index 6650a3d0e47..8a6994fc620 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -182,14 +182,14 @@ func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) { t.Errorf("expected %d ProxyLoops running, got %d", want, got) } -func waitForNumProxyClients(t *testing.T, s *serviceInfo, want int, timeout time.Duration) { +func waitForNumProxyClients(t *testing.T, s *ServiceInfo, want int, timeout time.Duration) { var got int now := time.Now() deadline := now.Add(timeout) for time.Now().Before(deadline) { - s.activeClients.mu.Lock() - got = len(s.activeClients.clients) - s.activeClients.mu.Unlock() + s.ActiveClients.Mu.Lock() + got = len(s.ActiveClients.Clients) + s.ActiveClients.Mu.Unlock() if got == want { return } @@ -401,8 +401,8 @@ func TestTCPProxyStop(t *testing.T) { if err != nil { t.Fatalf("error adding new service: %#v", err) } - if !svcInfo.isAlive() { - t.Fatalf("wrong value for isAlive(): expected true") + if !svcInfo.IsAlive() { + t.Fatalf("wrong value for IsAlive(): expected true") } conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) if err != nil { @@ -412,8 +412,8 @@ func TestTCPProxyStop(t *testing.T) { waitForNumProxyLoops(t, p, 1) stopProxyByName(p, service) - if svcInfo.isAlive() { - t.Fatalf("wrong value for isAlive(): expected false") + if 
svcInfo.IsAlive() { + t.Fatalf("wrong value for IsAlive(): expected false") } // Wait for the port to really close. if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { diff --git a/pkg/proxy/userspace/proxysocket.go b/pkg/proxy/userspace/proxysocket.go index 2bc346faa39..b9da00bce90 100644 --- a/pkg/proxy/userspace/proxysocket.go +++ b/pkg/proxy/userspace/proxysocket.go @@ -32,20 +32,20 @@ import ( ) // Abstraction over TCP/UDP sockets which are proxied. -type proxySocket interface { - // Addr gets the net.Addr for a proxySocket. +type ProxySocket interface { + // Addr gets the net.Addr for a ProxySocket. Addr() net.Addr - // Close stops the proxySocket from accepting incoming connections. + // Close stops the ProxySocket from accepting incoming connections. // Each implementation should comment on the impact of calling Close // while sessions are active. Close() error // ProxyLoop proxies incoming connections for the specified service to the service endpoints. - ProxyLoop(service proxy.ServicePortName, info *serviceInfo, proxier *Proxier) - // ListenPort returns the host port that the proxySocket is listening on + ProxyLoop(service proxy.ServicePortName, info *ServiceInfo, loadBalancer LoadBalancer) + // ListenPort returns the host port that the ProxySocket is listening on ListenPort() int } -func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, error) { +func newProxySocket(protocol api.Protocol, ip net.IP, port int) (ProxySocket, error) { host := "" if ip != nil { host = ip.String() @@ -75,7 +75,7 @@ func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, er // How long we wait for a connection to a backend in seconds var endpointDialTimeout = []time.Duration{250 * time.Millisecond, 500 * time.Millisecond, 1 * time.Second, 2 * time.Second} -// tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called, +// tcpProxySocket implements ProxySocket. 
Close() is implemented by net.Listener. When Close() is called, // no new connections are allowed but existing connections are left untouched. type tcpProxySocket struct { net.Listener @@ -86,10 +86,10 @@ func (tcp *tcpProxySocket) ListenPort() int { return tcp.port } -func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) { +func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string, loadBalancer LoadBalancer) (out net.Conn, err error) { sessionAffinityReset := false for _, dialTimeout := range endpointDialTimeout { - endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr, sessionAffinityReset) + endpoint, err := loadBalancer.NextEndpoint(service, srcAddr, sessionAffinityReset) if err != nil { glog.Errorf("Couldn't find an endpoint for %s: %v", service, err) return nil, err @@ -111,9 +111,9 @@ func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string return nil, fmt.Errorf("failed to connect to an endpoint.") } -func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) { +func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *ServiceInfo, loadBalancer LoadBalancer) { for { - if !myInfo.isAlive() { + if !myInfo.IsAlive() { // The service port was closed or replaced. return } @@ -127,7 +127,7 @@ func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv if isClosedError(err) { return } - if !myInfo.isAlive() { + if !myInfo.IsAlive() { // Then the service port was just closed so the accept failure is to be expected. 
return } @@ -135,7 +135,7 @@ func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv continue } glog.V(3).Infof("Accepted TCP connection from %v to %v", inConn.RemoteAddr(), inConn.LocalAddr()) - outConn, err := tryConnect(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", proxier) + outConn, err := tryConnect(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", loadBalancer) if err != nil { glog.Errorf("Failed to connect to balancer: %v", err) inConn.Close() @@ -171,7 +171,7 @@ func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) { src.Close() } -// udpProxySocket implements proxySocket. Close() is implemented by net.UDPConn. When Close() is called, +// udpProxySocket implements ProxySocket. Close() is implemented by net.UDPConn. When Close() is called, // no new connections are allowed and existing connections are broken. // TODO: We could lame-duck this ourselves, if it becomes important. type udpProxySocket struct { @@ -188,19 +188,19 @@ func (udp *udpProxySocket) Addr() net.Addr { } // Holds all the known UDP clients that have not timed out. -type clientCache struct { - mu sync.Mutex - clients map[string]net.Conn // addr string -> connection +type ClientCache struct { + Mu sync.Mutex + Clients map[string]net.Conn // addr string -> connection } -func newClientCache() *clientCache { - return &clientCache{clients: map[string]net.Conn{}} +func newClientCache() *ClientCache { + return &ClientCache{Clients: map[string]net.Conn{}} } -func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) { +func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *ServiceInfo, loadBalancer LoadBalancer) { var buffer [4096]byte // 4KiB should be enough for most whole-packets for { - if !myInfo.isAlive() { + if !myInfo.IsAlive() { // The service port was closed or replaced. 
break } @@ -219,7 +219,7 @@ func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv break } // If this is a client we know already, reuse the connection and goroutine. - svrConn, err := udp.getBackendConn(myInfo.activeClients, cliAddr, proxier, service, myInfo.timeout) + svrConn, err := udp.getBackendConn(myInfo.ActiveClients, cliAddr, loadBalancer, service, myInfo.Timeout) if err != nil { continue } @@ -233,7 +233,7 @@ func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv } continue } - err = svrConn.SetDeadline(time.Now().Add(myInfo.timeout)) + err = svrConn.SetDeadline(time.Now().Add(myInfo.Timeout)) if err != nil { glog.Errorf("SetDeadline failed: %v", err) continue @@ -241,17 +241,17 @@ func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv } } -func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service proxy.ServicePortName, timeout time.Duration) (net.Conn, error) { - activeClients.mu.Lock() - defer activeClients.mu.Unlock() +func (udp *udpProxySocket) getBackendConn(activeClients *ClientCache, cliAddr net.Addr, loadBalancer LoadBalancer, service proxy.ServicePortName, timeout time.Duration) (net.Conn, error) { + activeClients.Mu.Lock() + defer activeClients.Mu.Unlock() - svrConn, found := activeClients.clients[cliAddr.String()] + svrConn, found := activeClients.Clients[cliAddr.String()] if !found { // TODO: This could spin up a new goroutine to make the outbound connection, // and keep accepting inbound traffic. 
glog.V(3).Infof("New UDP connection from %s", cliAddr) var err error - svrConn, err = tryConnect(service, cliAddr, "udp", proxier) + svrConn, err = tryConnect(service, cliAddr, "udp", loadBalancer) if err != nil { return nil, err } @@ -259,8 +259,8 @@ func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr ne glog.Errorf("SetDeadline failed: %v", err) return nil, err } - activeClients.clients[cliAddr.String()] = svrConn - go func(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) { + activeClients.Clients[cliAddr.String()] = svrConn + go func(cliAddr net.Addr, svrConn net.Conn, activeClients *ClientCache, timeout time.Duration) { defer runtime.HandleCrash() udp.proxyClient(cliAddr, svrConn, activeClients, timeout) }(cliAddr, svrConn, activeClients, timeout) @@ -270,7 +270,7 @@ func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr ne // This function is expected to be called as a goroutine. // TODO: Track and log bytes copied, like TCP -func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) { +func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *ClientCache, timeout time.Duration) { defer svrConn.Close() var buffer [4096]byte for { @@ -294,7 +294,7 @@ func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activ break } } - activeClients.mu.Lock() - delete(activeClients.clients, cliAddr.String()) - activeClients.mu.Unlock() + activeClients.Mu.Lock() + delete(activeClients.Clients, cliAddr.String()) + activeClients.Mu.Unlock() }