From 6bbf2aaab74423ecc9d2956329cead57f7f67b0a Mon Sep 17 00:00:00 2001 From: BenTheElder Date: Sat, 8 Aug 2015 15:16:55 -0400 Subject: [PATCH] Refactor pkg/proxy/config's ServiceConfigHandler and EndpointsConfigHandler to have different update methods. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refactor `pkg/proxy/config`’s ServiceConfigHandler.OnUpdate and EndpointsConfigHandler.OnUpdate to different method names as they have different signatures. This will let the new proxy (https://github.com/GoogleCloudPlatform/kubernetes/issues/3760) implement both interfaces. Since we won’t need a separate loadbalancer structure (load balancing is handled in the proxy rules), we will simply handle both event types from the same object. --- pkg/proxy/config/config.go | 12 +++--- pkg/proxy/config/config_test.go | 4 +- pkg/proxy/types.go | 4 +- pkg/proxy/userspace/proxier.go | 2 +- pkg/proxy/userspace/proxier_test.go | 54 +++++++++++++------------- pkg/proxy/userspace/roundrobin.go | 6 +-- pkg/proxy/userspace/roundrobin_test.go | 42 ++++++++++---------- 7 files changed, 62 insertions(+), 62 deletions(-) diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 38925ae26bc..635ecdca805 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -57,17 +57,17 @@ type EndpointsUpdate struct { // ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services. type ServiceConfigHandler interface { - // OnUpdate gets called when a configuration has been changed by one of the sources. + // OnServiceUpdate gets called when a configuration has been changed by one of the sources. // This is the union of all the configuration sources. - OnUpdate(services []api.Service) + OnServiceUpdate(services []api.Service) } // EndpointsConfigHandler is an abstract interface of objects which receive update notifications for the set of endpoints. type EndpointsConfigHandler interface { - // OnUpdate gets called when endpoints configuration is changed for a given + // OnEndpointsUpdate gets called when endpoints configuration is changed for a given // service on any of the configuration sources. An example is when a new // service comes up, or when containers come up or down for an existing service. - OnUpdate(endpoints []api.Endpoints) + OnEndpointsUpdate(endpoints []api.Endpoints) } // EndpointsConfig tracks a set of endpoints configurations. @@ -91,7 +91,7 @@ func NewEndpointsConfig() *EndpointsConfig { func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) { c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { - handler.OnUpdate(instance.([]api.Endpoints)) + handler.OnEndpointsUpdate(instance.([]api.Endpoints)) })) } @@ -189,7 +189,7 @@ func NewServiceConfig() *ServiceConfig { func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) { c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { - handler.OnUpdate(instance.([]api.Service)) + handler.OnServiceUpdate(instance.([]api.Service)) })) } diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index 7d070c6982b..8dd373b18de 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -57,7 +57,7 @@ func NewServiceHandlerMock() *ServiceHandlerMock { return &ServiceHandlerMock{services: make([]api.Service, 0)} } -func (h *ServiceHandlerMock) OnUpdate(services []api.Service) { +func (h *ServiceHandlerMock) OnServiceUpdate(services []api.Service) { sort.Sort(sortedServices(services)) h.services = services h.updated.Done() @@ -95,7 +95,7 @@ func NewEndpointsHandlerMock() *EndpointsHandlerMock { return &EndpointsHandlerMock{endpoints: make([]api.Endpoints, 0)} } -func (h *EndpointsHandlerMock) OnUpdate(endpoints []api.Endpoints) { +func (h *EndpointsHandlerMock) OnEndpointsUpdate(endpoints []api.Endpoints) { sort.Sort(sortedEndpoints(endpoints)) h.endpoints = endpoints h.updated.Done() diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index fb759db744d..a3f309db278 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -25,10 +25,10 @@ import ( // ProxyProvider is the interface provided by proxier implementations. type ProxyProvider interface { - // OnUpdate manages the active set of service proxies. + // OnServiceUpdate manages the active set of service proxies. // Active service proxies are reinitialized if found in the update set or // removed if missing from the update set. - OnUpdate(services []api.Service) + OnServiceUpdate(services []api.Service) // SyncLoop runs periodic work. // This is expected to run as a goroutine or as the main loop of the app. // It does not return. diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 4b3ab339a62..25d58bf3410 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -265,7 +265,7 @@ const udpIdleTimeout = 1 * time.Second // OnUpdate manages the active set of service proxies. // Active service proxies are reinitialized if found in the update set or // shutdown if missing from the update set. -func (proxier *Proxier) OnUpdate(services []api.Service) { +func (proxier *Proxier) OnServiceUpdate(services []api.Service) { glog.V(4).Infof("Received update notice: %+v", services) activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set for i := range services { diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index fa841d1c70c..364d41dd32f 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -213,7 +213,7 @@ func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) { func TestTCPProxy(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} - lb.OnUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ @@ -240,7 +240,7 @@ func TestTCPProxy(t *testing.T) { func TestUDPProxy(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} - lb.OnUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ @@ -268,7 +268,7 @@ func TestMultiPortProxy(t *testing.T) { lb := NewLoadBalancerRR() serviceP := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo-p"}, "p"} serviceQ := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo-q"}, "q"} - lb.OnUpdate([]api.Endpoints{{ + lb.OnEndpointsUpdate([]api.Endpoints{{ ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, @@ -303,7 +303,7 @@ func TestMultiPortProxy(t *testing.T) { waitForNumProxyLoops(t, p, 2) } -func TestMultiPortOnUpdate(t *testing.T) { +func TestMultiPortOnServiceUpdate(t *testing.T) { lb := NewLoadBalancerRR() serviceP := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} serviceQ := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "q"} @@ -315,7 +315,7 @@ func TestMultiPortOnUpdate(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - p.OnUpdate([]api.Service{{ + p.OnServiceUpdate([]api.Service{{ ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", @@ -362,7 +362,7 @@ func stopProxyByName(proxier *Proxier, service proxy.ServicePortName) error { func TestTCPProxyStop(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} - lb.OnUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, Subsets: []api.EndpointSubset{{ @@ -400,7 +400,7 @@ func TestTCPProxyStop(t *testing.T) { func TestUDPProxyStop(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} - lb.OnUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, Subsets: []api.EndpointSubset{{ @@ -438,7 +438,7 @@ func TestUDPProxyStop(t *testing.T) { func TestTCPProxyUpdateDelete(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} - lb.OnUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, Subsets: []api.EndpointSubset{{ @@ -465,7 +465,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnUpdate([]api.Service{}) + p.OnServiceUpdate([]api.Service{}) if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } @@ -475,7 +475,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) { func TestUDPProxyUpdateDelete(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} - lb.OnUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, Subsets: []api.EndpointSubset{{ @@ -502,7 +502,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnUpdate([]api.Service{}) + p.OnServiceUpdate([]api.Service{}) if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } @@ -512,7 +512,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) { func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} - lb.OnUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ @@ -539,13 +539,13 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnUpdate([]api.Service{}) + p.OnServiceUpdate([]api.Service{}) if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } waitForNumProxyLoops(t, p, 0) - p.OnUpdate([]api.Service{{ + p.OnServiceUpdate([]api.Service{{ ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", @@ -564,7 +564,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} - lb.OnUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ @@ -591,13 +591,13 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnUpdate([]api.Service{}) + p.OnServiceUpdate([]api.Service{}) if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } waitForNumProxyLoops(t, p, 0) - p.OnUpdate([]api.Service{{ + p.OnServiceUpdate([]api.Service{{ ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", @@ -616,7 +616,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { func TestTCPProxyUpdatePort(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} - lb.OnUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ @@ -639,7 +639,7 @@ func TestTCPProxyUpdatePort(t *testing.T) { testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) waitForNumProxyLoops(t, p, 1) - p.OnUpdate([]api.Service{{ + p.OnServiceUpdate([]api.Service{{ ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", @@ -664,7 +664,7 @@ func TestTCPProxyUpdatePort(t *testing.T) { func TestUDPProxyUpdatePort(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} - lb.OnUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ @@ -686,7 +686,7 @@ func TestUDPProxyUpdatePort(t *testing.T) { } waitForNumProxyLoops(t, p, 1) - p.OnUpdate([]api.Service{{ + p.OnServiceUpdate([]api.Service{{ ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", @@ -709,7 +709,7 @@ func TestUDPProxyUpdatePort(t *testing.T) { func TestProxyUpdatePublicIPs(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} - lb.OnUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ @@ -732,7 +732,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) { testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) waitForNumProxyLoops(t, p, 1) - p.OnUpdate([]api.Service{{ + p.OnServiceUpdate([]api.Service{{ ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ Ports: []api.ServicePort{{ @@ -761,7 +761,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) { func TestProxyUpdatePortal(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"} - lb.OnUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ @@ -784,7 +784,7 @@ func TestProxyUpdatePortal(t *testing.T) { testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) waitForNumProxyLoops(t, p, 1) - p.OnUpdate([]api.Service{{ + p.OnServiceUpdate([]api.Service{{ ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{ Name: "p", @@ -797,7 +797,7 @@ func TestProxyUpdatePortal(t *testing.T) { t.Fatalf("service with empty ClusterIP should not be included in the proxy") } - p.OnUpdate([]api.Service{{ + p.OnServiceUpdate([]api.Service{{ ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{ Name: "p", @@ -810,7 +810,7 @@ func TestProxyUpdatePortal(t *testing.T) { t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy") } - p.OnUpdate([]api.Service{{ + p.OnServiceUpdate([]api.Service{{ ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Name: "p", diff --git a/pkg/proxy/userspace/roundrobin.go b/pkg/proxy/userspace/roundrobin.go index e3599878e7e..efbd18bcec7 100644 --- a/pkg/proxy/userspace/roundrobin.go +++ b/pkg/proxy/userspace/roundrobin.go @@ -223,10 +223,10 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEn } } -// OnUpdate manages the registered service endpoints. +// OnEndpointsUpdate manages the registered service endpoints. // Registered endpoints are updated if found in the update set or // unregistered if missing from the update set. -func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) { +func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []api.Endpoints) { registeredEndpoints := make(map[proxy.ServicePortName]bool) lb.lock.Lock() defer lb.lock.Unlock() @@ -262,7 +262,7 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) { if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) { glog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints) lb.updateAffinityMap(svcPort, newEndpoints) - // OnUpdate can be called without NewService being called externally. + // OnEndpointsUpdate can be called without NewService being called externally. // To be safe we will call it here. A new service will only be created // if one does not already exist. The affinity will be updated // later, once NewService is called. diff --git a/pkg/proxy/userspace/roundrobin_test.go b/pkg/proxy/userspace/roundrobin_test.go index bb0155dffdb..6194ec74f32 100644 --- a/pkg/proxy/userspace/roundrobin_test.go +++ b/pkg/proxy/userspace/roundrobin_test.go @@ -67,7 +67,7 @@ func TestFilterWorks(t *testing.T) { func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { loadBalancer := NewLoadBalancerRR() var endpoints []api.Endpoints - loadBalancer.OnUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpoints) service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "does-not-exist"} endpoint, err := loadBalancer.NextEndpoint(service, nil) if err == nil { @@ -103,7 +103,7 @@ func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) { Ports: []api.EndpointPort{{Name: "p", Port: 40}}, }}, } - loadBalancer.OnUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpoints) expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil) expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil) expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil) @@ -141,7 +141,7 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { Ports: []api.EndpointPort{{Name: "p", Port: 1}, {Name: "p", Port: 2}, {Name: "p", Port: 3}}, }}, } - loadBalancer.OnUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpoints) shuffledEndpoints := loadBalancer.services[service].endpoints if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { @@ -175,7 +175,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) { }, }, } - loadBalancer.OnUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpoints) shuffledEndpoints := loadBalancer.services[serviceP].endpoints if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:1", "endpoint3:3") { @@ -222,7 +222,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { }, }, } - loadBalancer.OnUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpoints) shuffledEndpoints := loadBalancer.services[serviceP].endpoints if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:2", "endpoint3:3") { @@ -257,7 +257,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { }, }, } - loadBalancer.OnUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpoints) shuffledEndpoints = loadBalancer.services[serviceP].endpoints if !stringsInSlice(shuffledEndpoints, "endpoint4:4", "endpoint5:5") { @@ -279,7 +279,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { // Clear endpoints endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil} - loadBalancer.OnUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpoints) endpoint, err = loadBalancer.NextEndpoint(serviceP, nil) if err == nil || len(endpoint) != 0 { @@ -314,7 +314,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { }, }, } - loadBalancer.OnUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpoints) shuffledFooEndpoints := loadBalancer.services[fooServiceP].endpoints expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[0], nil) expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[1], nil) @@ -330,7 +330,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil) // Then update the configuration by removing foo - loadBalancer.OnUpdate(endpoints[1:]) + loadBalancer.OnEndpointsUpdate(endpoints[1:]) endpoint, err = loadBalancer.NextEndpoint(fooServiceP, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") @@ -351,7 +351,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { t.Errorf("Didn't fail with non-existent service") } - // Call NewService() before OnUpdate() + // Call NewService() before OnEndpointsUpdate() loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ @@ -362,7 +362,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { {Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, Ports: []api.EndpointPort{{Port: 3}}}, }, } - loadBalancer.OnUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpoints) client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} @@ -408,7 +408,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) { t.Errorf("Didn't fail with non-existent service") } - // Call OnUpdate() before NewService() + // Call OnEndpointsUpdate() before NewService() endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, @@ -417,7 +417,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) { {Addresses: []api.EndpointAddress{{IP: "endpoint2"}}, Ports: []api.EndpointPort{{Port: 2}}}, }, } - loadBalancer.OnUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpoints) loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} @@ -481,7 +481,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { }, }, } - loadBalancer.OnUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpoints) shuffledEndpoints := loadBalancer.services[service].endpoints expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) client1Endpoint := shuffledEndpoints[0] @@ -501,7 +501,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { }, }, } - loadBalancer.OnUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpoints) shuffledEndpoints = loadBalancer.services[service].endpoints if client1Endpoint == "endpoint:3" { client1Endpoint = shuffledEndpoints[0] @@ -523,7 +523,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { }, }, } - loadBalancer.OnUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpoints) shuffledEndpoints = loadBalancer.services[service].endpoints expectEndpoint(t, loadBalancer, service, client1Endpoint, client1) expectEndpoint(t, loadBalancer, service, client2Endpoint, client2) @@ -555,7 +555,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { }, }, } - loadBalancer.OnUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpoints) shuffledEndpoints := loadBalancer.services[service].endpoints expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) @@ -575,7 +575,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { }, }, } - loadBalancer.OnUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpoints) shuffledEndpoints = loadBalancer.services[service].endpoints expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) @@ -586,7 +586,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { // Clear endpoints endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil} - loadBalancer.OnUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpoints) endpoint, err = loadBalancer.NextEndpoint(service, nil) if err == nil || len(endpoint) != 0 { @@ -626,7 +626,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { }, }, } - loadBalancer.OnUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpoints) shuffledFooEndpoints := loadBalancer.services[fooService].endpoints expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1) @@ -648,7 +648,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2) // Then update the configuration by removing foo - loadBalancer.OnUpdate(endpoints[1:]) + loadBalancer.OnEndpointsUpdate(endpoints[1:]) endpoint, err = loadBalancer.NextEndpoint(fooService, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service")