diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index db032c4f7e4..6d0bb008130 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -291,14 +291,38 @@ func startProxier(p *Proxier, t *testing.T) { p.OnEndpointsSynced() } +func newServiceObject(namespace, name, clusterIP string, ports []v1.ServicePort) (*v1.Service, []proxy.ServicePortName) { + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name}, + Spec: v1.ServiceSpec{ + ClusterIP: clusterIP, + Ports: ports, + }, + } + + servicePorts := make([]proxy.ServicePortName, len(ports)) + for i, port := range ports { + servicePorts[i] = proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: namespace, + Name: name, + }, + Port: port.Name, + } + } + + return service, servicePorts +} + func TestTCPProxy(t *testing.T) { + service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: 80, Protocol: "TCP"}}) + lb := NewLoadBalancerRR() - servicePort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace}, + ObjectMeta: service.ObjectMeta, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, + Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: tcpServerPort}}, }}, }) @@ -311,25 +335,19 @@ func TestTCPProxy(t *testing.T) { startProxier(p, t) defer p.shutdown() - svcInfo := addServiceAndWaitForInfo(t, p, servicePort, &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace}, - Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ - Name: "p", - Port: 80, - Protocol: "TCP", - }}}, - }) + svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) } func TestUDPProxy(t *testing.T) { + service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: 80, Protocol: "UDP"}}) + lb := NewLoadBalancerRR() - servicePort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace}, + ObjectMeta: service.ObjectMeta, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}}, + Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: udpServerPort}}, }}, }) @@ -342,25 +360,19 @@ func TestUDPProxy(t *testing.T) { startProxier(p, t) defer p.shutdown() - svcInfo := addServiceAndWaitForInfo(t, p, servicePort, &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace}, - Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ - Name: "p", - Port: 80, - Protocol: "UDP", - }}}, - }) + svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service) testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) } func TestUDPProxyTimeout(t *testing.T) { + service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: 80, Protocol: "UDP"}}) + lb := NewLoadBalancerRR() - servicePort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace}, + ObjectMeta: service.ObjectMeta, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}}, + Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: udpServerPort}}, }}, }) @@ -373,14 +385,7 @@ func TestUDPProxyTimeout(t *testing.T) { startProxier(p, t) defer p.shutdown() - svcInfo := addServiceAndWaitForInfo(t, p, servicePort, &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace}, - Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ - Name: "p", - Port: 80, - Protocol: "UDP", - }}}, - }) + svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service) testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) // When connecting to a UDP service endpoint, there should be a Conn for proxy. waitForNumProxyClients(t, svcInfo, 1, time.Second) @@ -389,21 +394,24 @@ func TestUDPProxyTimeout(t *testing.T) { } func TestMultiPortProxy(t *testing.T) { + service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{ + {Name: "p", Port: 80, Protocol: "TCP"}, + {Name: "q", Port: 80, Protocol: "UDP"}, + }) + lb := NewLoadBalancerRR() - servicePortP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-p"}, Port: "p"} - servicePortQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-q"}, Port: "q"} lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: servicePortP.Name, Namespace: servicePortP.Namespace}, + ObjectMeta: service.ObjectMeta, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []v1.EndpointPort{{Name: "p", Protocol: "TCP", Port: tcpServerPort}}, + Ports: []v1.EndpointPort{{Name: ports[0].Port, Protocol: service.Spec.Ports[0].Protocol, Port: tcpServerPort}}, }}, }) lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: servicePortQ.Name, Namespace: servicePortQ.Namespace}, + ObjectMeta: service.ObjectMeta, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []v1.EndpointPort{{Name: "q", Protocol: "UDP", Port: udpServerPort}}, + Ports: []v1.EndpointPort{{Name: ports[1].Port, Protocol: service.Spec.Ports[1].Protocol, Port: udpServerPort}}, }}, }) @@ -416,33 +424,20 @@ func TestMultiPortProxy(t *testing.T) { startProxier(p, t) defer p.shutdown() - svcInfo := addServiceAndWaitForInfo(t, p, servicePortP, &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: servicePortP.Name, Namespace: servicePortP.Namespace}, - Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ - Name: "p", - Port: 80, - Protocol: "TCP", - }}}, - }) + svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) - svcInfo = addServiceAndWaitForInfo(t, p, servicePortQ, &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: servicePortQ.Name, Namespace: servicePortQ.Namespace}, - Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ - Name: "q", - Port: 80, - Protocol: "UDP", - }}}, - }) + svcInfo = waitForServiceInfo(t, p, ports[1], service) testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) } func TestMultiPortOnServiceAdd(t *testing.T) { - lb := NewLoadBalancerRR() - serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"} - serviceX := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "x"} + service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{ + {Name: "p", Port: 80, Protocol: "TCP"}, + {Name: "q", Port: 81, Protocol: "UDP"}, + }) + lb := NewLoadBalancerRR() fexec := makeFakeExec() p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) @@ -452,23 +447,12 @@ func TestMultiPortOnServiceAdd(t *testing.T) { startProxier(p, t) defer p.shutdown() - service := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, - Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ - Name: "p", - Port: 80, - Protocol: "TCP", - }, { - Name: "q", - Port: 81, - Protocol: "UDP", - }}}, - } - // ports p and q should exist - _ = addServiceAndWaitForInfo(t, p, serviceP, service) - _ = waitForServiceInfo(t, p, serviceQ, service) + _ = addServiceAndWaitForInfo(t, p, ports[0], service) + _ = waitForServiceInfo(t, p, ports[1], service) + // non-existent port x should not exist + serviceX := proxy.ServicePortName{NamespacedName: ports[0].NamespacedName, Port: "x"} svcInfo, exists := p.getServiceInfo(serviceX) if exists { t.Fatalf("found unwanted serviceInfo for %s: %#v", serviceX, svcInfo) @@ -476,13 +460,14 @@ func TestMultiPortOnServiceAdd(t *testing.T) { } func TestTCPProxyStop(t *testing.T) { + service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: 80, Protocol: "TCP"}}) + lb := NewLoadBalancerRR() - serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: serviceP.Namespace, Name: serviceP.Name}, + ObjectMeta: service.ObjectMeta, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, + Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: tcpServerPort}}, }}, }) @@ -495,15 +480,7 @@ func TestTCPProxyStop(t *testing.T) { startProxier(p, t) defer p.shutdown() - service := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, - Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ - Name: "p", - Port: 80, - Protocol: "TCP", - }}}, - } - svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service) + svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service) conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("error connecting to proxy: %v", err) @@ -515,13 +492,14 @@ func TestTCPProxyStop(t *testing.T) { } func TestUDPProxyStop(t *testing.T) { + service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: 80, Protocol: "UDP"}}) + lb := NewLoadBalancerRR() - serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: serviceP.Namespace, Name: serviceP.Name}, + ObjectMeta: service.ObjectMeta, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}}, + Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: udpServerPort}}, }}, }) @@ -534,15 +512,7 @@ func TestUDPProxyStop(t *testing.T) { startProxier(p, t) defer p.shutdown() - service := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, - Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ - Name: "p", - Port: 80, - Protocol: "UDP", - }}}, - } - svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service) + svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service) conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("error connecting to proxy: %v", err) @@ -554,13 +524,14 @@ func TestUDPProxyStop(t *testing.T) { } func TestTCPProxyUpdateDelete(t *testing.T) { + service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: 9997, Protocol: "TCP"}}) + lb := NewLoadBalancerRR() - serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: serviceP.Namespace, Name: serviceP.Name}, + ObjectMeta: service.ObjectMeta, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, + Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: tcpServerPort}}, }}, }) @@ -573,16 +544,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) { startProxier(p, t) defer p.shutdown() - service := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, - Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ - Name: "p", - Port: 9997, - Protocol: "TCP", - }}}, - } - - svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service) + svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service) conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("error connecting to proxy: %v", err) @@ -594,13 +556,14 @@ func TestTCPProxyUpdateDelete(t *testing.T) { } func TestUDPProxyUpdateDelete(t *testing.T) { + service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: 9997, Protocol: "UDP"}}) + lb := NewLoadBalancerRR() - serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: serviceP.Namespace, Name: serviceP.Name}, + ObjectMeta: service.ObjectMeta, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}}, + Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: udpServerPort}}, }}, }) @@ -613,16 +576,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) { startProxier(p, t) defer p.shutdown() - service := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, - Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ - Name: "p", - Port: 9997, - Protocol: "UDP", - }}}, - } - - svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service) + svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service) conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("error connecting to proxy: %v", err) @@ -634,13 +588,14 @@ func TestUDPProxyUpdateDelete(t *testing.T) { } func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { + service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: 9997, Protocol: "TCP"}}) + lb := NewLoadBalancerRR() - serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} endpoint := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, + ObjectMeta: service.ObjectMeta, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, + Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: tcpServerPort}}, }}, } lb.OnEndpointsAdd(endpoint) @@ -654,16 +609,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { startProxier(p, t) defer p.shutdown() - service := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, - Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ - Name: "p", - Port: 9997, - Protocol: "TCP", - }}}, - } - - svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service) + svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service) conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("error connecting to proxy: %v", err) @@ -675,18 +621,19 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { // need to add endpoint here because it got clean up during service delete lb.OnEndpointsAdd(endpoint) - svcInfo = addServiceAndWaitForInfo(t, p, serviceP, service) + svcInfo = addServiceAndWaitForInfo(t, p, ports[0], service) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) } func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { + service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: 9997, Protocol: "UDP"}}) + lb := NewLoadBalancerRR() - serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} endpoint := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, + ObjectMeta: service.ObjectMeta, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}}, + Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: udpServerPort}}, }}, } lb.OnEndpointsAdd(endpoint) @@ -700,16 +647,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { startProxier(p, t) defer p.shutdown() - service := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, - Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ - Name: "p", - Port: 9997, - Protocol: "UDP", - }}}, - } - - svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service) + svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service) conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("error connecting to proxy: %v", err) @@ -721,18 +659,20 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { // need to add endpoint here because it got clean up during service delete lb.OnEndpointsAdd(endpoint) - svcInfo = addServiceAndWaitForInfo(t, p, serviceP, service) + svcInfo = addServiceAndWaitForInfo(t, p, ports[0], service) testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) } func TestTCPProxyUpdatePort(t *testing.T) { + origPort := int32(99) + service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: origPort, Protocol: "TCP"}}) + lb := NewLoadBalancerRR() - serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, + ObjectMeta: service.ObjectMeta, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, + Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: tcpServerPort}}, }}, }) @@ -745,39 +685,32 @@ func TestTCPProxyUpdatePort(t *testing.T) { startProxier(p, t) defer p.shutdown() - origPort := 99 - service := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, - Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ - Name: "p", - Port: int32(origPort), - Protocol: "TCP", - }}}, - } - - svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service) + svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) - service.Spec.Ports[0].Port = 100 - p.OnServiceAdd(service) + newService := service.DeepCopy() + newService.Spec.Ports[0].Port = 100 + p.OnServiceUpdate(service, newService) // Wait for the socket to actually get free. - if err := waitForClosedPortTCP(p, origPort); err != nil { + if err := waitForClosedPortTCP(p, int(origPort)); err != nil { t.Fatalf(err.Error()) } waitForProxyFinished(t, svcInfo) - svcInfo = addServiceAndWaitForInfo(t, p, serviceP, service) + svcInfo = waitForServiceInfo(t, p, ports[0], newService) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) } func TestUDPProxyUpdatePort(t *testing.T) { + origPort := int32(99) + service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: origPort, Protocol: "UDP"}}) + lb := NewLoadBalancerRR() - serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, + ObjectMeta: service.ObjectMeta, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}}, + Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: udpServerPort}}, }}, }) @@ -790,39 +723,32 @@ func TestUDPProxyUpdatePort(t *testing.T) { startProxier(p, t) defer p.shutdown() - origPort := 99 - service := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, - Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ - Name: "p", - Port: int32(origPort), - Protocol: "UDP", - }}}, - } - - svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service) + svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service) testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) - service.Spec.Ports[0].Port = 100 - p.OnServiceAdd(service) + newService := service.DeepCopy() + newService.Spec.Ports[0].Port = 100 + p.OnServiceUpdate(service, newService) // Wait for the socket to actually get free. - if err := waitForClosedPortUDP(p, origPort); err != nil { + if err := waitForClosedPortUDP(p, int(origPort)); err != nil { t.Fatalf(err.Error()) } waitForProxyFinished(t, svcInfo) - svcInfo = addServiceAndWaitForInfo(t, p, serviceP, service) + svcInfo = waitForServiceInfo(t, p, ports[0], newService) testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) } func TestProxyUpdatePublicIPs(t *testing.T) { + origPort := int32(9997) + service, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: origPort, Protocol: "TCP"}}) + lb := NewLoadBalancerRR() - serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, + ObjectMeta: service.ObjectMeta, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, + Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: tcpServerPort}}, }}, }) @@ -835,37 +761,32 @@ func TestProxyUpdatePublicIPs(t *testing.T) { startProxier(p, t) defer p.shutdown() - origPort := 9997 - service := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, - Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ - Name: "p", - Port: int32(origPort), - Protocol: "TCP", - }}}, - } - - svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service) + svcInfo := addServiceAndWaitForInfo(t, p, ports[0], service) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) - service.Spec.ExternalIPs = []string{"4.3.2.1"} - svcInfo = addServiceAndWaitForInfo(t, p, serviceP, service) + newService := service.DeepCopy() + newService.Spec.ExternalIPs = []string{"4.3.2.1"} + p.OnServiceUpdate(service, newService) + // Wait for the socket to actually get free. - if err := waitForClosedPortTCP(p, origPort); err != nil { + if err := waitForClosedPortTCP(p, int(origPort)); err != nil { t.Fatalf(err.Error()) } waitForProxyFinished(t, svcInfo) + + svcInfo = waitForServiceInfo(t, p, ports[0], newService) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) } func TestProxyUpdatePortal(t *testing.T) { + svcv0, ports := newServiceObject("testnamespace", "echo", "1.2.3.4", []v1.ServicePort{{Name: "p", Port: 9997, Protocol: "TCP"}}) + lb := NewLoadBalancerRR() - servicePort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} endpoint := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace}, + ObjectMeta: svcv0.ObjectMeta, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, + Ports: []v1.EndpointPort{{Name: ports[0].Port, Port: tcpServerPort}}, }}, } lb.OnEndpointsAdd(endpoint) @@ -879,30 +800,17 @@ func TestProxyUpdatePortal(t *testing.T) { startProxier(p, t) defer p.shutdown() - svcv0 := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace}, - Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ - Name: "p", - Port: 9997, - Protocol: "TCP", - }}}, - } - - svcInfo := addServiceAndWaitForInfo(t, p, servicePort, svcv0) + svcInfo := addServiceAndWaitForInfo(t, p, ports[0], svcv0) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) - svcv1 := &v1.Service{ - ObjectMeta: svcv0.ObjectMeta, - Spec: v1.ServiceSpec{ClusterIP: "", Ports: []v1.ServicePort{ - svcv0.Spec.Ports[0], - }}, - } + svcv1 := svcv0.DeepCopy() + svcv1.Spec.ClusterIP = "" p.OnServiceUpdate(svcv0, svcv1) // Wait for the service to be removed because it had an empty ClusterIP var exists bool for i := 0; i < 50; i++ { - _, exists = p.getServiceInfo(servicePort) + _, exists = p.getServiceInfo(ports[0]) if !exists { break } @@ -913,27 +821,18 @@ func TestProxyUpdatePortal(t *testing.T) { } waitForProxyFinished(t, svcInfo) - svcv2 := &v1.Service{ - ObjectMeta: svcv0.ObjectMeta, - Spec: v1.ServiceSpec{ClusterIP: "None", Ports: []v1.ServicePort{ - svcv0.Spec.Ports[0], - }}, - } + svcv2 := svcv0.DeepCopy() + svcv2.Spec.ClusterIP = "None" p.OnServiceUpdate(svcv1, svcv2) - _, exists = p.getServiceInfo(servicePort) + _, exists = p.getServiceInfo(ports[0]) if exists { t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy") } - svcv3 := &v1.Service{ - ObjectMeta: svcv0.ObjectMeta, - Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{ - svcv0.Spec.Ports[0], - }}, - } + // Set the ClusterIP again and make sure the proxy opens the port lb.OnEndpointsAdd(endpoint) - p.OnServiceUpdate(svcv2, svcv3) - svcInfo = waitForServiceInfo(t, p, servicePort, svcv3) + p.OnServiceUpdate(svcv2, svcv0) + svcInfo = waitForServiceInfo(t, p, ports[0], svcv0) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) }