mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 01:40:07 +00:00
Merge pull request #2281 from brendandburns/proxy
Add some retry to the service proxy.
This commit is contained in:
commit
787c221fd6
@ -56,8 +56,8 @@ func (si *serviceInfo) setActive(val bool) bool {
|
|||||||
return tmp
|
return tmp
|
||||||
}
|
}
|
||||||
|
|
||||||
// How long we wait for a connection to a backend.
|
// How long we wait for a connection to a backend in seconds
|
||||||
const endpointDialTimeout = 5 * time.Second
|
var endpointDialTimeout = []time.Duration{1, 2, 4, 8}
|
||||||
|
|
||||||
// Abstraction over TCP/UDP sockets which are proxied.
|
// Abstraction over TCP/UDP sockets which are proxied.
|
||||||
type proxySocket interface {
|
type proxySocket interface {
|
||||||
@ -76,6 +76,26 @@ type tcpProxySocket struct {
|
|||||||
net.Listener
|
net.Listener
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func tryConnect(service string, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) {
|
||||||
|
for _, retryTimeout := range endpointDialTimeout {
|
||||||
|
endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Couldn't find an endpoint for %s %v", service, err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
glog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint)
|
||||||
|
// TODO: This could spin up a new goroutine to make the outbound connection,
|
||||||
|
// and keep accepting inbound traffic.
|
||||||
|
outConn, err := net.DialTimeout(protocol, endpoint, retryTimeout*time.Second)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Dial failed: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return outConn, nil
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("failed to connect to an endpoint.")
|
||||||
|
}
|
||||||
|
|
||||||
func (tcp *tcpProxySocket) ProxyLoop(service string, info *serviceInfo, proxier *Proxier) {
|
func (tcp *tcpProxySocket) ProxyLoop(service string, info *serviceInfo, proxier *Proxier) {
|
||||||
for {
|
for {
|
||||||
if !info.isActive() {
|
if !info.isActive() {
|
||||||
@ -89,19 +109,9 @@ func (tcp *tcpProxySocket) ProxyLoop(service string, info *serviceInfo, proxier
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
glog.V(2).Infof("Accepted TCP connection from %v to %v", inConn.RemoteAddr(), inConn.LocalAddr())
|
glog.V(2).Infof("Accepted TCP connection from %v to %v", inConn.RemoteAddr(), inConn.LocalAddr())
|
||||||
endpoint, err := proxier.loadBalancer.NextEndpoint(service, inConn.RemoteAddr())
|
outConn, err := tryConnect(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", proxier)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Couldn't find an endpoint for %s %v", service, err)
|
glog.Errorf("Failed to connect to balancer: %v", err)
|
||||||
inConn.Close()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
glog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint)
|
|
||||||
// TODO: This could spin up a new goroutine to make the outbound connection,
|
|
||||||
// and keep accepting inbound traffic.
|
|
||||||
outConn, err := net.DialTimeout("tcp", endpoint, endpointDialTimeout)
|
|
||||||
if err != nil {
|
|
||||||
// TODO: Try another endpoint?
|
|
||||||
glog.Errorf("Dial failed: %v", err)
|
|
||||||
inConn.Close()
|
inConn.Close()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -209,16 +219,9 @@ func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr ne
|
|||||||
// TODO: This could spin up a new goroutine to make the outbound connection,
|
// TODO: This could spin up a new goroutine to make the outbound connection,
|
||||||
// and keep accepting inbound traffic.
|
// and keep accepting inbound traffic.
|
||||||
glog.V(2).Infof("New UDP connection from %s", cliAddr)
|
glog.V(2).Infof("New UDP connection from %s", cliAddr)
|
||||||
endpoint, err := proxier.loadBalancer.NextEndpoint(service, cliAddr)
|
var err error
|
||||||
|
svrConn, err = tryConnect(service, cliAddr, "udp", proxier)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Couldn't find an endpoint for %s %v", service, err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
glog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint)
|
|
||||||
svrConn, err = net.DialTimeout("udp", endpoint, endpointDialTimeout)
|
|
||||||
if err != nil {
|
|
||||||
// TODO: Try another endpoint?
|
|
||||||
glog.Errorf("Dial failed: %v", err)
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
activeClients.clients[cliAddr.String()] = svrConn
|
activeClients.clients[cliAddr.String()] = svrConn
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -97,6 +98,9 @@ var tcpServerPort string
|
|||||||
var udpServerPort string
|
var udpServerPort string
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
// Don't handle panics
|
||||||
|
util.ReallyCrash = true
|
||||||
|
|
||||||
// TCP setup.
|
// TCP setup.
|
||||||
tcp := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
tcp := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
|
Loading…
Reference in New Issue
Block a user