proxy/userspace: clean up and consolidate testcase setup

This commit is contained in:
Dan Williams 2020-09-02 12:02:42 -05:00
parent 1372bd94fe
commit 0cb5e55409

View File

@ -291,14 +291,38 @@ func startProxier(p *Proxier, t *testing.T) {
p.OnEndpointsSynced() p.OnEndpointsSynced()
} }
func newServiceObject(namespace, name, clusterIP string, ports []v1.ServicePort) (*v1.Service, []proxy.ServicePortName) {
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name},
Spec: v1.ServiceSpec{
ClusterIP: clusterIP,
Ports: ports,
},
}
servicePorts := make([]proxy.ServicePortName, len(ports))
for i, port := range ports {
servicePorts[i] = proxy.ServicePortName{
NamespacedName: types.NamespacedName{
Namespace: namespace,
Name: name,
},
Port: port.Name,
}
}
return service, servicePorts
}
func TestTCPProxy(t *testing.T) { func TestTCPProxy(t *testing.T) {
service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: 80, Protocol: "TCP"}})
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
servicePort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsAdd(&v1.Endpoints{ lb.OnEndpointsAdd(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace}, ObjectMeta: service.ObjectMeta,
Subsets: []v1.EndpointSubset{{ Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: tcpServerPort}},
}}, }},
}) })
@ -311,25 +335,19 @@ func TestTCPProxy(t *testing.T) {
startProxier(p, t) startProxier(p, t)
defer p.shutdown() defer p.shutdown()
svcInfo := addServiceAndWaitForInfo(t, p, servicePort, &v1.Service{ svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service)
ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace},
Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
Name: "p",
Port: 80,
Protocol: "TCP",
}}},
})
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
} }
func TestUDPProxy(t *testing.T) { func TestUDPProxy(t *testing.T) {
service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: 80, Protocol: "UDP"}})
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
servicePort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsAdd(&v1.Endpoints{ lb.OnEndpointsAdd(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace}, ObjectMeta: service.ObjectMeta,
Subsets: []v1.EndpointSubset{{ Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}}, Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: udpServerPort}},
}}, }},
}) })
@ -342,25 +360,19 @@ func TestUDPProxy(t *testing.T) {
startProxier(p, t) startProxier(p, t)
defer p.shutdown() defer p.shutdown()
svcInfo := addServiceAndWaitForInfo(t, p, servicePort, &v1.Service{ svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service)
ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace},
Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
Name: "p",
Port: 80,
Protocol: "UDP",
}}},
})
testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
} }
func TestUDPProxyTimeout(t *testing.T) { func TestUDPProxyTimeout(t *testing.T) {
service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: 80, Protocol: "UDP"}})
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
servicePort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsAdd(&v1.Endpoints{ lb.OnEndpointsAdd(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace}, ObjectMeta: service.ObjectMeta,
Subsets: []v1.EndpointSubset{{ Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}}, Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: udpServerPort}},
}}, }},
}) })
@ -373,14 +385,7 @@ func TestUDPProxyTimeout(t *testing.T) {
startProxier(p, t) startProxier(p, t)
defer p.shutdown() defer p.shutdown()
svcInfo := addServiceAndWaitForInfo(t, p, servicePort, &v1.Service{ svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service)
ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace},
Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
Name: "p",
Port: 80,
Protocol: "UDP",
}}},
})
testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
// When connecting to a UDP service endpoint, there should be a Conn for proxy. // When connecting to a UDP service endpoint, there should be a Conn for proxy.
waitForNumProxyClients(t, svcInfo, 1, time.Second) waitForNumProxyClients(t, svcInfo, 1, time.Second)
@ -389,21 +394,24 @@ func TestUDPProxyTimeout(t *testing.T) {
} }
func TestMultiPortProxy(t *testing.T) { func TestMultiPortProxy(t *testing.T) {
service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{
{Name: "p", Port: 80, Protocol: "TCP"},
{Name: "q", Port: 80, Protocol: "UDP"},
})
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
servicePortP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-p"}, Port: "p"}
servicePortQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-q"}, Port: "q"}
lb.OnEndpointsAdd(&v1.Endpoints{ lb.OnEndpointsAdd(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: servicePortP.Name, Namespace: servicePortP.Namespace}, ObjectMeta: service.ObjectMeta,
Subsets: []v1.EndpointSubset{{ Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []v1.EndpointPort{{Name: "p", Protocol: "TCP", Port: tcpServerPort}}, Ports: []v1.EndpointPort{{Name: ports[0].Port, Protocol: service.Spec.Ports[0].Protocol, Port: tcpServerPort}},
}}, }},
}) })
lb.OnEndpointsAdd(&v1.Endpoints{ lb.OnEndpointsAdd(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: servicePortQ.Name, Namespace: servicePortQ.Namespace}, ObjectMeta: service.ObjectMeta,
Subsets: []v1.EndpointSubset{{ Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []v1.EndpointPort{{Name: "q", Protocol: "UDP", Port: udpServerPort}}, Ports: []v1.EndpointPort{{Name: ports[1].Port, Protocol: service.Spec.Ports[1].Protocol, Port: udpServerPort}},
}}, }},
}) })
@ -416,33 +424,20 @@ func TestMultiPortProxy(t *testing.T) {
startProxier(p, t) startProxier(p, t)
defer p.shutdown() defer p.shutdown()
svcInfo := addServiceAndWaitForInfo(t, p, servicePortP, &v1.Service{ svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service)
ObjectMeta: metav1.ObjectMeta{Name: servicePortP.Name, Namespace: servicePortP.Namespace},
Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
Name: "p",
Port: 80,
Protocol: "TCP",
}}},
})
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
svcInfo = addServiceAndWaitForInfo(t, p, servicePortQ, &v1.Service{ svcInfo = waitForServiceInfo(t, p, ports[1], service)
ObjectMeta: metav1.ObjectMeta{Name: servicePortQ.Name, Namespace: servicePortQ.Namespace},
Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
Name: "q",
Port: 80,
Protocol: "UDP",
}}},
})
testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
} }
func TestMultiPortOnServiceAdd(t *testing.T) { func TestMultiPortOnServiceAdd(t *testing.T) {
lb := NewLoadBalancerRR() service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{
serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} {Name: "p", Port: 80, Protocol: "TCP"},
serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"} {Name: "q", Port: 81, Protocol: "UDP"},
serviceX := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "x"} })
lb := NewLoadBalancerRR()
fexec := makeFakeExec() fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
@ -452,23 +447,12 @@ func TestMultiPortOnServiceAdd(t *testing.T) {
startProxier(p, t) startProxier(p, t)
defer p.shutdown() defer p.shutdown()
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
Name: "p",
Port: 80,
Protocol: "TCP",
}, {
Name: "q",
Port: 81,
Protocol: "UDP",
}}},
}
// ports p and q should exist // ports p and q should exist
_ = addServiceAndWaitForInfo(t, p, serviceP, service) _ = addServiceAndWaitForInfo(t, p, ports[0], service)
_ = waitForServiceInfo(t, p, serviceQ, service) _ = waitForServiceInfo(t, p, ports[1], service)
// non-existent port x should not exist // non-existent port x should not exist
serviceX := proxy.ServicePortName{NamespacedName: ports[0].NamespacedName, Port: "x"}
svcInfo, exists := p.getServiceInfo(serviceX) svcInfo, exists := p.getServiceInfo(serviceX)
if exists { if exists {
t.Fatalf("found unwanted serviceInfo for %s: %#v", serviceX, svcInfo) t.Fatalf("found unwanted serviceInfo for %s: %#v", serviceX, svcInfo)
@ -476,13 +460,14 @@ func TestMultiPortOnServiceAdd(t *testing.T) {
} }
func TestTCPProxyStop(t *testing.T) { func TestTCPProxyStop(t *testing.T) {
service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: 80, Protocol: "TCP"}})
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsAdd(&v1.Endpoints{ lb.OnEndpointsAdd(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: serviceP.Namespace, Name: serviceP.Name}, ObjectMeta: service.ObjectMeta,
Subsets: []v1.EndpointSubset{{ Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: tcpServerPort}},
}}, }},
}) })
@ -495,15 +480,7 @@ func TestTCPProxyStop(t *testing.T) {
startProxier(p, t) startProxier(p, t)
defer p.shutdown() defer p.shutdown()
service := &v1.Service{ svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service)
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
Name: "p",
Port: 80,
Protocol: "TCP",
}}},
}
svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service)
conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
if err != nil { if err != nil {
t.Fatalf("error connecting to proxy: %v", err) t.Fatalf("error connecting to proxy: %v", err)
@ -515,13 +492,14 @@ func TestTCPProxyStop(t *testing.T) {
} }
func TestUDPProxyStop(t *testing.T) { func TestUDPProxyStop(t *testing.T) {
service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: 80, Protocol: "UDP"}})
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsAdd(&v1.Endpoints{ lb.OnEndpointsAdd(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: serviceP.Namespace, Name: serviceP.Name}, ObjectMeta: service.ObjectMeta,
Subsets: []v1.EndpointSubset{{ Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}}, Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: udpServerPort}},
}}, }},
}) })
@ -534,15 +512,7 @@ func TestUDPProxyStop(t *testing.T) {
startProxier(p, t) startProxier(p, t)
defer p.shutdown() defer p.shutdown()
service := &v1.Service{ svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service)
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
Name: "p",
Port: 80,
Protocol: "UDP",
}}},
}
svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service)
conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort)) conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort))
if err != nil { if err != nil {
t.Fatalf("error connecting to proxy: %v", err) t.Fatalf("error connecting to proxy: %v", err)
@ -554,13 +524,14 @@ func TestUDPProxyStop(t *testing.T) {
} }
func TestTCPProxyUpdateDelete(t *testing.T) { func TestTCPProxyUpdateDelete(t *testing.T) {
service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: 9997, Protocol: "TCP"}})
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsAdd(&v1.Endpoints{ lb.OnEndpointsAdd(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: serviceP.Namespace, Name: serviceP.Name}, ObjectMeta: service.ObjectMeta,
Subsets: []v1.EndpointSubset{{ Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: tcpServerPort}},
}}, }},
}) })
@ -573,16 +544,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
startProxier(p, t) startProxier(p, t)
defer p.shutdown() defer p.shutdown()
service := &v1.Service{ svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service)
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
Name: "p",
Port: 9997,
Protocol: "TCP",
}}},
}
svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service)
conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
if err != nil { if err != nil {
t.Fatalf("error connecting to proxy: %v", err) t.Fatalf("error connecting to proxy: %v", err)
@ -594,13 +556,14 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
} }
func TestUDPProxyUpdateDelete(t *testing.T) { func TestUDPProxyUpdateDelete(t *testing.T) {
service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: 9997, Protocol: "UDP"}})
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsAdd(&v1.Endpoints{ lb.OnEndpointsAdd(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: serviceP.Namespace, Name: serviceP.Name}, ObjectMeta: service.ObjectMeta,
Subsets: []v1.EndpointSubset{{ Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}}, Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: udpServerPort}},
}}, }},
}) })
@ -613,16 +576,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
startProxier(p, t) startProxier(p, t)
defer p.shutdown() defer p.shutdown()
service := &v1.Service{ svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service)
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
Name: "p",
Port: 9997,
Protocol: "UDP",
}}},
}
svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service)
conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort)) conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort))
if err != nil { if err != nil {
t.Fatalf("error connecting to proxy: %v", err) t.Fatalf("error connecting to proxy: %v", err)
@ -634,13 +588,14 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
} }
func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: 9997, Protocol: "TCP"}})
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
endpoint := &v1.Endpoints{ endpoint := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, ObjectMeta: service.ObjectMeta,
Subsets: []v1.EndpointSubset{{ Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: tcpServerPort}},
}}, }},
} }
lb.OnEndpointsAdd(endpoint) lb.OnEndpointsAdd(endpoint)
@ -654,16 +609,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
startProxier(p, t) startProxier(p, t)
defer p.shutdown() defer p.shutdown()
service := &v1.Service{ svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service)
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
Name: "p",
Port: 9997,
Protocol: "TCP",
}}},
}
svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service)
conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
if err != nil { if err != nil {
t.Fatalf("error connecting to proxy: %v", err) t.Fatalf("error connecting to proxy: %v", err)
@ -675,18 +621,19 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
// need to add endpoint here because it got clean up during service delete // need to add endpoint here because it got clean up during service delete
lb.OnEndpointsAdd(endpoint) lb.OnEndpointsAdd(endpoint)
svcInfo = addServiceAndWaitForInfo(t, p, serviceP, service) svcInfo = addServiceAndWaitForInfo(t, p, ports[0], service)
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
} }
func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: 9997, Protocol: "UDP"}})
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
endpoint := &v1.Endpoints{ endpoint := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, ObjectMeta: service.ObjectMeta,
Subsets: []v1.EndpointSubset{{ Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}}, Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: udpServerPort}},
}}, }},
} }
lb.OnEndpointsAdd(endpoint) lb.OnEndpointsAdd(endpoint)
@ -700,16 +647,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
startProxier(p, t) startProxier(p, t)
defer p.shutdown() defer p.shutdown()
service := &v1.Service{ svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service)
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
Name: "p",
Port: 9997,
Protocol: "UDP",
}}},
}
svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service)
conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort)) conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort))
if err != nil { if err != nil {
t.Fatalf("error connecting to proxy: %v", err) t.Fatalf("error connecting to proxy: %v", err)
@ -721,18 +659,20 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
// need to add endpoint here because it got clean up during service delete // need to add endpoint here because it got clean up during service delete
lb.OnEndpointsAdd(endpoint) lb.OnEndpointsAdd(endpoint)
svcInfo = addServiceAndWaitForInfo(t, p, serviceP, service) svcInfo = addServiceAndWaitForInfo(t, p, ports[0], service)
testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
} }
func TestTCPProxyUpdatePort(t *testing.T) { func TestTCPProxyUpdatePort(t *testing.T) {
origPort := int32(99)
service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: origPort, Protocol: "TCP"}})
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsAdd(&v1.Endpoints{ lb.OnEndpointsAdd(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, ObjectMeta: service.ObjectMeta,
Subsets: []v1.EndpointSubset{{ Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: tcpServerPort}},
}}, }},
}) })
@ -745,39 +685,32 @@ func TestTCPProxyUpdatePort(t *testing.T) {
startProxier(p, t) startProxier(p, t)
defer p.shutdown() defer p.shutdown()
origPort := 99 svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service)
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
Name: "p",
Port: int32(origPort),
Protocol: "TCP",
}}},
}
svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service)
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
service.Spec.Ports[0].Port = 100 newService := service.DeepCopy()
p.OnServiceAdd(service) newService.Spec.Ports[0].Port = 100
p.OnServiceUpdate(service, newService)
// Wait for the socket to actually get free. // Wait for the socket to actually get free.
if err := waitForClosedPortTCP(p, origPort); err != nil { if err := waitForClosedPortTCP(p, int(origPort)); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
waitForProxyFinished(t, svcInfo) waitForProxyFinished(t, svcInfo)
svcInfo = addServiceAndWaitForInfo(t, p, serviceP, service) svcInfo = waitForServiceInfo(t, p, ports[0], newService)
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
} }
func TestUDPProxyUpdatePort(t *testing.T) { func TestUDPProxyUpdatePort(t *testing.T) {
origPort := int32(99)
service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: origPort, Protocol: "UDP"}})
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsAdd(&v1.Endpoints{ lb.OnEndpointsAdd(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, ObjectMeta: service.ObjectMeta,
Subsets: []v1.EndpointSubset{{ Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}}, Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: udpServerPort}},
}}, }},
}) })
@ -790,39 +723,32 @@ func TestUDPProxyUpdatePort(t *testing.T) {
startProxier(p, t) startProxier(p, t)
defer p.shutdown() defer p.shutdown()
origPort := 99 svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service)
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
Name: "p",
Port: int32(origPort),
Protocol: "UDP",
}}},
}
svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service)
testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
service.Spec.Ports[0].Port = 100 newService := service.DeepCopy()
p.OnServiceAdd(service) newService.Spec.Ports[0].Port = 100
p.OnServiceUpdate(service, newService)
// Wait for the socket to actually get free. // Wait for the socket to actually get free.
if err := waitForClosedPortUDP(p, origPort); err != nil { if err := waitForClosedPortUDP(p, int(origPort)); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
waitForProxyFinished(t, svcInfo) waitForProxyFinished(t, svcInfo)
svcInfo = addServiceAndWaitForInfo(t, p, serviceP, service) svcInfo = waitForServiceInfo(t, p, ports[0], newService)
testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
} }
func TestProxyUpdatePublicIPs(t *testing.T) { func TestProxyUpdatePublicIPs(t *testing.T) {
origPort := int32(9997)
service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: origPort, Protocol: "TCP"}})
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsAdd(&v1.Endpoints{ lb.OnEndpointsAdd(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, ObjectMeta: service.ObjectMeta,
Subsets: []v1.EndpointSubset{{ Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: tcpServerPort}},
}}, }},
}) })
@ -835,37 +761,32 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
startProxier(p, t) startProxier(p, t)
defer p.shutdown() defer p.shutdown()
origPort := 9997 svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service)
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
Name: "p",
Port: int32(origPort),
Protocol: "TCP",
}}},
}
svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service)
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
service.Spec.ExternalIPs = []string{"4.3.2.1"} newService := service.DeepCopy()
svcInfo = addServiceAndWaitForInfo(t, p, serviceP, service) newService.Spec.ExternalIPs = []string{"4.3.2.1"}
p.OnServiceUpdate(service, newService)
// Wait for the socket to actually get free. // Wait for the socket to actually get free.
if err := waitForClosedPortTCP(p, origPort); err != nil { if err := waitForClosedPortTCP(p, int(origPort)); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
waitForProxyFinished(t, svcInfo) waitForProxyFinished(t, svcInfo)
svcInfo = waitForServiceInfo(t, p, ports[0], newService)
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
} }
func TestProxyUpdatePortal(t *testing.T) { func TestProxyUpdatePortal(t *testing.T) {
svcv0, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: 9997, Protocol: "TCP"}})
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
servicePort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
endpoint := &v1.Endpoints{ endpoint := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace}, ObjectMeta: svcv0.ObjectMeta,
Subsets: []v1.EndpointSubset{{ Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: tcpServerPort}},
}}, }},
} }
lb.OnEndpointsAdd(endpoint) lb.OnEndpointsAdd(endpoint)
@ -879,30 +800,17 @@ func TestProxyUpdatePortal(t *testing.T) {
startProxier(p, t) startProxier(p, t)
defer p.shutdown() defer p.shutdown()
svcv0 := &v1.Service{ svcInfo := addServiceAndWaitForInfo(t, p, ports[0], svcv0)
ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace},
Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
Name: "p",
Port: 9997,
Protocol: "TCP",
}}},
}
svcInfo := addServiceAndWaitForInfo(t, p, servicePort, svcv0)
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
svcv1 := &v1.Service{ svcv1 := svcv0.DeepCopy()
ObjectMeta: svcv0.ObjectMeta, svcv1.Spec.ClusterIP = ""
Spec: v1.ServiceSpec{ClusterIP: "", Ports: []v1.ServicePort{
svcv0.Spec.Ports[0],
}},
}
p.OnServiceUpdate(svcv0, svcv1) p.OnServiceUpdate(svcv0, svcv1)
// Wait for the service to be removed because it had an empty ClusterIP // Wait for the service to be removed because it had an empty ClusterIP
var exists bool var exists bool
for i := 0; i < 50; i++ { for i := 0; i < 50; i++ {
_, exists = p.getServiceInfo(servicePort) _, exists = p.getServiceInfo(ports[0])
if !exists { if !exists {
break break
} }
@ -913,27 +821,18 @@ func TestProxyUpdatePortal(t *testing.T) {
} }
waitForProxyFinished(t, svcInfo) waitForProxyFinished(t, svcInfo)
svcv2 := &v1.Service{ svcv2 := svcv0.DeepCopy()
ObjectMeta: svcv0.ObjectMeta, svcv2.Spec.ClusterIP = "None"
Spec: v1.ServiceSpec{ClusterIP: "None", Ports: []v1.ServicePort{
svcv0.Spec.Ports[0],
}},
}
p.OnServiceUpdate(svcv1, svcv2) p.OnServiceUpdate(svcv1, svcv2)
_, exists = p.getServiceInfo(servicePort) _, exists = p.getServiceInfo(ports[0])
if exists { if exists {
t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy") t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy")
} }
svcv3 := &v1.Service{ // Set the ClusterIP again and make sure the proxy opens the port
ObjectMeta: svcv0.ObjectMeta,
Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{
svcv0.Spec.Ports[0],
}},
}
lb.OnEndpointsAdd(endpoint) lb.OnEndpointsAdd(endpoint)
p.OnServiceUpdate(svcv2, svcv3) p.OnServiceUpdate(svcv2, svcv0)
svcInfo = waitForServiceInfo(t, p, servicePort, svcv3) svcInfo = waitForServiceInfo(t, p, ports[0], svcv0)
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
} }