diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 71c04391955..71076129f9d 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -68,6 +68,29 @@ type ServiceInfo struct { stickyMaxAgeSeconds int // Deprecated, but required for back-compat (including e2e) externalIPs []string + + // isStartedAtomic is set to non-zero when the service's socket begins + // accepting requests. Used in testcases. Only access this with atomic ops. + isStartedAtomic int32 + // isFinishedAtomic is set to non-zero when the service's socket shuts + // down. Used in testcases. Only access this with atomic ops. + isFinishedAtomic int32 +} + +func (info *ServiceInfo) setStarted() { + atomic.StoreInt32(&info.isStartedAtomic, 1) +} + +func (info *ServiceInfo) IsStarted() bool { + return atomic.LoadInt32(&info.isStartedAtomic) != 0 +} + +func (info *ServiceInfo) setFinished() { + atomic.StoreInt32(&info.isFinishedAtomic, 1) +} + +func (info *ServiceInfo) IsFinished() bool { + return atomic.LoadInt32(&info.isFinishedAtomic) != 0 } func (info *ServiceInfo) setAlive(b bool) { @@ -124,7 +147,6 @@ type Proxier struct { udpIdleTimeout time.Duration portMapMutex sync.Mutex portMap map[portMapKey]*portMapValue - numProxyLoops int32 // use atomic ops to access this; mostly for testing listenIP net.IP iptables iptables.Interface hostIP net.IP @@ -427,14 +449,6 @@ func (proxier *Proxier) getServiceInfo(service proxy.ServicePortName) (*ServiceI return info, ok } -// addServiceOnPort lockes the proxy before calling addServiceOnPortInternal. -// Used from testcases. -func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol v1.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) { - proxier.mu.Lock() - defer proxier.mu.Unlock() - return proxier.addServiceOnPortInternal(service, protocol, proxyPort, timeout) -} - // addServiceOnPortInternal starts listening for a new service, returning the ServiceInfo. // Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP // connections, for now. @@ -465,12 +479,10 @@ func (proxier *Proxier) addServiceOnPortInternal(service proxy.ServicePortName, proxier.serviceMap[service] = si klog.V(2).Infof("Proxying for service %q on %s port %d", service, protocol, portNum) - go func(service proxy.ServicePortName, proxier *Proxier) { + go func() { defer runtime.HandleCrash() - atomic.AddInt32(&proxier.numProxyLoops, 1) sock.ProxyLoop(service, si, proxier.loadBalancer) - atomic.AddInt32(&proxier.numProxyLoops, -1) - }(service, proxier) + }() return si, nil } @@ -509,6 +521,7 @@ func (proxier *Proxier) mergeService(service *v1.Service) sets.String { if err := proxier.cleanupPortalAndProxy(serviceName, info); err != nil { klog.Error(err) } + info.setFinished() } proxyPort, err := proxier.proxyPorts.AllocateNext() if err != nil { @@ -541,6 +554,8 @@ func (proxier *Proxier) mergeService(service *v1.Service) sets.String { klog.Errorf("Failed to open portal for %q: %v", serviceName, err) } proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeSeconds) + + info.setStarted() } return existingPorts @@ -578,6 +593,7 @@ func (proxier *Proxier) unmergeService(service *v1.Service, existingPorts sets.S klog.Error(err) } proxier.loadBalancer.DeleteService(serviceName) + info.setFinished() } for _, svcIP := range staleUDPServices.UnsortedList() { if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil { diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index b51cc9d9007..db032c4f7e4 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -26,7 +26,6 @@ import ( "os" "reflect" "strconv" - "sync/atomic" "testing" "time" @@ -89,14 +88,72 @@ func waitForClosedPortUDP(p *Proxier, proxyPort int) error { return fmt.Errorf("port %d still open", proxyPort) } -func waitForServiceInfo(p *Proxier, service proxy.ServicePortName) (*ServiceInfo, bool) { +func waitForProxyFinished(t *testing.T, svcInfo *ServiceInfo) { + if err := wait.PollImmediate(50*time.Millisecond, 30*time.Second, func() (bool, error) { + return svcInfo.IsFinished(), nil + }); err != nil { + t.Errorf("timed out waiting for proxy socket to finish: %v", err) + } +} + +func waitForServiceInfo(t *testing.T, p *Proxier, servicePortName proxy.ServicePortName, service *v1.Service) *ServiceInfo { var svcInfo *ServiceInfo var exists bool wait.PollImmediate(50*time.Millisecond, 3*time.Second, func() (bool, error) { - svcInfo, exists = p.getServiceInfo(service) + svcInfo, exists = p.getServiceInfo(servicePortName) return exists, nil }) - return svcInfo, exists + if !exists { + t.Fatalf("can't find serviceInfo for %s", servicePortName) + } + if !svcInfo.IsAlive() { + t.Fatalf("expected IsAlive() true for %s", servicePortName) + } + + var servicePort *v1.ServicePort + for _, port := range service.Spec.Ports { + if port.Name == servicePortName.Port { + servicePort = &port + break + } + } + if servicePort == nil { + t.Errorf("failed to find service %s port with name %q", servicePortName.NamespacedName, servicePortName.Port) + } + if svcInfo.portal.ip.String() != service.Spec.ClusterIP || int32(svcInfo.portal.port) != servicePort.Port || svcInfo.protocol != servicePort.Protocol { + t.Errorf("unexpected serviceInfo for %s: %#v", servicePortName, svcInfo) + } + + // Wait for proxy socket to start up + if err := wait.PollImmediate(50*time.Millisecond, 30*time.Second, func() (bool, error) { + return svcInfo.IsStarted(), nil + }); err != nil { + t.Errorf("timed out waiting for proxy socket %s to start: %v", servicePortName, err) + } + + return svcInfo +} + +// addServiceAndWaitForInfoIndex adds the service to the proxy and waits for the +// named port to be ready +func addServiceAndWaitForInfo(t *testing.T, p *Proxier, servicePortName proxy.ServicePortName, service *v1.Service) *ServiceInfo { + p.OnServiceAdd(service) + return waitForServiceInfo(t, p, servicePortName, service) +} + +// deleteServiceAndWait deletes the servicein the proxy and waits until it +// has been cleaned up. waitFunc will be called to wait for the service +// port's socket to close. +func deleteServiceAndWait(t *testing.T, p *Proxier, svcInfo *ServiceInfo, service *v1.Service, waitFunc func(*Proxier, int) error) { + p.OnServiceDelete(service) + // Wait for the port to really close. + if err := waitFunc(p, svcInfo.proxyPort); err != nil { + t.Fatalf(err.Error()) + } + waitForProxyFinished(t, svcInfo) + if svcInfo.IsAlive() { + t.Fatalf("wrong value for IsAlive(): expected false") + } } // udpEchoServer is a simple echo server in UDP, intended for testing the proxy. @@ -210,18 +267,6 @@ func testEchoUDP(t *testing.T, address string, port int) { } } -func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) { - var got int32 - for i := 0; i < 600; i++ { - got = atomic.LoadInt32(&p.numProxyLoops) - if got == want { - return - } - time.Sleep(100 * time.Millisecond) - } - t.Errorf("expected %d ProxyLoops running, got %d", want, got) -} - func waitForNumProxyClients(t *testing.T, s *ServiceInfo, want int, timeout time.Duration) { var got int now := time.Now() @@ -242,16 +287,15 @@ func startProxier(p *Proxier, t *testing.T) { go func() { p.SyncLoop() }() - waitForNumProxyLoops(t, p, 0) p.OnServiceSynced() p.OnEndpointsSynced() } func TestTCPProxy(t *testing.T) { lb := NewLoadBalancerRR() - service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + servicePort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace}, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, @@ -267,19 +311,22 @@ func TestTCPProxy(t *testing.T) { startProxier(p, t) defer p.shutdown() - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) - if err != nil { - t.Fatalf("error adding new service: %#v", err) - } + svcInfo := addServiceAndWaitForInfo(t, p, servicePort, &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace}, + Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ + Name: "p", + Port: 80, + Protocol: "TCP", + }}}, + }) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) - waitForNumProxyLoops(t, p, 1) } func TestUDPProxy(t *testing.T) { lb := NewLoadBalancerRR() - service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + servicePort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace}, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}}, @@ -295,19 +342,22 @@ func TestUDPProxy(t *testing.T) { startProxier(p, t) defer p.shutdown() - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) - if err != nil { - t.Fatalf("error adding new service: %#v", err) - } + svcInfo := addServiceAndWaitForInfo(t, p, servicePort, &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace}, + Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ + Name: "p", + Port: 80, + Protocol: "UDP", + }}}, + }) testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) - waitForNumProxyLoops(t, p, 1) } func TestUDPProxyTimeout(t *testing.T) { lb := NewLoadBalancerRR() - service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + servicePort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace}, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}}, @@ -323,11 +373,14 @@ func TestUDPProxyTimeout(t *testing.T) { startProxier(p, t) defer p.shutdown() - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) - if err != nil { - t.Fatalf("error adding new service: %#v", err) - } - waitForNumProxyLoops(t, p, 1) + svcInfo := addServiceAndWaitForInfo(t, p, servicePort, &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace}, + Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ + Name: "p", + Port: 80, + Protocol: "UDP", + }}}, + }) testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) // When connecting to a UDP service endpoint, there should be a Conn for proxy. waitForNumProxyClients(t, svcInfo, 1, time.Second) @@ -337,17 +390,17 @@ func TestUDPProxyTimeout(t *testing.T) { func TestMultiPortProxy(t *testing.T) { lb := NewLoadBalancerRR() - serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-p"}, Port: "p"} - serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-q"}, Port: "q"} + servicePortP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-p"}, Port: "p"} + servicePortQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-q"}, Port: "q"} lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, + ObjectMeta: metav1.ObjectMeta{Name: servicePortP.Name, Namespace: servicePortP.Namespace}, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []v1.EndpointPort{{Name: "p", Protocol: "TCP", Port: tcpServerPort}}, }}, }) lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: serviceQ.Name, Namespace: serviceQ.Namespace}, + ObjectMeta: metav1.ObjectMeta{Name: servicePortQ.Name, Namespace: servicePortQ.Namespace}, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []v1.EndpointPort{{Name: "q", Protocol: "UDP", Port: udpServerPort}}, @@ -363,19 +416,25 @@ func TestMultiPortProxy(t *testing.T) { startProxier(p, t) defer p.shutdown() - svcInfoP, err := p.addServiceOnPort(serviceP, "TCP", 0, time.Second) - if err != nil { - t.Fatalf("error adding new service: %#v", err) - } - testEchoTCP(t, "127.0.0.1", svcInfoP.proxyPort) - waitForNumProxyLoops(t, p, 1) + svcInfo := addServiceAndWaitForInfo(t, p, servicePortP, &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: servicePortP.Name, Namespace: servicePortP.Namespace}, + Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ + Name: "p", + Port: 80, + Protocol: "TCP", + }}}, + }) + testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) - svcInfoQ, err := p.addServiceOnPort(serviceQ, "UDP", 0, time.Second) - if err != nil { - t.Fatalf("error adding new service: %#v", err) - } - testEchoUDP(t, "127.0.0.1", svcInfoQ.proxyPort) - waitForNumProxyLoops(t, p, 2) + svcInfo = addServiceAndWaitForInfo(t, p, servicePortQ, &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: servicePortQ.Name, Namespace: servicePortQ.Namespace}, + Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ + Name: "q", + Port: 80, + Protocol: "UDP", + }}}, + }) + testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) } func TestMultiPortOnServiceAdd(t *testing.T) { @@ -393,7 +452,7 @@ func TestMultiPortOnServiceAdd(t *testing.T) { startProxier(p, t) defer p.shutdown() - p.OnServiceAdd(&v1.Service{ + service := &v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ Name: "p", @@ -404,130 +463,23 @@ func TestMultiPortOnServiceAdd(t *testing.T) { Port: 81, Protocol: "UDP", }}}, - }) - waitForNumProxyLoops(t, p, 2) - svcInfo, exists := waitForServiceInfo(p, serviceP) - if !exists { - t.Fatalf("can't find serviceInfo for %s", serviceP) - } - if svcInfo.portal.ip.String() != "1.2.3.4" || svcInfo.portal.port != 80 || svcInfo.protocol != "TCP" { - t.Errorf("unexpected serviceInfo for %s: %#v", serviceP, svcInfo) } - svcInfo, exists = waitForServiceInfo(p, serviceQ) - if !exists { - t.Fatalf("can't find serviceInfo for %s", serviceQ) - } - if svcInfo.portal.ip.String() != "1.2.3.4" || svcInfo.portal.port != 81 || svcInfo.protocol != "UDP" { - t.Errorf("unexpected serviceInfo for %s: %#v", serviceQ, svcInfo) - } - - svcInfo, exists = p.getServiceInfo(serviceX) + // ports p and q should exist + _ = addServiceAndWaitForInfo(t, p, serviceP, service) + _ = waitForServiceInfo(t, p, serviceQ, service) + // non-existent port x should not exist + svcInfo, exists := p.getServiceInfo(serviceX) if exists { t.Fatalf("found unwanted serviceInfo for %s: %#v", serviceX, svcInfo) } } -// Helper: Stops the proxy for the named service. -func stopProxyByName(proxier *Proxier, service proxy.ServicePortName) error { - proxier.mu.Lock() - defer proxier.mu.Unlock() - info, found := proxier.serviceMap[service] - if !found { - return fmt.Errorf("unknown service: %s", service) - } - return proxier.stopProxy(service, info) -} - func TestTCPProxyStop(t *testing.T) { lb := NewLoadBalancerRR() - service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, - }}, - }) - - fexec := makeFakeExec() - - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) - if err != nil { - t.Fatal(err) - } - startProxier(p, t) - defer p.shutdown() - - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) - if err != nil { - t.Fatalf("error adding new service: %#v", err) - } - if !svcInfo.IsAlive() { - t.Fatalf("wrong value for IsAlive(): expected true") - } - conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) - if err != nil { - t.Fatalf("error connecting to proxy: %v", err) - } - conn.Close() - waitForNumProxyLoops(t, p, 1) - - stopProxyByName(p, service) - if svcInfo.IsAlive() { - t.Fatalf("wrong value for IsAlive(): expected false") - } - // Wait for the port to really close. - if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { - t.Fatalf(err.Error()) - } - waitForNumProxyLoops(t, p, 0) -} - -func TestUDPProxyStop(t *testing.T) { - lb := NewLoadBalancerRR() - service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}}, - }}, - }) - - fexec := makeFakeExec() - - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) - if err != nil { - t.Fatal(err) - } - startProxier(p, t) - defer p.shutdown() - - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) - if err != nil { - t.Fatalf("error adding new service: %#v", err) - } - conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort)) - if err != nil { - t.Fatalf("error connecting to proxy: %v", err) - } - conn.Close() - waitForNumProxyLoops(t, p, 1) - - stopProxyByName(p, service) - // Wait for the port to really close. - if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { - t.Fatalf(err.Error()) - } - waitForNumProxyLoops(t, p, 0) -} - -func TestTCPProxyUpdateDelete(t *testing.T) { - lb := NewLoadBalancerRR() - servicePortName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: servicePortName.Namespace, Name: servicePortName.Name}, + ObjectMeta: metav1.ObjectMeta{Namespace: serviceP.Namespace, Name: serviceP.Name}, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, @@ -544,7 +496,85 @@ func TestTCPProxyUpdateDelete(t *testing.T) { defer p.shutdown() service := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: servicePortName.Name, Namespace: servicePortName.Namespace}, + ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, + Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ + Name: "p", + Port: 80, + Protocol: "TCP", + }}}, + } + svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service) + conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) + if err != nil { + t.Fatalf("error connecting to proxy: %v", err) + } + conn.Close() + + // Wait for the port to really close. + deleteServiceAndWait(t, p, svcInfo, service, waitForClosedPortTCP) +} + +func TestUDPProxyStop(t *testing.T) { + lb := NewLoadBalancerRR() + serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + lb.OnEndpointsAdd(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Namespace: serviceP.Namespace, Name: serviceP.Name}, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}}, + }}, + }) + + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + if err != nil { + t.Fatal(err) + } + startProxier(p, t) + defer p.shutdown() + + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, + Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ + Name: "p", + Port: 80, + Protocol: "UDP", + }}}, + } + svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service) + conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort)) + if err != nil { + t.Fatalf("error connecting to proxy: %v", err) + } + conn.Close() + + // Wait for the port to really close. + deleteServiceAndWait(t, p, svcInfo, service, waitForClosedPortUDP) +} + +func TestTCPProxyUpdateDelete(t *testing.T) { + lb := NewLoadBalancerRR() + serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + lb.OnEndpointsAdd(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Namespace: serviceP.Namespace, Name: serviceP.Name}, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, + }) + + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + if err != nil { + t.Fatal(err) + } + startProxier(p, t) + defer p.shutdown() + + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ Name: "p", Port: 9997, @@ -552,134 +582,27 @@ func TestTCPProxyUpdateDelete(t *testing.T) { }}}, } - p.OnServiceAdd(service) - waitForNumProxyLoops(t, p, 1) - p.OnServiceDelete(service) - if err := waitForClosedPortTCP(p, int(service.Spec.Ports[0].Port)); err != nil { - t.Fatalf(err.Error()) - } - waitForNumProxyLoops(t, p, 0) -} - -func TestUDPProxyUpdateDelete(t *testing.T) { - lb := NewLoadBalancerRR() - service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}}, - }}, - }) - - fexec := makeFakeExec() - - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) - if err != nil { - t.Fatal(err) - } - startProxier(p, t) - defer p.shutdown() - - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) - if err != nil { - t.Fatalf("error adding new service: %#v", err) - } - conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort)) - if err != nil { - t.Fatalf("error connecting to proxy: %v", err) - } - conn.Close() - waitForNumProxyLoops(t, p, 1) - - p.OnServiceDelete(&v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ - Name: "p", - Port: int32(svcInfo.proxyPort), - Protocol: "UDP", - }}}, - }) - if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { - t.Fatalf(err.Error()) - } - waitForNumProxyLoops(t, p, 0) -} - -func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { - lb := NewLoadBalancerRR() - service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - endpoint := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, - }}, - } - lb.OnEndpointsAdd(endpoint) - - fexec := makeFakeExec() - - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) - if err != nil { - t.Fatal(err) - } - startProxier(p, t) - defer p.shutdown() - - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) - if err != nil { - t.Fatalf("error adding new service: %#v", err) - } + svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service) conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("error connecting to proxy: %v", err) } conn.Close() - waitForNumProxyLoops(t, p, 1) - p.OnServiceDelete(&v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ - Name: "p", - Port: int32(svcInfo.proxyPort), - Protocol: "TCP", - }}}, - }) - if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { - t.Fatalf(err.Error()) - } - waitForNumProxyLoops(t, p, 0) - - // need to add endpoint here because it got clean up during service delete - lb.OnEndpointsAdd(endpoint) - p.OnServiceAdd(&v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ - Name: "p", - Port: int32(svcInfo.proxyPort), - Protocol: "TCP", - }}}, - }) - svcInfo, exists := waitForServiceInfo(p, service) - if !exists { - t.Fatalf("can't find serviceInfo for %s", service) - } - testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) - waitForNumProxyLoops(t, p, 1) + // Wait for the port to really close. + deleteServiceAndWait(t, p, svcInfo, service, waitForClosedPortTCP) } -func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { +func TestUDPProxyUpdateDelete(t *testing.T) { lb := NewLoadBalancerRR() - service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - endpoint := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + lb.OnEndpointsAdd(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Namespace: serviceP.Namespace, Name: serviceP.Name}, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}}, }}, - } - lb.OnEndpointsAdd(endpoint) + }) fexec := makeFakeExec() @@ -690,53 +613,123 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { startProxier(p, t) defer p.shutdown() - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) - if err != nil { - t.Fatalf("error adding new service: %#v", err) + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, + Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ + Name: "p", + Port: 9997, + Protocol: "UDP", + }}}, } + + svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service) conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("error connecting to proxy: %v", err) } conn.Close() - waitForNumProxyLoops(t, p, 1) - p.OnServiceDelete(&v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + // Wait for the port to really close. + deleteServiceAndWait(t, p, svcInfo, service, waitForClosedPortUDP) +} + +func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { + lb := NewLoadBalancerRR() + serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + endpoint := &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, + } + lb.OnEndpointsAdd(endpoint) + + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + if err != nil { + t.Fatal(err) + } + startProxier(p, t) + defer p.shutdown() + + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ Name: "p", - Port: int32(svcInfo.proxyPort), - Protocol: "UDP", + Port: 9997, + Protocol: "TCP", }}}, - }) - if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { - t.Fatalf(err.Error()) } - waitForNumProxyLoops(t, p, 0) + + svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service) + conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) + if err != nil { + t.Fatalf("error connecting to proxy: %v", err) + } + conn.Close() + + // Wait for the port to really close. + deleteServiceAndWait(t, p, svcInfo, service, waitForClosedPortTCP) // need to add endpoint here because it got clean up during service delete lb.OnEndpointsAdd(endpoint) - p.OnServiceAdd(&v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + svcInfo = addServiceAndWaitForInfo(t, p, serviceP, service) + testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) +} + +func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { + lb := NewLoadBalancerRR() + serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + endpoint := &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}}, + }}, + } + lb.OnEndpointsAdd(endpoint) + + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + if err != nil { + t.Fatal(err) + } + startProxier(p, t) + defer p.shutdown() + + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ Name: "p", - Port: int32(svcInfo.proxyPort), + Port: 9997, Protocol: "UDP", }}}, - }) - svcInfo, exists := waitForServiceInfo(p, service) - if !exists { - t.Fatalf("can't find serviceInfo") } + + svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service) + conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort)) + if err != nil { + t.Fatalf("error connecting to proxy: %v", err) + } + conn.Close() + + // Wait for the port to really close. + deleteServiceAndWait(t, p, svcInfo, service, waitForClosedPortUDP) + + // need to add endpoint here because it got clean up during service delete + lb.OnEndpointsAdd(endpoint) + svcInfo = addServiceAndWaitForInfo(t, p, serviceP, service) testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) - waitForNumProxyLoops(t, p, 1) } func TestTCPProxyUpdatePort(t *testing.T) { lb := NewLoadBalancerRR() - service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, @@ -752,40 +745,36 @@ func TestTCPProxyUpdatePort(t *testing.T) { startProxier(p, t) defer p.shutdown() - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) - if err != nil { - t.Fatalf("error adding new service: %#v", err) - } - testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) - waitForNumProxyLoops(t, p, 1) - - p.OnServiceAdd(&v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + origPort := 99 + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ Name: "p", - Port: 99, + Port: int32(origPort), Protocol: "TCP", }}}, - }) + } + + svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service) + testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) + + service.Spec.Ports[0].Port = 100 + p.OnServiceAdd(service) // Wait for the socket to actually get free. - if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { + if err := waitForClosedPortTCP(p, origPort); err != nil { t.Fatalf(err.Error()) } - svcInfo, exists := waitForServiceInfo(p, service) - if !exists { - t.Fatalf("can't find serviceInfo") - } + waitForProxyFinished(t, svcInfo) + + svcInfo = addServiceAndWaitForInfo(t, p, serviceP, service) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) - // This is a bit async, but this should be sufficient. - time.Sleep(500 * time.Millisecond) - waitForNumProxyLoops(t, p, 1) } func TestUDPProxyUpdatePort(t *testing.T) { lb := NewLoadBalancerRR() - service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []v1.EndpointPort{{Name: "p", Port: udpServerPort}}, @@ -801,37 +790,36 @@ func TestUDPProxyUpdatePort(t *testing.T) { startProxier(p, t) defer p.shutdown() - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) - if err != nil { - t.Fatalf("error adding new service: %#v", err) - } - waitForNumProxyLoops(t, p, 1) - - p.OnServiceAdd(&v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + origPort := 99 + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ Name: "p", - Port: 99, + Port: int32(origPort), Protocol: "UDP", }}}, - }) + } + + svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service) + testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) + + service.Spec.Ports[0].Port = 100 + p.OnServiceAdd(service) // Wait for the socket to actually get free. - if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { + if err := waitForClosedPortUDP(p, origPort); err != nil { t.Fatalf(err.Error()) } - svcInfo, exists := waitForServiceInfo(p, service) - if !exists { - t.Fatalf("can't find serviceInfo") - } + waitForProxyFinished(t, svcInfo) + + svcInfo = addServiceAndWaitForInfo(t, p, serviceP, service) testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) - waitForNumProxyLoops(t, p, 1) } func TestProxyUpdatePublicIPs(t *testing.T) { lb := NewLoadBalancerRR() - service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, @@ -847,44 +835,34 @@ func TestProxyUpdatePublicIPs(t *testing.T) { startProxier(p, t) defer p.shutdown() - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) - if err != nil { - t.Fatalf("error adding new service: %#v", err) + origPort := 9997 + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, + Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ + Name: "p", + Port: int32(origPort), + Protocol: "TCP", + }}}, } - testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) - waitForNumProxyLoops(t, p, 1) - p.OnServiceAdd(&v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Spec: v1.ServiceSpec{ - Ports: []v1.ServicePort{{ - Name: "p", - Port: int32(svcInfo.portal.port), - Protocol: "TCP", - }}, - ClusterIP: svcInfo.portal.ip.String(), - ExternalIPs: []string{"4.3.2.1"}, - }, - }) + svcInfo := addServiceAndWaitForInfo(t, p, serviceP, service) + testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) + + service.Spec.ExternalIPs = []string{"4.3.2.1"} + svcInfo = addServiceAndWaitForInfo(t, p, serviceP, service) // Wait for the socket to actually get free. - if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { + if err := waitForClosedPortTCP(p, origPort); err != nil { t.Fatalf(err.Error()) } - svcInfo, exists := waitForServiceInfo(p, service) - if !exists { - t.Fatalf("can't find serviceInfo") - } + waitForProxyFinished(t, svcInfo) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) - // This is a bit async, but this should be sufficient. - time.Sleep(500 * time.Millisecond) - waitForNumProxyLoops(t, p, 1) } func TestProxyUpdatePortal(t *testing.T) { lb := NewLoadBalancerRR() - service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + servicePort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} endpoint := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace}, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, @@ -901,36 +879,30 @@ func TestProxyUpdatePortal(t *testing.T) { startProxier(p, t) defer p.shutdown() - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) - if err != nil { - t.Fatalf("error adding new service: %#v", err) - } - testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) - waitForNumProxyLoops(t, p, 1) - svcv0 := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + ObjectMeta: metav1.ObjectMeta{Name: servicePort.Name, Namespace: servicePort.Namespace}, Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ Name: "p", - Port: int32(svcInfo.proxyPort), + Port: 9997, Protocol: "TCP", }}}, } + svcInfo := addServiceAndWaitForInfo(t, p, servicePort, svcv0) + testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) + svcv1 := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Spec: v1.ServiceSpec{ClusterIP: "", Ports: []v1.ServicePort{{ - Name: "p", - Port: int32(svcInfo.proxyPort), - Protocol: "TCP", - }}}, + ObjectMeta: svcv0.ObjectMeta, + Spec: v1.ServiceSpec{ClusterIP: "", Ports: []v1.ServicePort{ + svcv0.Spec.Ports[0], + }}, } p.OnServiceUpdate(svcv0, svcv1) // Wait for the service to be removed because it had an empty ClusterIP var exists bool for i := 0; i < 50; i++ { - _, exists = p.getServiceInfo(service) + _, exists = p.getServiceInfo(servicePort) if !exists { break } @@ -939,37 +911,30 @@ func TestProxyUpdatePortal(t *testing.T) { if exists { t.Fatalf("service with empty ClusterIP should not be included in the proxy") } + waitForProxyFinished(t, svcInfo) svcv2 := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Spec: v1.ServiceSpec{ClusterIP: "None", Ports: []v1.ServicePort{{ - Name: "p", - Port: int32(svcInfo.proxyPort), - Protocol: "TCP", - }}}, + ObjectMeta: svcv0.ObjectMeta, + Spec: v1.ServiceSpec{ClusterIP: "None", Ports: []v1.ServicePort{ + svcv0.Spec.Ports[0], + }}, } p.OnServiceUpdate(svcv1, svcv2) - _, exists = p.getServiceInfo(service) + _, exists = p.getServiceInfo(servicePort) if exists { t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy") } svcv3 := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ - Name: "p", - Port: int32(svcInfo.proxyPort), - Protocol: "TCP", - }}}, + ObjectMeta: svcv0.ObjectMeta, + Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{ + svcv0.Spec.Ports[0], + }}, } - p.OnServiceUpdate(svcv2, svcv3) lb.OnEndpointsAdd(endpoint) - svcInfo, exists = waitForServiceInfo(p, service) - if !exists { - t.Fatalf("service with ClusterIP set not found in the proxy") - } + p.OnServiceUpdate(svcv2, svcv3) + svcInfo = waitForServiceInfo(t, p, servicePort, svcv3) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) - waitForNumProxyLoops(t, p, 1) } type fakeRunner struct{}