From f5526727fb52c735e9befc34dcde55f5a43e7615 Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Thu, 18 Aug 2016 14:11:05 -0400 Subject: [PATCH] Userspace Proxy: Expose ProxySocket utility funcs This commit exposes several utility functions that are valuable for implementing custom ProxySockets. --- pkg/proxy/userspace/proxysocket.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/proxy/userspace/proxysocket.go b/pkg/proxy/userspace/proxysocket.go index b9da00bce90..a28b6011b6c 100644 --- a/pkg/proxy/userspace/proxysocket.go +++ b/pkg/proxy/userspace/proxysocket.go @@ -73,7 +73,7 @@ func newProxySocket(protocol api.Protocol, ip net.IP, port int) (ProxySocket, er } // How long we wait for a connection to a backend in seconds -var endpointDialTimeout = []time.Duration{250 * time.Millisecond, 500 * time.Millisecond, 1 * time.Second, 2 * time.Second} +var EndpointDialTimeouts = []time.Duration{250 * time.Millisecond, 500 * time.Millisecond, 1 * time.Second, 2 * time.Second} // tcpProxySocket implements ProxySocket. Close() is implemented by net.Listener. When Close() is called, // no new connections are allowed but existing connections are left untouched. @@ -86,9 +86,11 @@ func (tcp *tcpProxySocket) ListenPort() int { return tcp.port } -func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string, loadBalancer LoadBalancer) (out net.Conn, err error) { +// TryConnectEndpoints attempts to connect to the next available endpoint for the given service, cycling +// through until it is able to successully connect, or it has tried with all timeouts in EndpointDialTimeouts. +func TryConnectEndpoints(service proxy.ServicePortName, srcAddr net.Addr, protocol string, loadBalancer LoadBalancer) (out net.Conn, err error) { sessionAffinityReset := false - for _, dialTimeout := range endpointDialTimeout { + for _, dialTimeout := range EndpointDialTimeouts { endpoint, err := loadBalancer.NextEndpoint(service, srcAddr, sessionAffinityReset) if err != nil { glog.Errorf("Couldn't find an endpoint for %s: %v", service, err) @@ -135,19 +137,19 @@ func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *Serv continue } glog.V(3).Infof("Accepted TCP connection from %v to %v", inConn.RemoteAddr(), inConn.LocalAddr()) - outConn, err := tryConnect(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", loadBalancer) + outConn, err := TryConnectEndpoints(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", loadBalancer) if err != nil { glog.Errorf("Failed to connect to balancer: %v", err) inConn.Close() continue } // Spin up an async copy loop. - go proxyTCP(inConn.(*net.TCPConn), outConn.(*net.TCPConn)) + go ProxyTCP(inConn.(*net.TCPConn), outConn.(*net.TCPConn)) } } -// proxyTCP proxies data bi-directionally between in and out. -func proxyTCP(in, out *net.TCPConn) { +// ProxyTCP proxies data bi-directionally between in and out. +func ProxyTCP(in, out *net.TCPConn) { var wg sync.WaitGroup wg.Add(2) glog.V(4).Infof("Creating proxy between %v <-> %v <-> %v <-> %v", @@ -251,7 +253,7 @@ func (udp *udpProxySocket) getBackendConn(activeClients *ClientCache, cliAddr ne // and keep accepting inbound traffic. glog.V(3).Infof("New UDP connection from %s", cliAddr) var err error - svrConn, err = tryConnect(service, cliAddr, "udp", loadBalancer) + svrConn, err = TryConnectEndpoints(service, cliAddr, "udp", loadBalancer) if err != nil { return nil, err }