proxy/userspace: add proxy shutdown function and use in testcases

If a testcase does time out and 'go test' prints the call stack,
make sure everything from previous tests is cleaned up so the call
stack is easier to understand.
This commit is contained in:
Dan Williams 2019-03-31 14:41:11 -05:00
parent 4b07f80d20
commit ddab79a233
2 changed files with 39 additions and 3 deletions

View File

@ -109,6 +109,8 @@ type Proxier struct {
proxyPorts PortAllocator
makeProxySocket ProxySocketFunc
exec utilexec.Interface
stopChan chan struct{}
}
// assert Proxier is a ProxyProvider
@ -216,6 +218,7 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
proxyPorts: proxyPorts,
makeProxySocket: makeProxySocket,
exec: exec,
stopChan: make(chan struct{}),
}, nil
}
@ -287,6 +290,20 @@ func CleanupLeftovers(ipt iptables.Interface) (encounteredError bool) {
return encounteredError
}
// shutdown closes all service port proxies and returns from the proxy's
// sync loop. Used from testcases.
func (proxier *Proxier) shutdown() {
defer proxier.cleanupStaleStickySessions()
proxier.mu.Lock()
defer proxier.mu.Unlock()
for serviceName, info := range proxier.serviceMap {
proxier.stopProxyInternal(serviceName, info)
}
close(proxier.stopChan)
}
// Sync is called to immediately synchronize the proxier state to iptables
func (proxier *Proxier) Sync() {
if err := iptablesInit(proxier.iptables); err != nil {
@ -301,9 +318,13 @@ func (proxier *Proxier) SyncLoop() {
t := time.NewTicker(proxier.syncPeriod)
defer t.Stop()
for {
<-t.C
klog.V(6).Infof("Periodic sync")
proxier.Sync()
select {
case <-t.C:
klog.V(6).Infof("Periodic sync")
proxier.Sync()
case <-proxier.stopChan:
return
}
}
}

View File

@ -243,6 +243,7 @@ func TestTCPProxy(t *testing.T) {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil {
@ -270,6 +271,7 @@ func TestUDPProxy(t *testing.T) {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
if err != nil {
@ -297,6 +299,7 @@ func TestUDPProxyTimeout(t *testing.T) {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
if err != nil {
@ -336,6 +339,7 @@ func TestMultiPortProxy(t *testing.T) {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
defer p.shutdown()
svcInfoP, err := p.addServiceOnPort(serviceP, "TCP", 0, time.Second)
if err != nil {
@ -365,6 +369,7 @@ func TestMultiPortOnServiceAdd(t *testing.T) {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
defer p.shutdown()
p.OnServiceAdd(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
@ -428,6 +433,7 @@ func TestTCPProxyStop(t *testing.T) {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil {
@ -472,6 +478,7 @@ func TestUDPProxyStop(t *testing.T) {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
if err != nil {
@ -510,6 +517,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil {
@ -554,6 +562,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
if err != nil {
@ -599,6 +608,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil {
@ -661,6 +671,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
if err != nil {
@ -722,6 +733,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil {
@ -770,6 +782,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
if err != nil {
@ -815,6 +828,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil {
@ -868,6 +882,7 @@ func TestProxyUpdatePortal(t *testing.T) {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil {