diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 0b753163a40..e42e31341e3 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -470,7 +470,7 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - serviceHandler = proxierUserspace + serviceEventHandler = proxierUserspace proxier = proxierUserspace } // Remove artifacts from the pure-iptables Proxier, if not on Windows. diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index d1fcc2dc981..5032577b81f 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -52,8 +52,6 @@ type ServiceInfo struct { Timeout time.Duration // ActiveClients is the cache of active UDP clients being proxied by this proxy for this service ActiveClients *ClientCache - // ServiceRef is a full object reference to the the service described by this ServiceInfo - ServiceRef api.ObjectReference isAliveAtomic int32 // Only access this with atomic ops portal portal @@ -358,7 +356,7 @@ func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *Serv // addServiceOnPort starts listening for a new service, returning the ServiceInfo. // Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP // connections, for now. -func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, serviceRef api.ObjectReference, protocol api.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) { +func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) { sock, err := proxier.makeProxySocket(protocol, proxier.listenIP, proxyPort) if err != nil { return nil, err @@ -376,7 +374,6 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, serviceR si := &ServiceInfo{ Timeout: timeout, ActiveClients: newClientCache(), - ServiceRef: serviceRef, isAliveAtomic: 1, proxyPort: portNum, @@ -398,108 +395,124 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, serviceR return si, nil } -// OnServiceUpdate manages the active set of service proxies. -// Active service proxies are reinitialized if found in the update set or -// shutdown if missing from the update set. -func (proxier *Proxier) OnServiceUpdate(services []*api.Service) { - glog.V(4).Infof("Received update notice: %+v", services) - activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set - for _, service := range services { - // if ClusterIP is "None" or empty, skip proxying - if !helper.IsServiceIPSet(service) { - glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP) +func (proxier *Proxier) mergeService(service *api.Service) sets.String { + if service == nil { + return nil + } + svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + if !helper.IsServiceIPSet(service) { + glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP) + return nil + } + existingPorts := sets.NewString() + for i := range service.Spec.Ports { + servicePort := &service.Spec.Ports[i] + serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} + existingPorts.Insert(servicePort.Name) + info, exists := proxier.getServiceInfo(serviceName) + // TODO: check health of the socket? What if ProxyLoop exited? + if exists && sameConfig(info, service, servicePort) { + // Nothing changed. + continue + } + if exists { + glog.V(4).Infof("Something changed for service %q: stopping it", serviceName) + if err := proxier.closePortal(serviceName, info); err != nil { + glog.Errorf("Failed to close portal for %q: %v", serviceName, err) + } + if err := proxier.stopProxy(serviceName, info); err != nil { + glog.Errorf("Failed to stop service %q: %v", serviceName, err) + } + } + proxyPort, err := proxier.proxyPorts.AllocateNext() + if err != nil { + glog.Errorf("failed to allocate proxy port for service %q: %v", serviceName, err) continue } - // TODO: should this just be ref.GetReference? - svcGVK := service.GetObjectKind().GroupVersionKind() - svcRef := api.ObjectReference{ - Kind: svcGVK.Kind, - Namespace: service.Namespace, - Name: service.Name, - UID: service.UID, - APIVersion: svcGVK.GroupVersion().String(), - ResourceVersion: service.ResourceVersion, + serviceIP := net.ParseIP(service.Spec.ClusterIP) + glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol) + info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout) + if err != nil { + glog.Errorf("Failed to start proxy for %q: %v", serviceName, err) + continue } + info.portal.ip = serviceIP + info.portal.port = int(servicePort.Port) + info.externalIPs = service.Spec.ExternalIPs + // Deep-copy in case the service instance changes + info.loadBalancerStatus = *helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer) + info.nodePort = int(servicePort.NodePort) + info.sessionAffinityType = service.Spec.SessionAffinity + glog.V(4).Infof("info: %#v", info) - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] - serviceName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name} - activeServices[serviceName] = true - serviceIP := net.ParseIP(service.Spec.ClusterIP) - info, exists := proxier.getServiceInfo(serviceName) - // TODO: check health of the socket? What if ProxyLoop exited? - if exists && sameConfig(info, service, servicePort) { - // Nothing changed. - continue - } - if exists { - glog.V(4).Infof("Something changed for service %q: stopping it", serviceName) - err := proxier.closePortal(serviceName, info) - if err != nil { - glog.Errorf("Failed to close portal for %q: %v", serviceName, err) - } - err = proxier.stopProxy(serviceName, info) - if err != nil { - glog.Errorf("Failed to stop service %q: %v", serviceName, err) - } - } - - proxyPort, err := proxier.proxyPorts.AllocateNext() - if err != nil { - glog.Errorf("failed to allocate proxy port for service %q: %v", serviceName, err) - continue - } - - glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol) - info, err = proxier.addServiceOnPort(serviceName, svcRef, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout) - if err != nil { - glog.Errorf("Failed to start proxy for %q: %v", serviceName, err) - continue - } - info.portal.ip = serviceIP - info.portal.port = int(servicePort.Port) - info.externalIPs = service.Spec.ExternalIPs - // Deep-copy in case the service instance changes - info.loadBalancerStatus = *helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer) - info.nodePort = int(servicePort.NodePort) - info.sessionAffinityType = service.Spec.SessionAffinity - glog.V(4).Infof("info: %#v", info) - - err = proxier.openPortal(serviceName, info) - if err != nil { - glog.Errorf("Failed to open portal for %q: %v", serviceName, err) - } - proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes) + if err := proxier.openPortal(serviceName, info); err != nil { + glog.Errorf("Failed to open portal for %q: %v", serviceName, err) } + proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes) + } + + return existingPorts +} + +func (proxier *Proxier) unmergeService(service *api.Service, existingPorts sets.String) { + if service == nil { + return + } + svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + if !helper.IsServiceIPSet(service) { + glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP) + return } staleUDPServices := sets.NewString() proxier.mu.Lock() defer proxier.mu.Unlock() - for name, info := range proxier.serviceMap { - if !activeServices[name] { - glog.V(1).Infof("Stopping service %q", name) - - if proxier.serviceMap[name].protocol == api.ProtocolUDP { - staleUDPServices.Insert(proxier.serviceMap[name].portal.ip.String()) - } - - err := proxier.closePortal(name, info) - if err != nil { - glog.Errorf("Failed to close portal for %q: %v", name, err) - } - err = proxier.stopProxyInternal(name, info) - if err != nil { - glog.Errorf("Failed to stop service %q: %v", name, err) - } - proxier.loadBalancer.DeleteService(name) + for i := range service.Spec.Ports { + servicePort := &service.Spec.Ports[i] + if existingPorts.Has(servicePort.Name) { + continue } - } + serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} + glog.V(1).Infof("Stopping service %q", serviceName) + info, exists := proxier.serviceMap[serviceName] + if !exists { + glog.Errorf("Service %q is being removed but doesn't exist", serviceName) + continue + } + + if proxier.serviceMap[serviceName].protocol == api.ProtocolUDP { + staleUDPServices.Insert(proxier.serviceMap[serviceName].portal.ip.String()) + } + + if err := proxier.closePortal(serviceName, info); err != nil { + glog.Errorf("Failed to close portal for %q: %v", serviceName, err) + } + if err := proxier.stopProxyInternal(serviceName, info); err != nil { + glog.Errorf("Failed to stop service %q: %v", serviceName, err) + } + proxier.loadBalancer.DeleteService(serviceName) + } utilproxy.DeleteServiceConnections(proxier.exec, staleUDPServices.List()) } +func (proxier *Proxier) OnServiceAdd(service *api.Service) { + _ = proxier.mergeService(service) +} + +func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) { + existingPorts := proxier.mergeService(service) + proxier.unmergeService(oldService, existingPorts) +} + +func (proxier *Proxier) OnServiceDelete(service *api.Service) { + proxier.unmergeService(service, sets.NewString()) +} + +func (proxier *Proxier) OnServiceSynced() { +} + func sameConfig(info *ServiceInfo, service *api.Service, port *api.ServicePort) bool { if info.protocol != port.Protocol || info.portal.port != int(port.Port) || info.nodePort != int(port.NodePort) { return false diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index f00fa30162a..b497318a979 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -218,8 +218,7 @@ func TestTCPProxy(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} - svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -246,8 +245,7 @@ func TestUDPProxy(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} - svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -274,8 +272,7 @@ func TestUDPProxyTimeout(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} - svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -314,16 +311,14 @@ func TestMultiPortProxy(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - serviceRefP := api.ObjectReference{Name: serviceP.Name, Namespace: serviceP.Namespace, Kind: "Service", APIVersion: "v1"} - svcInfoP, err := p.addServiceOnPort(serviceP, serviceRefP, "TCP", 0, time.Second) + svcInfoP, err := p.addServiceOnPort(serviceP, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } testEchoTCP(t, "127.0.0.1", svcInfoP.proxyPort) waitForNumProxyLoops(t, p, 1) - serviceRefQ := api.ObjectReference{Name: serviceQ.Name, Namespace: serviceQ.Namespace, Kind: "Service", APIVersion: "v1"} - svcInfoQ, err := p.addServiceOnPort(serviceQ, serviceRefQ, "UDP", 0, time.Second) + svcInfoQ, err := p.addServiceOnPort(serviceQ, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -331,7 +326,7 @@ func TestMultiPortProxy(t *testing.T) { waitForNumProxyLoops(t, p, 2) } -func TestMultiPortOnServiceUpdate(t *testing.T) { +func TestMultiPortOnServiceAdd(t *testing.T) { lb := NewLoadBalancerRR() serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"} @@ -345,7 +340,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", @@ -356,7 +351,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) { Port: 81, Protocol: "UDP", }}}, - }}) + }) waitForNumProxyLoops(t, p, 2) svcInfo, exists := p.getServiceInfo(serviceP) if !exists { @@ -408,8 +403,7 @@ func TestTCPProxyStop(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} - svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -453,8 +447,7 @@ func TestUDPProxyStop(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} - svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -492,8 +485,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} - svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -504,7 +496,14 @@ func TestTCPProxyUpdateDelete(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{}) + p.OnServiceDelete(&api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.proxyPort), + Protocol: "TCP", + }}}, + }) if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } @@ -530,8 +529,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} - svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -542,7 +540,14 @@ func TestUDPProxyUpdateDelete(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{}) + p.OnServiceDelete(&api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.proxyPort), + Protocol: "UDP", + }}}, + }) if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } @@ -569,8 +574,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} - svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -581,7 +585,14 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{}) + p.OnServiceDelete(&api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.proxyPort), + Protocol: "TCP", + }}}, + }) if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } @@ -589,14 +600,14 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { // need to add endpoint here because it got clean up during service delete lb.OnEndpointsAdd(endpoint) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", Port: int32(svcInfo.proxyPort), Protocol: "TCP", }}}, - }}) + }) svcInfo, exists := p.getServiceInfo(service) if !exists { t.Fatalf("can't find serviceInfo for %s", service) @@ -625,8 +636,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} - svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -637,7 +647,14 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{}) + p.OnServiceDelete(&api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.proxyPort), + Protocol: "UDP", + }}}, + }) if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } @@ -645,14 +662,14 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { // need to add endpoint here because it got clean up during service delete lb.OnEndpointsAdd(endpoint) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", Port: int32(svcInfo.proxyPort), Protocol: "UDP", }}}, - }}) + }) svcInfo, exists := p.getServiceInfo(service) if !exists { t.Fatalf("can't find serviceInfo") @@ -680,22 +697,21 @@ func TestTCPProxyUpdatePort(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} - svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", Port: 99, Protocol: "TCP", }}}, - }}) + }) // Wait for the socket to actually get free. if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) @@ -729,21 +745,20 @@ func TestUDPProxyUpdatePort(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} - svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", Port: 99, Protocol: "UDP", }}}, - }}) + }) // Wait for the socket to actually get free. if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) @@ -775,15 +790,14 @@ func TestProxyUpdatePublicIPs(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} - svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ Ports: []api.ServicePort{{ @@ -794,7 +808,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) { ClusterIP: svcInfo.portal.ip.String(), ExternalIPs: []string{"4.3.2.1"}, }, - }}) + }) // Wait for the socket to actually get free. if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) @@ -829,48 +843,59 @@ func TestProxyUpdatePortal(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} - svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{ - Name: "p", - Port: int32(svcInfo.proxyPort), - Protocol: "TCP", - }}}, - }}) - _, exists := p.getServiceInfo(service) - if exists { - t.Fatalf("service with empty ClusterIP should not be included in the proxy") - } - - p.OnServiceUpdate([]*api.Service{{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{ - Name: "p", - Port: int32(svcInfo.proxyPort), - Protocol: "TCP", - }}}, - }}) - _, exists = p.getServiceInfo(service) - if exists { - t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy") - } - - p.OnServiceUpdate([]*api.Service{{ + svcv0 := &api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", Port: int32(svcInfo.proxyPort), Protocol: "TCP", }}}, - }}) + } + + svcv1 := &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.proxyPort), + Protocol: "TCP", + }}}, + } + p.OnServiceUpdate(svcv0, svcv1) + _, exists := p.getServiceInfo(service) + if exists { + t.Fatalf("service with empty ClusterIP should not be included in the proxy") + } + + svcv2 := &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.proxyPort), + Protocol: "TCP", + }}}, + } + p.OnServiceUpdate(svcv1, svcv2) + _, exists = p.getServiceInfo(service) + if exists { + t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy") + } + + svcv3 := &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.proxyPort), + Protocol: "TCP", + }}}, + } + p.OnServiceUpdate(svcv2, svcv3) lb.OnEndpointsAdd(endpoint) svcInfo, exists = p.getServiceInfo(service) if !exists {