diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index 00410e74c17..efd19077961 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -18,7 +18,6 @@ package proxy import ( "fmt" - "io" "net" "strconv" "strings" @@ -46,234 +45,6 @@ type serviceInfo struct { stickyMaxAgeMinutes int } -// How long we wait for a connection to a backend in seconds -var endpointDialTimeout = []time.Duration{1, 2, 4, 8} - -// Abstraction over TCP/UDP sockets which are proxied. -type proxySocket interface { - // Addr gets the net.Addr for a proxySocket. - Addr() net.Addr - // Close stops the proxySocket from accepting incoming connections. - // Each implementation should comment on the impact of calling Close - // while sessions are active. - Close() error - // ProxyLoop proxies incoming connections for the specified service to the service endpoints. - ProxyLoop(service ServicePortName, info *serviceInfo, proxier *Proxier) -} - -// tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called, -// no new connections are allowed but existing connections are left untouched. -type tcpProxySocket struct { - net.Listener -} - -func tryConnect(service ServicePortName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) { - for _, retryTimeout := range endpointDialTimeout { - endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr) - if err != nil { - glog.Errorf("Couldn't find an endpoint for %s: %v", service, err) - return nil, err - } - glog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint) - // TODO: This could spin up a new goroutine to make the outbound connection, - // and keep accepting inbound traffic. - outConn, err := net.DialTimeout(protocol, endpoint, retryTimeout*time.Second) - if err != nil { - if isTooManyFDsError(err) { - panic("Dial failed: " + err.Error()) - } - glog.Errorf("Dial failed: %v", err) - continue - } - return outConn, nil - } - return nil, fmt.Errorf("failed to connect to an endpoint.") -} - -func (tcp *tcpProxySocket) ProxyLoop(service ServicePortName, myInfo *serviceInfo, proxier *Proxier) { - for { - if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo { - // The service port was closed or replaced. - return - } - // Block until a connection is made. - inConn, err := tcp.Accept() - if err != nil { - if isTooManyFDsError(err) { - panic("Accept failed: " + err.Error()) - } - - if isClosedError(err) { - return - } - if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo { - // Then the service port was just closed so the accept failure is to be expected. - return - } - glog.Errorf("Accept failed: %v", err) - continue - } - glog.V(2).Infof("Accepted TCP connection from %v to %v", inConn.RemoteAddr(), inConn.LocalAddr()) - outConn, err := tryConnect(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", proxier) - if err != nil { - glog.Errorf("Failed to connect to balancer: %v", err) - inConn.Close() - continue - } - // Spin up an async copy loop. - go proxyTCP(inConn.(*net.TCPConn), outConn.(*net.TCPConn)) - } -} - -// proxyTCP proxies data bi-directionally between in and out. -func proxyTCP(in, out *net.TCPConn) { - var wg sync.WaitGroup - wg.Add(2) - glog.V(4).Infof("Creating proxy between %v <-> %v <-> %v <-> %v", - in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr()) - go copyBytes("from backend", in, out, &wg) - go copyBytes("to backend", out, in, &wg) - wg.Wait() - in.Close() - out.Close() -} - -func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) { - defer wg.Done() - glog.V(4).Infof("Copying %s: %s -> %s", direction, src.RemoteAddr(), dest.RemoteAddr()) - n, err := io.Copy(dest, src) - if err != nil { - glog.Errorf("I/O error: %v", err) - } - glog.V(4).Infof("Copied %d bytes %s: %s -> %s", n, direction, src.RemoteAddr(), dest.RemoteAddr()) - dest.CloseWrite() - src.CloseRead() -} - -// udpProxySocket implements proxySocket. Close() is implemented by net.UDPConn. When Close() is called, -// no new connections are allowed and existing connections are broken. -// TODO: We could lame-duck this ourselves, if it becomes important. -type udpProxySocket struct { - *net.UDPConn -} - -func (udp *udpProxySocket) Addr() net.Addr { - return udp.LocalAddr() -} - -// Holds all the known UDP clients that have not timed out. -type clientCache struct { - mu sync.Mutex - clients map[string]net.Conn // addr string -> connection -} - -func newClientCache() *clientCache { - return &clientCache{clients: map[string]net.Conn{}} -} - -func (udp *udpProxySocket) ProxyLoop(service ServicePortName, myInfo *serviceInfo, proxier *Proxier) { - activeClients := newClientCache() - var buffer [4096]byte // 4KiB should be enough for most whole-packets - for { - if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo { - // The service port was closed or replaced. - break - } - - // Block until data arrives. - // TODO: Accumulate a histogram of n or something, to fine tune the buffer size. - n, cliAddr, err := udp.ReadFrom(buffer[0:]) - if err != nil { - if e, ok := err.(net.Error); ok { - if e.Temporary() { - glog.V(1).Infof("ReadFrom had a temporary failure: %v", err) - continue - } - } - glog.Errorf("ReadFrom failed, exiting ProxyLoop: %v", err) - break - } - // If this is a client we know already, reuse the connection and goroutine. - svrConn, err := udp.getBackendConn(activeClients, cliAddr, proxier, service, myInfo.timeout) - if err != nil { - continue - } - // TODO: It would be nice to let the goroutine handle this write, but we don't - // really want to copy the buffer. We could do a pool of buffers or something. - _, err = svrConn.Write(buffer[0:n]) - if err != nil { - if !logTimeout(err) { - glog.Errorf("Write failed: %v", err) - // TODO: Maybe tear down the goroutine for this client/server pair? - } - continue - } - err = svrConn.SetDeadline(time.Now().Add(myInfo.timeout)) - if err != nil { - glog.Errorf("SetDeadline failed: %v", err) - continue - } - } -} - -func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service ServicePortName, timeout time.Duration) (net.Conn, error) { - activeClients.mu.Lock() - defer activeClients.mu.Unlock() - - svrConn, found := activeClients.clients[cliAddr.String()] - if !found { - // TODO: This could spin up a new goroutine to make the outbound connection, - // and keep accepting inbound traffic. - glog.V(2).Infof("New UDP connection from %s", cliAddr) - var err error - svrConn, err = tryConnect(service, cliAddr, "udp", proxier) - if err != nil { - return nil, err - } - if err = svrConn.SetDeadline(time.Now().Add(timeout)); err != nil { - glog.Errorf("SetDeadline failed: %v", err) - return nil, err - } - activeClients.clients[cliAddr.String()] = svrConn - go func(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) { - defer util.HandleCrash() - udp.proxyClient(cliAddr, svrConn, activeClients, timeout) - }(cliAddr, svrConn, activeClients, timeout) - } - return svrConn, nil -} - -// This function is expected to be called as a goroutine. -// TODO: Track and log bytes copied, like TCP -func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) { - defer svrConn.Close() - var buffer [4096]byte - for { - n, err := svrConn.Read(buffer[0:]) - if err != nil { - if !logTimeout(err) { - glog.Errorf("Read failed: %v", err) - } - break - } - err = svrConn.SetDeadline(time.Now().Add(timeout)) - if err != nil { - glog.Errorf("SetDeadline failed: %v", err) - break - } - n, err = udp.WriteTo(buffer[0:n], cliAddr) - if err != nil { - if !logTimeout(err) { - glog.Errorf("WriteTo failed: %v", err) - } - break - } - } - activeClients.mu.Lock() - delete(activeClients.clients, cliAddr.String()) - activeClients.mu.Unlock() -} - func logTimeout(err error) bool { if e, ok := err.(net.Error); ok { if e.Timeout() { @@ -284,29 +55,6 @@ func logTimeout(err error) bool { return false } -func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, error) { - host := ip.String() - switch strings.ToUpper(string(protocol)) { - case "TCP": - listener, err := net.Listen("tcp", net.JoinHostPort(host, strconv.Itoa(port))) - if err != nil { - return nil, err - } - return &tcpProxySocket{listener}, nil - case "UDP": - addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(host, strconv.Itoa(port))) - if err != nil { - return nil, err - } - conn, err := net.ListenUDP("udp", addr) - if err != nil { - return nil, err - } - return &udpProxySocket{conn}, nil - } - return nil, fmt.Errorf("unknown protocol %q", protocol) -} - // Proxier is a simple proxy for TCP connections between a localhost:lport // and services that provide the actual implementations. type Proxier struct { diff --git a/pkg/proxy/proxysocket.go b/pkg/proxy/proxysocket.go new file mode 100644 index 00000000000..398d6af36b0 --- /dev/null +++ b/pkg/proxy/proxysocket.go @@ -0,0 +1,282 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "fmt" + "io" + "net" + "strconv" + "strings" + "sync" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/golang/glog" +) + +// Abstraction over TCP/UDP sockets which are proxied. +type proxySocket interface { + // Addr gets the net.Addr for a proxySocket. + Addr() net.Addr + // Close stops the proxySocket from accepting incoming connections. + // Each implementation should comment on the impact of calling Close + // while sessions are active. + Close() error + // ProxyLoop proxies incoming connections for the specified service to the service endpoints. + ProxyLoop(service ServicePortName, info *serviceInfo, proxier *Proxier) +} + +func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, error) { + host := ip.String() + switch strings.ToUpper(string(protocol)) { + case "TCP": + listener, err := net.Listen("tcp", net.JoinHostPort(host, strconv.Itoa(port))) + if err != nil { + return nil, err + } + return &tcpProxySocket{listener}, nil + case "UDP": + addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(host, strconv.Itoa(port))) + if err != nil { + return nil, err + } + conn, err := net.ListenUDP("udp", addr) + if err != nil { + return nil, err + } + return &udpProxySocket{conn}, nil + } + return nil, fmt.Errorf("unknown protocol %q", protocol) +} + +// How long we wait for a connection to a backend in seconds +var endpointDialTimeout = []time.Duration{1, 2, 4, 8} + +// tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called, +// no new connections are allowed but existing connections are left untouched. +type tcpProxySocket struct { + net.Listener +} + +func tryConnect(service ServicePortName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) { + for _, retryTimeout := range endpointDialTimeout { + endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr) + if err != nil { + glog.Errorf("Couldn't find an endpoint for %s: %v", service, err) + return nil, err + } + glog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint) + // TODO: This could spin up a new goroutine to make the outbound connection, + // and keep accepting inbound traffic. + outConn, err := net.DialTimeout(protocol, endpoint, retryTimeout*time.Second) + if err != nil { + if isTooManyFDsError(err) { + panic("Dial failed: " + err.Error()) + } + glog.Errorf("Dial failed: %v", err) + continue + } + return outConn, nil + } + return nil, fmt.Errorf("failed to connect to an endpoint.") +} + +func (tcp *tcpProxySocket) ProxyLoop(service ServicePortName, myInfo *serviceInfo, proxier *Proxier) { + for { + if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo { + // The service port was closed or replaced. + return + } + // Block until a connection is made. + inConn, err := tcp.Accept() + if err != nil { + if isTooManyFDsError(err) { + panic("Accept failed: " + err.Error()) + } + + if isClosedError(err) { + return + } + if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo { + // Then the service port was just closed so the accept failure is to be expected. + return + } + glog.Errorf("Accept failed: %v", err) + continue + } + glog.V(2).Infof("Accepted TCP connection from %v to %v", inConn.RemoteAddr(), inConn.LocalAddr()) + outConn, err := tryConnect(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", proxier) + if err != nil { + glog.Errorf("Failed to connect to balancer: %v", err) + inConn.Close() + continue + } + // Spin up an async copy loop. + go proxyTCP(inConn.(*net.TCPConn), outConn.(*net.TCPConn)) + } +} + +// proxyTCP proxies data bi-directionally between in and out. +func proxyTCP(in, out *net.TCPConn) { + var wg sync.WaitGroup + wg.Add(2) + glog.V(4).Infof("Creating proxy between %v <-> %v <-> %v <-> %v", + in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr()) + go copyBytes("from backend", in, out, &wg) + go copyBytes("to backend", out, in, &wg) + wg.Wait() + in.Close() + out.Close() +} + +func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) { + defer wg.Done() + glog.V(4).Infof("Copying %s: %s -> %s", direction, src.RemoteAddr(), dest.RemoteAddr()) + n, err := io.Copy(dest, src) + if err != nil { + glog.Errorf("I/O error: %v", err) + } + glog.V(4).Infof("Copied %d bytes %s: %s -> %s", n, direction, src.RemoteAddr(), dest.RemoteAddr()) + dest.CloseWrite() + src.CloseRead() +} + +// udpProxySocket implements proxySocket. Close() is implemented by net.UDPConn. When Close() is called, +// no new connections are allowed and existing connections are broken. +// TODO: We could lame-duck this ourselves, if it becomes important. +type udpProxySocket struct { + *net.UDPConn +} + +func (udp *udpProxySocket) Addr() net.Addr { + return udp.LocalAddr() +} + +// Holds all the known UDP clients that have not timed out. +type clientCache struct { + mu sync.Mutex + clients map[string]net.Conn // addr string -> connection +} + +func newClientCache() *clientCache { + return &clientCache{clients: map[string]net.Conn{}} +} + +func (udp *udpProxySocket) ProxyLoop(service ServicePortName, myInfo *serviceInfo, proxier *Proxier) { + activeClients := newClientCache() + var buffer [4096]byte // 4KiB should be enough for most whole-packets + for { + if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo { + // The service port was closed or replaced. + break + } + + // Block until data arrives. + // TODO: Accumulate a histogram of n or something, to fine tune the buffer size. + n, cliAddr, err := udp.ReadFrom(buffer[0:]) + if err != nil { + if e, ok := err.(net.Error); ok { + if e.Temporary() { + glog.V(1).Infof("ReadFrom had a temporary failure: %v", err) + continue + } + } + glog.Errorf("ReadFrom failed, exiting ProxyLoop: %v", err) + break + } + // If this is a client we know already, reuse the connection and goroutine. + svrConn, err := udp.getBackendConn(activeClients, cliAddr, proxier, service, myInfo.timeout) + if err != nil { + continue + } + // TODO: It would be nice to let the goroutine handle this write, but we don't + // really want to copy the buffer. We could do a pool of buffers or something. + _, err = svrConn.Write(buffer[0:n]) + if err != nil { + if !logTimeout(err) { + glog.Errorf("Write failed: %v", err) + // TODO: Maybe tear down the goroutine for this client/server pair? + } + continue + } + err = svrConn.SetDeadline(time.Now().Add(myInfo.timeout)) + if err != nil { + glog.Errorf("SetDeadline failed: %v", err) + continue + } + } +} + +func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service ServicePortName, timeout time.Duration) (net.Conn, error) { + activeClients.mu.Lock() + defer activeClients.mu.Unlock() + + svrConn, found := activeClients.clients[cliAddr.String()] + if !found { + // TODO: This could spin up a new goroutine to make the outbound connection, + // and keep accepting inbound traffic. + glog.V(2).Infof("New UDP connection from %s", cliAddr) + var err error + svrConn, err = tryConnect(service, cliAddr, "udp", proxier) + if err != nil { + return nil, err + } + if err = svrConn.SetDeadline(time.Now().Add(timeout)); err != nil { + glog.Errorf("SetDeadline failed: %v", err) + return nil, err + } + activeClients.clients[cliAddr.String()] = svrConn + go func(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) { + defer util.HandleCrash() + udp.proxyClient(cliAddr, svrConn, activeClients, timeout) + }(cliAddr, svrConn, activeClients, timeout) + } + return svrConn, nil +} + +// This function is expected to be called as a goroutine. +// TODO: Track and log bytes copied, like TCP +func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) { + defer svrConn.Close() + var buffer [4096]byte + for { + n, err := svrConn.Read(buffer[0:]) + if err != nil { + if !logTimeout(err) { + glog.Errorf("Read failed: %v", err) + } + break + } + err = svrConn.SetDeadline(time.Now().Add(timeout)) + if err != nil { + glog.Errorf("SetDeadline failed: %v", err) + break + } + n, err = udp.WriteTo(buffer[0:n], cliAddr) + if err != nil { + if !logTimeout(err) { + glog.Errorf("WriteTo failed: %v", err) + } + break + } + } + activeClients.mu.Lock() + delete(activeClients.clients, cliAddr.String()) + activeClients.mu.Unlock() +}