diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index 6180b39d1cf..cad3d3af465 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -56,8 +56,8 @@ func (si *serviceInfo) setActive(val bool) bool { return tmp } -// How long we wait for a connection to a backend. -const endpointDialTimeout = 5 * time.Second +// How long we wait for a connection to a backend in seconds +var endpointDialTimeout = []time.Duration{1, 2, 4, 8} // Abstraction over TCP/UDP sockets which are proxied. type proxySocket interface { @@ -76,6 +76,26 @@ type tcpProxySocket struct { net.Listener } +func tryConnect(service string, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) { + for _, retryTimeout := range endpointDialTimeout { + endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr) + if err != nil { + glog.Errorf("Couldn't find an endpoint for %s %v", service, err) + return nil, err + } + glog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint) + // TODO: This could spin up a new goroutine to make the outbound connection, + // and keep accepting inbound traffic. + outConn, err := net.DialTimeout(protocol, endpoint, retryTimeout*time.Second) + if err != nil { + glog.Errorf("Dial failed: %v", err) + continue + } + return outConn, nil + } + return nil, fmt.Errorf("failed to connect to an endpoint.") +} + func (tcp *tcpProxySocket) ProxyLoop(service string, info *serviceInfo, proxier *Proxier) { for { if !info.isActive() { @@ -89,19 +109,9 @@ func (tcp *tcpProxySocket) ProxyLoop(service string, info *serviceInfo, proxier continue } glog.V(2).Infof("Accepted TCP connection from %v to %v", inConn.RemoteAddr(), inConn.LocalAddr()) - endpoint, err := proxier.loadBalancer.NextEndpoint(service, inConn.RemoteAddr()) + outConn, err := tryConnect(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", proxier) if err != nil { - glog.Errorf("Couldn't find an endpoint for %s %v", service, err) - inConn.Close() - continue - } - glog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint) - // TODO: This could spin up a new goroutine to make the outbound connection, - // and keep accepting inbound traffic. - outConn, err := net.DialTimeout("tcp", endpoint, endpointDialTimeout) - if err != nil { - // TODO: Try another endpoint? - glog.Errorf("Dial failed: %v", err) + glog.Errorf("Failed to connect to balancer: %v", err) inConn.Close() continue } @@ -209,16 +219,9 @@ func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr ne // TODO: This could spin up a new goroutine to make the outbound connection, // and keep accepting inbound traffic. glog.V(2).Infof("New UDP connection from %s", cliAddr) - endpoint, err := proxier.loadBalancer.NextEndpoint(service, cliAddr) + var err error + svrConn, err = tryConnect(service, cliAddr, "udp", proxier) if err != nil { - glog.Errorf("Couldn't find an endpoint for %s %v", service, err) - return nil, err - } - glog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint) - svrConn, err = net.DialTimeout("udp", endpoint, endpointDialTimeout) - if err != nil { - // TODO: Try another endpoint? - glog.Errorf("Dial failed: %v", err) return nil, err } activeClients.clients[cliAddr.String()] = svrConn diff --git a/pkg/proxy/proxier_test.go b/pkg/proxy/proxier_test.go index 44d8982f045..2c845e42236 100644 --- a/pkg/proxy/proxier_test.go +++ b/pkg/proxy/proxier_test.go @@ -28,6 +28,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables" ) @@ -97,6 +98,9 @@ var tcpServerPort string var udpServerPort string func init() { + // Don't handle panics + util.ReallyCrash = true + // TCP setup. tcp := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK)