From 9dcf8ef344e93b0baacf108ab781549215f2462b Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Thu, 18 Aug 2016 13:41:51 -0400 Subject: [PATCH 1/6] Userspace Proxy: allow check for endpoints on svc This commit adds a method to the `LoadBalancer` interface in the userspace proxy which allows consumers of the `LoadBalancer` to check if it thinks a given service has endpoints available. --- pkg/proxy/userspace/loadbalancer.go | 1 + pkg/proxy/userspace/roundrobin.go | 10 ++++++++++ 2 files changed, 11 insertions(+) diff --git a/pkg/proxy/userspace/loadbalancer.go b/pkg/proxy/userspace/loadbalancer.go index dee5cb843de..2bf23d5b940 100644 --- a/pkg/proxy/userspace/loadbalancer.go +++ b/pkg/proxy/userspace/loadbalancer.go @@ -31,4 +31,5 @@ type LoadBalancer interface { NewService(service proxy.ServicePortName, sessionAffinityType api.ServiceAffinity, stickyMaxAgeMinutes int) error DeleteService(service proxy.ServicePortName) CleanupStaleStickySessions(service proxy.ServicePortName) + ServiceHasEndpoints(service proxy.ServicePortName) bool } diff --git a/pkg/proxy/userspace/roundrobin.go b/pkg/proxy/userspace/roundrobin.go index 3d52445837e..ca0ccc6eddf 100644 --- a/pkg/proxy/userspace/roundrobin.go +++ b/pkg/proxy/userspace/roundrobin.go @@ -120,6 +120,16 @@ func isSessionAffinity(affinity *affinityPolicy) bool { return true } +// ServiceHasEndpoints checks whether a service entry has endpoints. +func (lb *LoadBalancerRR) ServiceHasEndpoints(svcPort proxy.ServicePortName) bool { + lb.lock.Lock() + defer lb.lock.Unlock() + state, exists := lb.services[svcPort] + // TODO: while nothing ever assigns nil to the map, *some* of the code using the map + // checks for it. The code should all follow the same convention. + return exists && state != nil && len(state.endpoints) > 0 +} + // NextEndpoint returns a service endpoint. // The service endpoint is chosen using the round-robin algorithm. 
func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error) { From 43c4d7ae2398261016eb74ca7e25c10b32fd9a30 Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Thu, 18 Aug 2016 13:59:09 -0400 Subject: [PATCH 2/6] Userspace Proxy: Make ProxySocket Implementable This commit makes it possible for the `ProxySocket` interface to be implemented by types outside of the `userspace` package. It mainly just exposes relevant types and fields as public. --- pkg/proxy/userspace/proxier.go | 47 +++++++++++--------- pkg/proxy/userspace/proxier_test.go | 16 +++---- pkg/proxy/userspace/proxysocket.go | 68 ++++++++++++++--------------- 3 files changed, 68 insertions(+), 63 deletions(-) diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 0e487d5da5b..57bd77cd742 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -42,14 +42,18 @@ type portal struct { isExternal bool } -type serviceInfo struct { +// ServiceInfo contains information and state for a particular proxied service +type ServiceInfo struct { + // Timeout is the read/write timeout (used for UDP connections) + Timeout time.Duration + // ActiveClients is the cache of active UDP clients being proxied by this proxy for this service + ActiveClients *ClientCache + isAliveAtomic int32 // Only access this with atomic ops portal portal protocol api.Protocol proxyPort int - socket proxySocket - timeout time.Duration - activeClients *clientCache + socket ProxySocket nodePort int loadBalancerStatus api.LoadBalancerStatus sessionAffinityType api.ServiceAffinity @@ -58,7 +62,7 @@ type serviceInfo struct { externalIPs []string } -func (info *serviceInfo) setAlive(b bool) { +func (info *ServiceInfo) setAlive(b bool) { var i int32 if b { i = 1 @@ -66,7 +70,7 @@ func (info *serviceInfo) setAlive(b bool) { atomic.StoreInt32(&info.isAliveAtomic, i) } -func (info *serviceInfo) isAlive() bool { +func (info
*ServiceInfo) IsAlive() bool { return atomic.LoadInt32(&info.isAliveAtomic) != 0 } @@ -85,7 +89,7 @@ func logTimeout(err error) bool { type Proxier struct { loadBalancer LoadBalancer mu sync.Mutex // protects serviceMap - serviceMap map[proxy.ServicePortName]*serviceInfo + serviceMap map[proxy.ServicePortName]*ServiceInfo syncPeriod time.Duration minSyncPeriod time.Duration // unused atm, but plumbed through udpIdleTimeout time.Duration @@ -177,7 +181,7 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables } return &Proxier{ loadBalancer: loadBalancer, - serviceMap: make(map[proxy.ServicePortName]*serviceInfo), + serviceMap: make(map[proxy.ServicePortName]*ServiceInfo), portMap: make(map[portMapKey]*portMapValue), syncPeriod: syncPeriod, // plumbed through if needed, not used atm. @@ -301,14 +305,14 @@ func (proxier *Proxier) cleanupStaleStickySessions() { } // This assumes proxier.mu is not locked. -func (proxier *Proxier) stopProxy(service proxy.ServicePortName, info *serviceInfo) error { +func (proxier *Proxier) stopProxy(service proxy.ServicePortName, info *ServiceInfo) error { proxier.mu.Lock() defer proxier.mu.Unlock() return proxier.stopProxyInternal(service, info) } // This assumes proxier.mu is locked. 
-func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *serviceInfo) error { +func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *ServiceInfo) error { delete(proxier.serviceMap, service) info.setAlive(false) err := info.socket.Close() @@ -317,23 +321,23 @@ func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *s return err } -func (proxier *Proxier) getServiceInfo(service proxy.ServicePortName) (*serviceInfo, bool) { +func (proxier *Proxier) getServiceInfo(service proxy.ServicePortName) (*ServiceInfo, bool) { proxier.mu.Lock() defer proxier.mu.Unlock() info, ok := proxier.serviceMap[service] return info, ok } -func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *serviceInfo) { +func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *ServiceInfo) { proxier.mu.Lock() defer proxier.mu.Unlock() proxier.serviceMap[service] = info } -// addServiceOnPort starts listening for a new service, returning the serviceInfo. +// addServiceOnPort starts listening for a new service, returning the ServiceInfo. // Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP // connections, for now. 
-func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) { +func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) { sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort) if err != nil { return nil, err @@ -348,13 +352,14 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol sock.Close() return nil, err } - si := &serviceInfo{ + si := &ServiceInfo{ + Timeout: timeout, + ActiveClients: newClientCache(), + isAliveAtomic: 1, proxyPort: portNum, protocol: protocol, socket: sock, - timeout: timeout, - activeClients: newClientCache(), sessionAffinityType: api.ServiceAffinityNone, // default stickyMaxAgeMinutes: 180, // TODO: parameterize this in the API. } @@ -364,7 +369,7 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol go func(service proxy.ServicePortName, proxier *Proxier) { defer runtime.HandleCrash() atomic.AddInt32(&proxier.numProxyLoops, 1) - sock.ProxyLoop(service, si, proxier) + sock.ProxyLoop(service, si, proxier.loadBalancer) atomic.AddInt32(&proxier.numProxyLoops, -1) }(service, proxier) @@ -455,7 +460,7 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) { } } -func sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool { +func sameConfig(info *ServiceInfo, service *api.Service, port *api.ServicePort) bool { if info.protocol != port.Protocol || info.portal.port != int(port.Port) || info.nodePort != int(port.NodePort) { return false } @@ -486,7 +491,7 @@ func ipsEqual(lhs, rhs []string) bool { return true } -func (proxier *Proxier) openPortal(service proxy.ServicePortName, info *serviceInfo) error { +func (proxier *Proxier) openPortal(service proxy.ServicePortName, info *ServiceInfo) error { err := proxier.openOnePortal(info.portal, info.protocol, 
proxier.listenIP, info.proxyPort, service) if err != nil { return err @@ -668,7 +673,7 @@ func (proxier *Proxier) openNodePort(nodePort int, protocol api.Protocol, proxyI return nil } -func (proxier *Proxier) closePortal(service proxy.ServicePortName, info *serviceInfo) error { +func (proxier *Proxier) closePortal(service proxy.ServicePortName, info *ServiceInfo) error { // Collect errors and report them all at the end. el := proxier.closeOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service) for _, publicIP := range info.externalIPs { diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index 6650a3d0e47..8a6994fc620 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -182,14 +182,14 @@ func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) { t.Errorf("expected %d ProxyLoops running, got %d", want, got) } -func waitForNumProxyClients(t *testing.T, s *serviceInfo, want int, timeout time.Duration) { +func waitForNumProxyClients(t *testing.T, s *ServiceInfo, want int, timeout time.Duration) { var got int now := time.Now() deadline := now.Add(timeout) for time.Now().Before(deadline) { - s.activeClients.mu.Lock() - got = len(s.activeClients.clients) - s.activeClients.mu.Unlock() + s.ActiveClients.Mu.Lock() + got = len(s.ActiveClients.Clients) + s.ActiveClients.Mu.Unlock() if got == want { return } @@ -401,8 +401,8 @@ func TestTCPProxyStop(t *testing.T) { if err != nil { t.Fatalf("error adding new service: %#v", err) } - if !svcInfo.isAlive() { - t.Fatalf("wrong value for isAlive(): expected true") + if !svcInfo.IsAlive() { + t.Fatalf("wrong value for IsAlive(): expected true") } conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) if err != nil { @@ -412,8 +412,8 @@ func TestTCPProxyStop(t *testing.T) { waitForNumProxyLoops(t, p, 1) stopProxyByName(p, service) - if svcInfo.isAlive() { - t.Fatalf("wrong value for isAlive(): expected false") + if 
svcInfo.IsAlive() { + t.Fatalf("wrong value for IsAlive(): expected false") } // Wait for the port to really close. if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { diff --git a/pkg/proxy/userspace/proxysocket.go b/pkg/proxy/userspace/proxysocket.go index 2bc346faa39..b9da00bce90 100644 --- a/pkg/proxy/userspace/proxysocket.go +++ b/pkg/proxy/userspace/proxysocket.go @@ -32,20 +32,20 @@ import ( ) // Abstraction over TCP/UDP sockets which are proxied. -type proxySocket interface { - // Addr gets the net.Addr for a proxySocket. +type ProxySocket interface { + // Addr gets the net.Addr for a ProxySocket. Addr() net.Addr - // Close stops the proxySocket from accepting incoming connections. + // Close stops the ProxySocket from accepting incoming connections. // Each implementation should comment on the impact of calling Close // while sessions are active. Close() error // ProxyLoop proxies incoming connections for the specified service to the service endpoints. - ProxyLoop(service proxy.ServicePortName, info *serviceInfo, proxier *Proxier) - // ListenPort returns the host port that the proxySocket is listening on + ProxyLoop(service proxy.ServicePortName, info *ServiceInfo, loadBalancer LoadBalancer) + // ListenPort returns the host port that the ProxySocket is listening on ListenPort() int } -func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, error) { +func newProxySocket(protocol api.Protocol, ip net.IP, port int) (ProxySocket, error) { host := "" if ip != nil { host = ip.String() @@ -75,7 +75,7 @@ func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, er // How long we wait for a connection to a backend in seconds var endpointDialTimeout = []time.Duration{250 * time.Millisecond, 500 * time.Millisecond, 1 * time.Second, 2 * time.Second} -// tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called, +// tcpProxySocket implements ProxySocket. 
Close() is implemented by net.Listener. When Close() is called, // no new connections are allowed but existing connections are left untouched. type tcpProxySocket struct { net.Listener @@ -86,10 +86,10 @@ func (tcp *tcpProxySocket) ListenPort() int { return tcp.port } -func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) { +func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string, loadBalancer LoadBalancer) (out net.Conn, err error) { sessionAffinityReset := false for _, dialTimeout := range endpointDialTimeout { - endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr, sessionAffinityReset) + endpoint, err := loadBalancer.NextEndpoint(service, srcAddr, sessionAffinityReset) if err != nil { glog.Errorf("Couldn't find an endpoint for %s: %v", service, err) return nil, err @@ -111,9 +111,9 @@ func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string return nil, fmt.Errorf("failed to connect to an endpoint.") } -func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) { +func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *ServiceInfo, loadBalancer LoadBalancer) { for { - if !myInfo.isAlive() { + if !myInfo.IsAlive() { // The service port was closed or replaced. return } @@ -127,7 +127,7 @@ func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv if isClosedError(err) { return } - if !myInfo.isAlive() { + if !myInfo.IsAlive() { // Then the service port was just closed so the accept failure is to be expected. 
return } @@ -135,7 +135,7 @@ func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv continue } glog.V(3).Infof("Accepted TCP connection from %v to %v", inConn.RemoteAddr(), inConn.LocalAddr()) - outConn, err := tryConnect(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", proxier) + outConn, err := tryConnect(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", loadBalancer) if err != nil { glog.Errorf("Failed to connect to balancer: %v", err) inConn.Close() @@ -171,7 +171,7 @@ func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) { src.Close() } -// udpProxySocket implements proxySocket. Close() is implemented by net.UDPConn. When Close() is called, +// udpProxySocket implements ProxySocket. Close() is implemented by net.UDPConn. When Close() is called, // no new connections are allowed and existing connections are broken. // TODO: We could lame-duck this ourselves, if it becomes important. type udpProxySocket struct { @@ -188,19 +188,19 @@ func (udp *udpProxySocket) Addr() net.Addr { } // Holds all the known UDP clients that have not timed out. -type clientCache struct { - mu sync.Mutex - clients map[string]net.Conn // addr string -> connection +type ClientCache struct { + Mu sync.Mutex + Clients map[string]net.Conn // addr string -> connection } -func newClientCache() *clientCache { - return &clientCache{clients: map[string]net.Conn{}} +func newClientCache() *ClientCache { + return &ClientCache{Clients: map[string]net.Conn{}} } -func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) { +func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *ServiceInfo, loadBalancer LoadBalancer) { var buffer [4096]byte // 4KiB should be enough for most whole-packets for { - if !myInfo.isAlive() { + if !myInfo.IsAlive() { // The service port was closed or replaced. 
break } @@ -219,7 +219,7 @@ func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv break } // If this is a client we know already, reuse the connection and goroutine. - svrConn, err := udp.getBackendConn(myInfo.activeClients, cliAddr, proxier, service, myInfo.timeout) + svrConn, err := udp.getBackendConn(myInfo.ActiveClients, cliAddr, loadBalancer, service, myInfo.Timeout) if err != nil { continue } @@ -233,7 +233,7 @@ func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv } continue } - err = svrConn.SetDeadline(time.Now().Add(myInfo.timeout)) + err = svrConn.SetDeadline(time.Now().Add(myInfo.Timeout)) if err != nil { glog.Errorf("SetDeadline failed: %v", err) continue @@ -241,17 +241,17 @@ func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv } } -func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service proxy.ServicePortName, timeout time.Duration) (net.Conn, error) { - activeClients.mu.Lock() - defer activeClients.mu.Unlock() +func (udp *udpProxySocket) getBackendConn(activeClients *ClientCache, cliAddr net.Addr, loadBalancer LoadBalancer, service proxy.ServicePortName, timeout time.Duration) (net.Conn, error) { + activeClients.Mu.Lock() + defer activeClients.Mu.Unlock() - svrConn, found := activeClients.clients[cliAddr.String()] + svrConn, found := activeClients.Clients[cliAddr.String()] if !found { // TODO: This could spin up a new goroutine to make the outbound connection, // and keep accepting inbound traffic. 
glog.V(3).Infof("New UDP connection from %s", cliAddr) var err error - svrConn, err = tryConnect(service, cliAddr, "udp", proxier) + svrConn, err = tryConnect(service, cliAddr, "udp", loadBalancer) if err != nil { return nil, err } @@ -259,8 +259,8 @@ func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr ne glog.Errorf("SetDeadline failed: %v", err) return nil, err } - activeClients.clients[cliAddr.String()] = svrConn - go func(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) { + activeClients.Clients[cliAddr.String()] = svrConn + go func(cliAddr net.Addr, svrConn net.Conn, activeClients *ClientCache, timeout time.Duration) { defer runtime.HandleCrash() udp.proxyClient(cliAddr, svrConn, activeClients, timeout) }(cliAddr, svrConn, activeClients, timeout) @@ -270,7 +270,7 @@ func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr ne // This function is expected to be called as a goroutine. // TODO: Track and log bytes copied, like TCP -func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) { +func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *ClientCache, timeout time.Duration) { defer svrConn.Close() var buffer [4096]byte for { @@ -294,7 +294,7 @@ func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activ break } } - activeClients.mu.Lock() - delete(activeClients.clients, cliAddr.String()) - activeClients.mu.Unlock() + activeClients.Mu.Lock() + delete(activeClients.Clients, cliAddr.String()) + activeClients.Mu.Unlock() } From de2285ac7b8500599b397632b79e201e00454680 Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Thu, 18 Aug 2016 14:07:24 -0400 Subject: [PATCH 3/6] Userspace Proxy: Allow any ProxySocket in Proxier This commit adds a new method for constructing userspace proxiers, `NewCustomProxier`. 
`NewCustomProxier` functions identically to `NewProxier`, except that it allows a custom constructor method to be passed in to construct instances of ProxySocket. --- pkg/proxy/userspace/proxier.go | 59 ++++++++++++++++++----------- pkg/proxy/userspace/proxier_test.go | 30 +++++++-------- 2 files changed, 51 insertions(+), 38 deletions(-) diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 57bd77cd742..8925e5541a0 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -84,22 +84,26 @@ func logTimeout(err error) bool { return false } +// ProxySocketFunc is a function which constructs a ProxySocket from a protocol, ip, and port +type ProxySocketFunc func(protocol api.Protocol, ip net.IP, port int) (ProxySocket, error) + // Proxier is a simple proxy for TCP connections between a localhost:lport // and services that provide the actual implementations. type Proxier struct { - loadBalancer LoadBalancer - mu sync.Mutex // protects serviceMap - serviceMap map[proxy.ServicePortName]*ServiceInfo - syncPeriod time.Duration - minSyncPeriod time.Duration // unused atm, but plumbed through - udpIdleTimeout time.Duration - portMapMutex sync.Mutex - portMap map[portMapKey]*portMapValue - numProxyLoops int32 // use atomic ops to access this; mostly for testing - listenIP net.IP - iptables iptables.Interface - hostIP net.IP - proxyPorts PortAllocator + loadBalancer LoadBalancer + mu sync.Mutex // protects serviceMap + serviceMap map[proxy.ServicePortName]*ServiceInfo + syncPeriod time.Duration + minSyncPeriod time.Duration // unused atm, but plumbed through + udpIdleTimeout time.Duration + portMapMutex sync.Mutex + portMap map[portMapKey]*portMapValue + numProxyLoops int32 // use atomic ops to access this; mostly for testing + listenIP net.IP + iptables iptables.Interface + hostIP net.IP + proxyPorts PortAllocator + makeProxySocket ProxySocketFunc } // assert Proxier is a ProxyProvider @@ -145,6 +149,14 @@ func 
IsProxyLocked(err error) bool { // created, it will keep iptables up to date in the background and will not // terminate if a particular iptables call fails. func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) { + return NewCustomProxier(loadBalancer, listenIP, iptables, pr, syncPeriod, minSyncPeriod, udpIdleTimeout, newProxySocket) +} + +// NewCustomProxier functions similarly to NewProxier, returning a new Proxier +// for the given LoadBalancer and address. The new proxier is constructed using +// the ProxySocket constructor provided, however, instead of constructing the +// default ProxySockets. +func NewCustomProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) { if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) { return nil, ErrProxyOnLocalhost } @@ -162,10 +174,10 @@ func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.In proxyPorts := newPortAllocator(pr) glog.V(2).Infof("Setting proxy IP to %v and initializing iptables", hostIP) - return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts, syncPeriod, minSyncPeriod, udpIdleTimeout) + return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts, syncPeriod, minSyncPeriod, udpIdleTimeout, makeProxySocket) } -func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) { +func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) { //
convenient to pass nil for tests.. if proxyPorts == nil { proxyPorts = newPortAllocator(utilnet.PortRange{}) @@ -185,12 +197,13 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables portMap: make(map[portMapKey]*portMapValue), syncPeriod: syncPeriod, // plumbed through if needed, not used atm. - minSyncPeriod: minSyncPeriod, - udpIdleTimeout: udpIdleTimeout, - listenIP: listenIP, - iptables: iptables, - hostIP: hostIP, - proxyPorts: proxyPorts, + minSyncPeriod: minSyncPeriod, + udpIdleTimeout: udpIdleTimeout, + listenIP: listenIP, + iptables: iptables, + hostIP: hostIP, + proxyPorts: proxyPorts, + makeProxySocket: makeProxySocket, }, nil } @@ -338,7 +351,7 @@ func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *Serv // Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP // connections, for now. func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) { - sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort) + sock, err := proxier.makeProxySocket(protocol, proxier.listenIP, proxyPort) if err != nil { return nil, err } @@ -593,7 +606,7 @@ func (proxier *Proxier) claimNodePort(ip net.IP, port int, protocol api.Protocol // it. Tools like 'ss' and 'netstat' do not show sockets that are // bind()ed but not listen()ed, and at least the default debian netcat // has no way to avoid about 10 seconds of retries. 
- socket, err := newProxySocket(protocol, ip, port) + socket, err := proxier.makeProxySocket(protocol, ip, port) if err != nil { return fmt.Errorf("can't open node port for %s: %v", key.String(), err) } diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index 8a6994fc620..66383e7a6be 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -211,7 +211,7 @@ func TestTCPProxy(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -238,7 +238,7 @@ func TestUDPProxy(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -265,7 +265,7 @@ func TestUDPProxyTimeout(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -301,7 +301,7 @@ func TestMultiPortProxy(t *testing.T) { }}, }}) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, 
time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -328,7 +328,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) { serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"} serviceX := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "x"} - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -391,7 +391,7 @@ func TestTCPProxyStop(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -435,7 +435,7 @@ func TestUDPProxyStop(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -473,7 +473,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { 
t.Fatal(err) } @@ -510,7 +510,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -546,7 +546,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { } lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -599,7 +599,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { } lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -653,7 +653,7 @@ func TestTCPProxyUpdatePort(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -701,7 +701,7 @@ func TestUDPProxyUpdatePort(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, 
time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -746,7 +746,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -797,7 +797,7 @@ func TestProxyUpdatePortal(t *testing.T) { } lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } From f5526727fb52c735e9befc34dcde55f5a43e7615 Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Thu, 18 Aug 2016 14:11:05 -0400 Subject: [PATCH 4/6] Userspace Proxy: Expose ProxySocket utility funcs This commit exposes several utility functions that are valuable for implementing custom ProxySockets. 
--- pkg/proxy/userspace/proxysocket.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/proxy/userspace/proxysocket.go b/pkg/proxy/userspace/proxysocket.go index b9da00bce90..a28b6011b6c 100644 --- a/pkg/proxy/userspace/proxysocket.go +++ b/pkg/proxy/userspace/proxysocket.go @@ -73,7 +73,7 @@ func newProxySocket(protocol api.Protocol, ip net.IP, port int) (ProxySocket, er } // How long we wait for a connection to a backend in seconds -var endpointDialTimeout = []time.Duration{250 * time.Millisecond, 500 * time.Millisecond, 1 * time.Second, 2 * time.Second} +var EndpointDialTimeouts = []time.Duration{250 * time.Millisecond, 500 * time.Millisecond, 1 * time.Second, 2 * time.Second} // tcpProxySocket implements ProxySocket. Close() is implemented by net.Listener. When Close() is called, // no new connections are allowed but existing connections are left untouched. @@ -86,9 +86,11 @@ func (tcp *tcpProxySocket) ListenPort() int { return tcp.port } -func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string, loadBalancer LoadBalancer) (out net.Conn, err error) { +// TryConnectEndpoints attempts to connect to the next available endpoint for the given service, cycling +// through until it is able to successfully connect, or it has tried with all timeouts in EndpointDialTimeouts.
+func TryConnectEndpoints(service proxy.ServicePortName, srcAddr net.Addr, protocol string, loadBalancer LoadBalancer) (out net.Conn, err error) { sessionAffinityReset := false - for _, dialTimeout := range endpointDialTimeout { + for _, dialTimeout := range EndpointDialTimeouts { endpoint, err := loadBalancer.NextEndpoint(service, srcAddr, sessionAffinityReset) if err != nil { glog.Errorf("Couldn't find an endpoint for %s: %v", service, err) @@ -135,19 +137,19 @@ func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *Serv continue } glog.V(3).Infof("Accepted TCP connection from %v to %v", inConn.RemoteAddr(), inConn.LocalAddr()) - outConn, err := tryConnect(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", loadBalancer) + outConn, err := TryConnectEndpoints(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", loadBalancer) if err != nil { glog.Errorf("Failed to connect to balancer: %v", err) inConn.Close() continue } // Spin up an async copy loop. - go proxyTCP(inConn.(*net.TCPConn), outConn.(*net.TCPConn)) + go ProxyTCP(inConn.(*net.TCPConn), outConn.(*net.TCPConn)) } } -// proxyTCP proxies data bi-directionally between in and out. -func proxyTCP(in, out *net.TCPConn) { +// ProxyTCP proxies data bi-directionally between in and out. +func ProxyTCP(in, out *net.TCPConn) { var wg sync.WaitGroup wg.Add(2) glog.V(4).Infof("Creating proxy between %v <-> %v <-> %v <-> %v", @@ -251,7 +253,7 @@ func (udp *udpProxySocket) getBackendConn(activeClients *ClientCache, cliAddr ne // and keep accepting inbound traffic. 
glog.V(3).Infof("New UDP connection from %s", cliAddr) var err error - svrConn, err = tryConnect(service, cliAddr, "udp", loadBalancer) + svrConn, err = TryConnectEndpoints(service, cliAddr, "udp", loadBalancer) if err != nil { return nil, err } From 655b33825651a8ddfe0cfc739c1f83ae59c24bab Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Thu, 18 Aug 2016 14:18:12 -0400 Subject: [PATCH 5/6] Userspace Proxy: Keep ref to service being proxied This commit makes the userspace proxy keep an ObjectReference to the service being proxied. This allows the consumers of the `ServiceInfo` struct, like `ProxySockets`, to emit events about or otherwise refer to the service. --- pkg/proxy/userspace/proxier.go | 18 ++++++++++-- pkg/proxy/userspace/proxier_test.go | 45 +++++++++++++++++++---------- 2 files changed, 46 insertions(+), 17 deletions(-) diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 8925e5541a0..7b375739b8c 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -48,6 +48,8 @@ type ServiceInfo struct { Timeout time.Duration // ActiveClients is the cache of active UDP clients being proxied by this proxy for this service ActiveClients *ClientCache + // ServiceRef is a full object reference to the service described by this ServiceInfo + ServiceRef api.ObjectReference isAliveAtomic int32 // Only access this with atomic ops portal portal @@ -350,7 +352,7 @@ func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *Serv // addServiceOnPort starts listening for a new service, returning the ServiceInfo. // Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP // connections, for now.
-func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) { +func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, serviceRef api.ObjectReference, protocol api.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) { sock, err := proxier.makeProxySocket(protocol, proxier.listenIP, proxyPort) if err != nil { return nil, err @@ -368,6 +370,7 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol si := &ServiceInfo{ Timeout: timeout, ActiveClients: newClientCache(), + ServiceRef: serviceRef, isAliveAtomic: 1, proxyPort: portNum, @@ -404,6 +407,17 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) { continue } + // TODO: should this just be api.GetReference? + svcGVK := service.GetObjectKind().GroupVersionKind() + svcRef := api.ObjectReference{ + Kind: svcGVK.Kind, + Namespace: service.Namespace, + Name: service.Name, + UID: service.UID, + APIVersion: svcGVK.GroupVersion().String(), + ResourceVersion: service.ResourceVersion, + } + for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] serviceName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name} @@ -434,7 +448,7 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) { } glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol) - info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout) + info, err = proxier.addServiceOnPort(serviceName, svcRef, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout) if err != nil { glog.Errorf("Failed to start proxy for %q: %v", serviceName, err) continue diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index 66383e7a6be..c5f327667f5 100644 --- 
a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -217,7 +217,8 @@ func TestTCPProxy(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -244,7 +245,8 @@ func TestUDPProxy(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -271,7 +273,8 @@ func TestUDPProxyTimeout(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -307,14 +310,16 @@ func TestMultiPortProxy(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfoP, err := p.addServiceOnPort(serviceP, "TCP", 0, time.Second) + serviceRefP := api.ObjectReference{Name: serviceP.Name, Namespace: serviceP.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfoP, err := p.addServiceOnPort(serviceP, serviceRefP, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } testEchoTCP(t, "127.0.0.1", svcInfoP.proxyPort) waitForNumProxyLoops(t, p, 1) - svcInfoQ, err := p.addServiceOnPort(serviceQ, "UDP", 0, time.Second) + serviceRefQ := api.ObjectReference{Name: serviceQ.Name, 
Namespace: serviceQ.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfoQ, err := p.addServiceOnPort(serviceQ, serviceRefQ, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -397,7 +402,8 @@ func TestTCPProxyStop(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -441,7 +447,8 @@ func TestUDPProxyStop(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -479,7 +486,8 @@ func TestTCPProxyUpdateDelete(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -516,7 +524,8 @@ func TestUDPProxyUpdateDelete(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -552,7 +561,8 @@ func TestTCPProxyUpdateDeleteUpdate(t 
*testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -605,7 +615,8 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -659,7 +670,8 @@ func TestTCPProxyUpdatePort(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -707,7 +719,8 @@ func TestUDPProxyUpdatePort(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -752,7 +765,8 @@ func TestProxyUpdatePublicIPs(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := 
p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -803,7 +817,8 @@ func TestProxyUpdatePortal(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } From 5447db30488b752c4fea469c29caeb349a8c1a4e Mon Sep 17 00:00:00 2001 From: Benjamin Bennett Date: Fri, 22 Jul 2016 14:26:36 -0400 Subject: [PATCH 6/6] Userspace proxy should remove conntrack entries This changes the userspace proxy so that it cleans up its conntrack settings when a service is removed (as the iptables proxy already does). This could theoretically cause problems when a UDP service is deleted and recreated quickly (with the same IP address). As long as packets from the same UDP source IP and port were going to the same destination IP and port, the conntrack would apply and the packets would be sent to the old destination. This is astronomically unlikely if you did not specify the IP address to use in the service, and even then, only happens with an "established" UDP connection. However, in cases where a service could be "switched" between using the iptables proxy and the userspace proxy, this case becomes much more frequent.
--- cmd/kube-proxy/app/server.go | 1 + pkg/proxy/BUILD | 1 + pkg/proxy/iptables/BUILD | 1 + pkg/proxy/iptables/proxier.go | 35 +------- pkg/proxy/iptables/proxier_test.go | 99 ----------------------- pkg/proxy/userspace/BUILD | 4 + pkg/proxy/userspace/proxier.go | 24 ++++-- pkg/proxy/userspace/proxier_test.go | 75 +++++++++++++---- pkg/proxy/util/BUILD | 40 ++++++++++ pkg/proxy/util/conntrack.go | 58 ++++++++++++++ pkg/proxy/util/conntrack_test.go | 120 ++++++++++++++++++++++++++++ test/test_owners.csv | 1 + 12 files changed, 309 insertions(+), 150 deletions(-) create mode 100644 pkg/proxy/util/BUILD create mode 100644 pkg/proxy/util/conntrack.go create mode 100644 pkg/proxy/util/conntrack_test.go diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index da6727d8e5b..f934aca6eea 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -272,6 +272,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err loadBalancer, net.ParseIP(config.BindAddress), iptInterface, + execer, *utilnet.ParsePortRangeOrDie(config.PortRange), config.IPTablesSyncPeriod.Duration, config.IPTablesMinSyncPeriod.Duration, diff --git a/pkg/proxy/BUILD b/pkg/proxy/BUILD index 2c366011efa..a21f2bd8d55 100644 --- a/pkg/proxy/BUILD +++ b/pkg/proxy/BUILD @@ -35,6 +35,7 @@ filegroup( "//pkg/proxy/healthcheck:all-srcs", "//pkg/proxy/iptables:all-srcs", "//pkg/proxy/userspace:all-srcs", + "//pkg/proxy/util:all-srcs", "//pkg/proxy/winuserspace:all-srcs", ], tags = ["automanaged"], diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index 821b14da8a3..4b64995719f 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -18,6 +18,7 @@ go_library( "//pkg/features:go_default_library", "//pkg/proxy:go_default_library", "//pkg/proxy/healthcheck:go_default_library", + "//pkg/proxy/util:go_default_library", "//pkg/util/exec:go_default_library", "//pkg/util/iptables:go_default_library", 
"//pkg/util/sysctl:go_default_library", diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 6208dc00128..1d2bfe8ceee 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -46,6 +46,7 @@ import ( "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" + utilproxy "k8s.io/kubernetes/pkg/proxy/util" utilexec "k8s.io/kubernetes/pkg/util/exec" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" @@ -552,7 +553,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { glog.V(4).Infof("Skipping proxy iptables rule sync on service update because nothing changed") } - proxier.deleteServiceConnections(staleUDPServices.List()) + utilproxy.DeleteServiceConnections(proxier.exec, staleUDPServices.List()) } // Reconstruct the list of endpoint infos from the endpointIP list @@ -792,7 +793,7 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP { endpointIP := strings.Split(epSvcPair.endpoint, ":")[0] glog.V(2).Infof("Deleting connection tracking state for service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP) - err := proxier.execConntrackTool("-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp") + err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp") if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. // These stale udp connection will keep black hole traffic. 
Making this a best effort operation for now, since it @@ -803,33 +804,6 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ } } -// deleteServiceConnection use conntrack-tool to delete UDP connection specified by service ip -func (proxier *Proxier) deleteServiceConnections(svcIPs []string) { - for _, ip := range svcIPs { - glog.V(2).Infof("Deleting connection tracking state for service IP %s", ip) - err := proxier.execConntrackTool("-D", "--orig-dst", ip, "-p", "udp") - if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { - // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. - // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it - // is expensive to baby sit all udp connections to kubernetes services. - glog.Errorf("conntrack return with error: %v", err) - } - } -} - -//execConntrackTool executes conntrack tool using given parameters -func (proxier *Proxier) execConntrackTool(parameters ...string) error { - conntrackPath, err := proxier.exec.LookPath("conntrack") - if err != nil { - return fmt.Errorf("Error looking for path of conntrack: %v", err) - } - output, err := proxier.exec.Command(conntrackPath, parameters...).CombinedOutput() - if err != nil { - return fmt.Errorf("Conntrack command returned: %q, error message: %s", string(output), err) - } - return nil -} - // This is where all of the iptables-save/restore calls happen. 
// The only other iptables rules are those that are setup in iptablesInit() // assumes proxier.mu is held @@ -1392,10 +1366,9 @@ func (proxier *Proxier) syncProxyRules() { // https://github.com/docker/docker/issues/8795 // https://github.com/kubernetes/kubernetes/issues/31983 func (proxier *Proxier) clearUdpConntrackForPort(port int) { - var err error = nil glog.V(2).Infof("Deleting conntrack entries for udp connections") if port > 0 { - err = proxier.execConntrackTool("-D", "-p", "udp", "--dport", strconv.Itoa(port)) + err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "-p", "udp", "--dport", strconv.Itoa(port)) if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { glog.Errorf("conntrack return with error: %v", err) } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 7fbc27c1d42..6fb4532174c 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -172,57 +172,6 @@ func TestGetChainLinesMultipleTables(t *testing.T) { checkAllLines(t, utiliptables.TableNAT, []byte(iptables_save), expected) } -func TestExecConntrackTool(t *testing.T) { - fcmd := exec.FakeCmd{ - CombinedOutputScript: []exec.FakeCombinedOutputAction{ - func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, - func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, - func() ([]byte, error) { - return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.") - }, - }, - } - fexec := exec.FakeExec{ - CommandScript: []exec.FakeCommandAction{ - func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) 
}, - }, - LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, - } - - fakeProxier := Proxier{exec: &fexec} - - testCases := [][]string{ - {"-L", "-p", "udp"}, - {"-D", "-p", "udp", "-d", "10.0.240.1"}, - {"-D", "-p", "udp", "--orig-dst", "10.240.0.2", "--dst-nat", "10.0.10.2"}, - } - - expectErr := []bool{false, false, true} - - for i := range testCases { - err := fakeProxier.execConntrackTool(testCases[i]...) - - if expectErr[i] { - if err == nil { - t.Errorf("expected err, got %v", err) - } - } else { - if err != nil { - t.Errorf("expected success, got %v", err) - } - } - - execCmd := strings.Join(fcmd.CombinedOutputLog[i], " ") - expectCmd := fmt.Sprintf("%s %s", "conntrack", strings.Join(testCases[i], " ")) - - if execCmd != expectCmd { - t.Errorf("expect execute command: %s, but got: %s", expectCmd, execCmd) - } - } -} - func newFakeServiceInfo(service proxy.ServicePortName, ip net.IP, port int, protocol api.Protocol, onlyNodeLocalEndpoints bool) *serviceInfo { return &serviceInfo{ sessionAffinityType: api.ServiceAffinityNone, // default @@ -296,54 +245,6 @@ func TestDeleteEndpointConnections(t *testing.T) { } } -func TestDeleteServiceConnections(t *testing.T) { - fcmd := exec.FakeCmd{ - CombinedOutputScript: []exec.FakeCombinedOutputAction{ - func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, - func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, - func() ([]byte, error) { - return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.") - }, - }, - } - fexec := exec.FakeExec{ - CommandScript: []exec.FakeCommandAction{ - func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) 
}, - }, - LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, - } - - fakeProxier := Proxier{exec: &fexec} - - testCases := [][]string{ - { - "10.240.0.3", - "10.240.0.5", - }, - { - "10.240.0.4", - }, - } - - svcCount := 0 - for i := range testCases { - fakeProxier.deleteServiceConnections(testCases[i]) - for _, ip := range testCases[i] { - expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", ip) - execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ") - if expectCommand != execCommand { - t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand) - } - svcCount += 1 - } - if svcCount != fexec.CommandCalls { - t.Errorf("Exepect comand executed %d times, but got %d", svcCount, fexec.CommandCalls) - } - } -} - type fakeClosable struct { closed bool } diff --git a/pkg/proxy/userspace/BUILD b/pkg/proxy/userspace/BUILD index 2ed2504f08c..d58780bc6fd 100644 --- a/pkg/proxy/userspace/BUILD +++ b/pkg/proxy/userspace/BUILD @@ -23,6 +23,8 @@ go_library( deps = [ "//pkg/api:go_default_library", "//pkg/proxy:go_default_library", + "//pkg/proxy/util:go_default_library", + "//pkg/util/exec:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/slice:go_default_library", "//vendor:github.com/golang/glog", @@ -30,6 +32,7 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/util/errors", "//vendor:k8s.io/apimachinery/pkg/util/net", "//vendor:k8s.io/apimachinery/pkg/util/runtime", + "//vendor:k8s.io/apimachinery/pkg/util/sets", "//vendor:k8s.io/apimachinery/pkg/util/wait", ], ) @@ -46,6 +49,7 @@ go_test( deps = [ "//pkg/api:go_default_library", "//pkg/proxy:go_default_library", + "//pkg/util/exec:go_default_library", "//pkg/util/iptables/testing:go_default_library", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/types", diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 7b375739b8c..79b9b61a76d 100644 --- a/pkg/proxy/userspace/proxier.go +++ 
b/pkg/proxy/userspace/proxier.go @@ -33,6 +33,9 @@ import ( utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + utilproxy "k8s.io/kubernetes/pkg/proxy/util" + utilexec "k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/iptables" ) @@ -106,6 +109,7 @@ type Proxier struct { hostIP net.IP proxyPorts PortAllocator makeProxySocket ProxySocketFunc + exec utilexec.Interface } // assert Proxier is a ProxyProvider @@ -150,15 +154,15 @@ func IsProxyLocked(err error) bool { // if iptables fails to update or acquire the initial lock. Once a proxier is // created, it will keep iptables up to date in the background and will not // terminate if a particular iptables call fails. -func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) { - return NewCustomProxier(loadBalancer, listenIP, iptables, pr, syncPeriod, minSyncPeriod, udpIdleTimeout, newProxySocket) +func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, exec utilexec.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) { + return NewCustomProxier(loadBalancer, listenIP, iptables, exec, pr, syncPeriod, minSyncPeriod, udpIdleTimeout, newProxySocket) } // NewCustomProxier functions similarly to NewProxier, returing a new Proxier // for the given LoadBalancer and address. The new proxier is constructed using // the ProxySocket constructor provided, however, instead of constructing the // default ProxySockets. 
-func NewCustomProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) { +func NewCustomProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, exec utilexec.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) { if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) { return nil, ErrProxyOnLocalhost } @@ -176,10 +180,10 @@ func NewCustomProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptab proxyPorts := newPortAllocator(pr) glog.V(2).Infof("Setting proxy IP to %v and initializing iptables", hostIP) - return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts, syncPeriod, minSyncPeriod, udpIdleTimeout, makeProxySocket) + return createProxier(loadBalancer, listenIP, iptables, exec, hostIP, proxyPorts, syncPeriod, minSyncPeriod, udpIdleTimeout, makeProxySocket) } -func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) { +func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, exec utilexec.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) { // convenient to pass nil for tests.. 
if proxyPorts == nil { proxyPorts = newPortAllocator(utilnet.PortRange{}) @@ -206,6 +210,7 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables hostIP: hostIP, proxyPorts: proxyPorts, makeProxySocket: makeProxySocket, + exec: exec, }, nil } @@ -469,11 +474,18 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) { proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes) } } + + staleUDPServices := sets.NewString() proxier.mu.Lock() defer proxier.mu.Unlock() for name, info := range proxier.serviceMap { if !activeServices[name] { glog.V(1).Infof("Stopping service %q", name) + + if proxier.serviceMap[name].protocol == api.ProtocolUDP { + staleUDPServices.Insert(proxier.serviceMap[name].portal.ip.String()) + } + err := proxier.closePortal(name, info) if err != nil { glog.Errorf("Failed to close portal for %q: %v", name, err) @@ -485,6 +497,8 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) { proxier.loadBalancer.DeleteService(name) } } + + utilproxy.DeleteServiceConnections(proxier.exec, staleUDPServices.List()) } func sameConfig(info *ServiceInfo, service *api.Service, port *api.ServicePort) bool { diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index c5f327667f5..ed5080c2028 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/proxy" + "k8s.io/kubernetes/pkg/util/exec" ipttest "k8s.io/kubernetes/pkg/util/iptables/testing" ) @@ -211,7 +212,9 @@ func TestTCPProxy(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, 
net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -239,7 +242,9 @@ func TestUDPProxy(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -267,7 +272,9 @@ func TestUDPProxyTimeout(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -304,7 +311,9 @@ func TestMultiPortProxy(t *testing.T) { }}, }}) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -333,7 +342,9 @@ func TestMultiPortOnServiceUpdate(t *testing.T) { serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"} serviceX := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "x"} - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, 
udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -396,7 +407,9 @@ func TestTCPProxyStop(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -441,7 +454,9 @@ func TestUDPProxyStop(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -480,7 +495,9 @@ func TestTCPProxyUpdateDelete(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -518,7 +535,9 @@ func TestUDPProxyUpdateDelete(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, 
net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -555,7 +574,9 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { } lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -609,7 +630,9 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { } lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -664,7 +687,9 @@ func TestTCPProxyUpdatePort(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -713,7 +738,9 @@ func TestUDPProxyUpdatePort(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, 
net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -759,7 +786,9 @@ func TestProxyUpdatePublicIPs(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -811,7 +840,9 @@ func TestProxyUpdatePortal(t *testing.T) { } lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -868,4 +899,18 @@ func TestProxyUpdatePortal(t *testing.T) { waitForNumProxyLoops(t, p, 1) } +func makeFakeExec() *exec.FakeExec { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, + }, + } + return &exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) 
}, + }, + LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, + } +} + // TODO(justinsb): Add test for nodePort conflict detection, once we have nodePort wired in diff --git a/pkg/proxy/util/BUILD b/pkg/proxy/util/BUILD new file mode 100644 index 00000000000..24e16ffb3cf --- /dev/null +++ b/pkg/proxy/util/BUILD @@ -0,0 +1,40 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_library( + name = "go_default_library", + srcs = ["conntrack.go"], + tags = ["automanaged"], + deps = [ + "//pkg/util/exec:go_default_library", + "//vendor:github.com/golang/glog", + ], +) + +go_test( + name = "go_default_test", + srcs = ["conntrack_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = ["//pkg/util/exec:go_default_library"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/pkg/proxy/util/conntrack.go b/pkg/proxy/util/conntrack.go new file mode 100644 index 00000000000..436045ecb30 --- /dev/null +++ b/pkg/proxy/util/conntrack.go @@ -0,0 +1,58 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package util + +import ( + "fmt" + "strings" + + "k8s.io/kubernetes/pkg/util/exec" + + "github.com/golang/glog" +) + +// Utilities for dealing with conntrack + +const noConnectionToDelete = "0 flow entries have been deleted" + +// DeleteServiceConnections uses the conntrack tool to delete the conntrack entries +// for the UDP connections specified by the given service IPs +func DeleteServiceConnections(execer exec.Interface, svcIPs []string) { + for _, ip := range svcIPs { + glog.V(2).Infof("Deleting connection tracking state for service IP %s", ip) + err := ExecConntrackTool(execer, "-D", "--orig-dst", ip, "-p", "udp") + if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { + // TODO: Better handling for deletion failure. When a failure occurs, stale UDP connections may not get flushed. + // These stale UDP connections will keep black-holing traffic. Making this a best effort operation for now, since it + // is expensive to baby-sit all udp connections to kubernetes services. + glog.Errorf("conntrack returned error: %v", err) + } + } +} + +// ExecConntrackTool executes the conntrack tool using the given parameters +func ExecConntrackTool(execer exec.Interface, parameters ...string) error { + conntrackPath, err := execer.LookPath("conntrack") + if err != nil { + return fmt.Errorf("error looking for path of conntrack: %v", err) + } + output, err := execer.Command(conntrackPath, parameters...).CombinedOutput() + if err != nil { + return fmt.Errorf("conntrack command returned: %q, error message: %s", string(output), err) + } + return nil +} diff --git a/pkg/proxy/util/conntrack_test.go b/pkg/proxy/util/conntrack_test.go new file mode 100644 index 00000000000..05729f51e3c --- /dev/null +++ b/pkg/proxy/util/conntrack_test.go @@ -0,0 +1,120 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + "strings" + "testing" + + "k8s.io/kubernetes/pkg/util/exec" +) + +func TestExecConntrackTool(t *testing.T) { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, + func() ([]byte, error) { + return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.") + }, + }, + } + fexec := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, + } + + testCases := [][]string{ + {"-L", "-p", "udp"}, + {"-D", "-p", "udp", "-d", "10.0.240.1"}, + {"-D", "-p", "udp", "--orig-dst", "10.240.0.2", "--dst-nat", "10.0.10.2"}, + } + + expectErr := []bool{false, false, true} + + for i := range testCases { + err := ExecConntrackTool(&fexec, testCases[i]...) 
+ + if expectErr[i] { + if err == nil { + t.Errorf("expected err, got %v", err) + } + } else { + if err != nil { + t.Errorf("expected success, got %v", err) + } + } + + execCmd := strings.Join(fcmd.CombinedOutputLog[i], " ") + expectCmd := fmt.Sprintf("%s %s", "conntrack", strings.Join(testCases[i], " ")) + + if execCmd != expectCmd { + t.Errorf("expect execute command: %s, but got: %s", expectCmd, execCmd) + } + } +} + +func TestDeleteServiceConnections(t *testing.T) { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, + func() ([]byte, error) { + return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.") + }, + }, + } + fexec := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) 
}, + }, + LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, + } + + testCases := [][]string{ + { + "10.240.0.3", + "10.240.0.5", + }, + { + "10.240.0.4", + }, + } + + svcCount := 0 + for i := range testCases { + DeleteServiceConnections(&fexec, testCases[i]) + for _, ip := range testCases[i] { + expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", ip) + execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ") + if expectCommand != execCommand { + t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand) + } + svcCount += 1 + } + if svcCount != fexec.CommandCalls { + t.Errorf("Exepect comand executed %d times, but got %d", svcCount, fexec.CommandCalls) + } + } +} diff --git a/test/test_owners.csv b/test/test_owners.csv index 4f82a186a9b..d7a7bdcedee 100644 --- a/test/test_owners.csv +++ b/test/test_owners.csv @@ -768,6 +768,7 @@ k8s.io/kubernetes/pkg/proxy/config,ixdy,1, k8s.io/kubernetes/pkg/proxy/healthcheck,rrati,0, k8s.io/kubernetes/pkg/proxy/iptables,freehan,0, k8s.io/kubernetes/pkg/proxy/userspace,luxas,1, +k8s.io/kubernetes/pkg/proxy/util,knobunc,0, k8s.io/kubernetes/pkg/proxy/winuserspace,jbhurat,0, k8s.io/kubernetes/pkg/quota,sttts,1, k8s.io/kubernetes/pkg/quota/evaluator/core,yifan-gu,1,