diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index d08276769a5..d888570f212 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -45,6 +45,7 @@ type serviceInfo struct { proxyPort int socket proxySocket timeout time.Duration + activeClients *clientCache nodePort int loadBalancerStatus api.LoadBalancerStatus sessionAffinityType api.ServiceAffinity @@ -257,6 +258,7 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol protocol: protocol, socket: sock, timeout: timeout, + activeClients: newClientCache(), sessionAffinityType: api.ServiceAffinityNone, // default stickyMaxAgeMinutes: 180, // TODO: parameterize this in the API. } diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index 54b8e41d766..c36d6bdf0af 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -210,6 +210,22 @@ func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) { t.Errorf("expected %d ProxyLoops running, got %d", want, got) } +func waitForNumProxyClients(t *testing.T, s *serviceInfo, want int, timeout time.Duration) { + var got int + now := time.Now() + deadline := now.Add(timeout) + for time.Now().Before(deadline) { + s.activeClients.mu.Lock() + got = len(s.activeClients.clients) + s.activeClients.mu.Unlock() + if got == want { + return + } + time.Sleep(500 * time.Millisecond) + } + t.Errorf("expected %d ProxyClients live, got %d", want, got) +} + func TestTCPProxy(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} @@ -264,6 +280,37 @@ func TestUDPProxy(t *testing.T) { waitForNumProxyLoops(t, p, 1) } +func TestUDPProxyTimeout(t *testing.T) { + lb := NewLoadBalancerRR() + service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + lb.OnEndpointsUpdate([]api.Endpoints{ + { + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, + }}, + }, + }) + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute) + if err != nil { + t.Fatal(err) + } + waitForNumProxyLoops(t, p, 0) + + svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) + if err != nil { + t.Fatalf("error adding new service: %#v", err) + } + waitForNumProxyLoops(t, p, 1) + testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) + // When connecting to a UDP service endpoint, there shoule be a Conn for proxy. + waitForNumProxyClients(t, svcInfo, 1, time.Second) + // If conn has no activity for serviceInfo.timeout since last Read/Write, it shoule be closed because of timeout. + waitForNumProxyClients(t, svcInfo, 0, 2*time.Second) +} + func TestMultiPortProxy(t *testing.T) { lb := NewLoadBalancerRR() serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-p"}, Port: "p"} @@ -826,6 +873,4 @@ func TestProxyUpdatePortal(t *testing.T) { waitForNumProxyLoops(t, p, 1) } -// TODO: Test UDP timeouts. - // TODO(justinsb): Add test for nodePort conflict detection, once we have nodePort wired in diff --git a/pkg/proxy/userspace/proxysocket.go b/pkg/proxy/userspace/proxysocket.go index 94978f1c7bd..3f5a897dd8a 100644 --- a/pkg/proxy/userspace/proxysocket.go +++ b/pkg/proxy/userspace/proxysocket.go @@ -192,7 +192,6 @@ func newClientCache() *clientCache { } func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) { - activeClients := newClientCache() var buffer [4096]byte // 4KiB should be enough for most whole-packets for { if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo { @@ -214,7 +213,7 @@ func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv break } // If this is a client we know already, reuse the connection and goroutine. - svrConn, err := udp.getBackendConn(activeClients, cliAddr, proxier, service, myInfo.timeout) + svrConn, err := udp.getBackendConn(myInfo.activeClients, cliAddr, proxier, service, myInfo.timeout) if err != nil { continue }