Merge pull request #12920 from jiangyaoguo/test-udp-timeout

Test UDP timeout
This commit is contained in:
Wojciech Tyczynski 2015-08-20 09:34:10 +02:00
commit ff8dbfe889
3 changed files with 50 additions and 4 deletions

View File

@ -45,6 +45,7 @@ type serviceInfo struct {
proxyPort int
socket proxySocket
timeout time.Duration
activeClients *clientCache
nodePort int
loadBalancerStatus api.LoadBalancerStatus
sessionAffinityType api.ServiceAffinity
@ -257,6 +258,7 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol
protocol: protocol,
socket: sock,
timeout: timeout,
activeClients: newClientCache(),
sessionAffinityType: api.ServiceAffinityNone, // default
stickyMaxAgeMinutes: 180, // TODO: parameterize this in the API.
}

View File

@ -210,6 +210,22 @@ func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) {
t.Errorf("expected %d ProxyLoops running, got %d", want, got)
}
func waitForNumProxyClients(t *testing.T, s *serviceInfo, want int, timeout time.Duration) {
var got int
now := time.Now()
deadline := now.Add(timeout)
for time.Now().Before(deadline) {
s.activeClients.mu.Lock()
got = len(s.activeClients.clients)
s.activeClients.mu.Unlock()
if got == want {
return
}
time.Sleep(500 * time.Millisecond)
}
t.Errorf("expected %d ProxyClients live, got %d", want, got)
}
func TestTCPProxy(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
@ -264,6 +280,37 @@ func TestUDPProxy(t *testing.T) {
waitForNumProxyLoops(t, p, 1)
}
func TestUDPProxyTimeout(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
}},
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
waitForNumProxyLoops(t, p, 1)
testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
// When connecting to a UDP service endpoint, there shoule be a Conn for proxy.
waitForNumProxyClients(t, svcInfo, 1, time.Second)
// If conn has no activity for serviceInfo.timeout since last Read/Write, it shoule be closed because of timeout.
waitForNumProxyClients(t, svcInfo, 0, 2*time.Second)
}
func TestMultiPortProxy(t *testing.T) {
lb := NewLoadBalancerRR()
serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-p"}, Port: "p"}
@ -826,6 +873,4 @@ func TestProxyUpdatePortal(t *testing.T) {
waitForNumProxyLoops(t, p, 1)
}
// TODO: Test UDP timeouts.
// TODO(justinsb): Add test for nodePort conflict detection, once we have nodePort wired in

View File

@ -192,7 +192,6 @@ func newClientCache() *clientCache {
}
func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) {
activeClients := newClientCache()
var buffer [4096]byte // 4KiB should be enough for most whole-packets
for {
if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo {
@ -214,7 +213,7 @@ func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv
break
}
// If this is a client we know already, reuse the connection and goroutine.
svrConn, err := udp.getBackendConn(activeClients, cliAddr, proxier, service, myInfo.timeout)
svrConn, err := udp.getBackendConn(myInfo.activeClients, cliAddr, proxier, service, myInfo.timeout)
if err != nil {
continue
}