Merge pull request #12440 from BenTheElder/proxy_config_handler_refactor

Refactor `pkg/proxy/config`'s ServiceConfigHandler and EndpointsConfigHandler.
This commit is contained in:
Alex Robinson 2015-08-10 09:44:38 -07:00
commit c5e221dca7
7 changed files with 62 additions and 62 deletions

View File

@ -57,17 +57,17 @@ type EndpointsUpdate struct {
// ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services.
type ServiceConfigHandler interface {
// OnUpdate gets called when a configuration has been changed by one of the sources.
// OnServiceUpdate gets called when a configuration has been changed by one of the sources.
// This is the union of all the configuration sources.
OnUpdate(services []api.Service)
OnServiceUpdate(services []api.Service)
}
// EndpointsConfigHandler is an abstract interface of objects which receive update notifications for the set of endpoints.
type EndpointsConfigHandler interface {
// OnUpdate gets called when endpoints configuration is changed for a given
// OnEndpointsUpdate gets called when endpoints configuration is changed for a given
// service on any of the configuration sources. An example is when a new
// service comes up, or when containers come up or down for an existing service.
OnUpdate(endpoints []api.Endpoints)
OnEndpointsUpdate(endpoints []api.Endpoints)
}
// EndpointsConfig tracks a set of endpoints configurations.
@ -91,7 +91,7 @@ func NewEndpointsConfig() *EndpointsConfig {
func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
handler.OnUpdate(instance.([]api.Endpoints))
handler.OnEndpointsUpdate(instance.([]api.Endpoints))
}))
}
@ -189,7 +189,7 @@ func NewServiceConfig() *ServiceConfig {
func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
handler.OnUpdate(instance.([]api.Service))
handler.OnServiceUpdate(instance.([]api.Service))
}))
}

View File

@ -57,7 +57,7 @@ func NewServiceHandlerMock() *ServiceHandlerMock {
return &ServiceHandlerMock{services: make([]api.Service, 0)}
}
func (h *ServiceHandlerMock) OnUpdate(services []api.Service) {
func (h *ServiceHandlerMock) OnServiceUpdate(services []api.Service) {
sort.Sort(sortedServices(services))
h.services = services
h.updated.Done()
@ -95,7 +95,7 @@ func NewEndpointsHandlerMock() *EndpointsHandlerMock {
return &EndpointsHandlerMock{endpoints: make([]api.Endpoints, 0)}
}
func (h *EndpointsHandlerMock) OnUpdate(endpoints []api.Endpoints) {
func (h *EndpointsHandlerMock) OnEndpointsUpdate(endpoints []api.Endpoints) {
sort.Sort(sortedEndpoints(endpoints))
h.endpoints = endpoints
h.updated.Done()

View File

@ -25,10 +25,10 @@ import (
// ProxyProvider is the interface provided by proxier implementations.
type ProxyProvider interface {
// OnUpdate manages the active set of service proxies.
// OnServiceUpdate manages the active set of service proxies.
// Active service proxies are reinitialized if found in the update set or
// removed if missing from the update set.
OnUpdate(services []api.Service)
OnServiceUpdate(services []api.Service)
// SyncLoop runs periodic work.
// This is expected to run as a goroutine or as the main loop of the app.
// It does not return.

View File

@ -265,7 +265,7 @@ const udpIdleTimeout = 1 * time.Second
// OnUpdate manages the active set of service proxies.
// Active service proxies are reinitialized if found in the update set or
// shutdown if missing from the update set.
func (proxier *Proxier) OnUpdate(services []api.Service) {
func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
glog.V(4).Infof("Received update notice: %+v", services)
activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set
for i := range services {

View File

@ -213,7 +213,7 @@ func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) {
func TestTCPProxy(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
@ -240,7 +240,7 @@ func TestTCPProxy(t *testing.T) {
func TestUDPProxy(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
@ -268,7 +268,7 @@ func TestMultiPortProxy(t *testing.T) {
lb := NewLoadBalancerRR()
serviceP := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo-p"}, "p"}
serviceQ := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo-q"}, "q"}
lb.OnUpdate([]api.Endpoints{{
lb.OnEndpointsUpdate([]api.Endpoints{{
ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
@ -303,7 +303,7 @@ func TestMultiPortProxy(t *testing.T) {
waitForNumProxyLoops(t, p, 2)
}
func TestMultiPortOnUpdate(t *testing.T) {
func TestMultiPortOnServiceUpdate(t *testing.T) {
lb := NewLoadBalancerRR()
serviceP := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
serviceQ := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "q"}
@ -315,7 +315,7 @@ func TestMultiPortOnUpdate(t *testing.T) {
}
waitForNumProxyLoops(t, p, 0)
p.OnUpdate([]api.Service{{
p.OnServiceUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p",
@ -362,7 +362,7 @@ func stopProxyByName(proxier *Proxier, service proxy.ServicePortName) error {
func TestTCPProxyStop(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Subsets: []api.EndpointSubset{{
@ -400,7 +400,7 @@ func TestTCPProxyStop(t *testing.T) {
func TestUDPProxyStop(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Subsets: []api.EndpointSubset{{
@ -438,7 +438,7 @@ func TestUDPProxyStop(t *testing.T) {
func TestTCPProxyUpdateDelete(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Subsets: []api.EndpointSubset{{
@ -465,7 +465,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
conn.Close()
waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{})
p.OnServiceUpdate([]api.Service{})
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
@ -475,7 +475,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
func TestUDPProxyUpdateDelete(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Subsets: []api.EndpointSubset{{
@ -502,7 +502,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
conn.Close()
waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{})
p.OnServiceUpdate([]api.Service{})
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
@ -512,7 +512,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
@ -539,13 +539,13 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
conn.Close()
waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{})
p.OnServiceUpdate([]api.Service{})
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)
p.OnUpdate([]api.Service{{
p.OnServiceUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p",
@ -564,7 +564,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
@ -591,13 +591,13 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
conn.Close()
waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{})
p.OnServiceUpdate([]api.Service{})
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)
p.OnUpdate([]api.Service{{
p.OnServiceUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p",
@ -616,7 +616,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
func TestTCPProxyUpdatePort(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
@ -639,7 +639,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{{
p.OnServiceUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p",
@ -664,7 +664,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
func TestUDPProxyUpdatePort(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
@ -686,7 +686,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
}
waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{{
p.OnServiceUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p",
@ -709,7 +709,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
func TestProxyUpdatePublicIPs(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
@ -732,7 +732,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{{
p.OnServiceUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{
@ -761,7 +761,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
func TestProxyUpdatePortal(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
@ -784,7 +784,7 @@ func TestProxyUpdatePortal(t *testing.T) {
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{{
p.OnServiceUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{
Name: "p",
@ -797,7 +797,7 @@ func TestProxyUpdatePortal(t *testing.T) {
t.Fatalf("service with empty ClusterIP should not be included in the proxy")
}
p.OnUpdate([]api.Service{{
p.OnServiceUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{
Name: "p",
@ -810,7 +810,7 @@ func TestProxyUpdatePortal(t *testing.T) {
t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy")
}
p.OnUpdate([]api.Service{{
p.OnServiceUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p",

View File

@ -223,10 +223,10 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEn
}
}
// OnUpdate manages the registered service endpoints.
// OnEndpointsUpdate manages the registered service endpoints.
// Registered endpoints are updated if found in the update set or
// unregistered if missing from the update set.
func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
registeredEndpoints := make(map[proxy.ServicePortName]bool)
lb.lock.Lock()
defer lb.lock.Unlock()
@ -262,7 +262,7 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
glog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints)
lb.updateAffinityMap(svcPort, newEndpoints)
// OnUpdate can be called without NewService being called externally.
// OnEndpointsUpdate can be called without NewService being called externally.
// To be safe we will call it here. A new service will only be created
// if one does not already exist. The affinity will be updated
// later, once NewService is called.

View File

@ -67,7 +67,7 @@ func TestFilterWorks(t *testing.T) {
func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
var endpoints []api.Endpoints
loadBalancer.OnUpdate(endpoints)
loadBalancer.OnEndpointsUpdate(endpoints)
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "foo"}, "does-not-exist"}
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil {
@ -103,7 +103,7 @@ func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
Ports: []api.EndpointPort{{Name: "p", Port: 40}},
}},
}
loadBalancer.OnUpdate(endpoints)
loadBalancer.OnEndpointsUpdate(endpoints)
expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
@ -141,7 +141,7 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
Ports: []api.EndpointPort{{Name: "p", Port: 1}, {Name: "p", Port: 2}, {Name: "p", Port: 3}},
}},
}
loadBalancer.OnUpdate(endpoints)
loadBalancer.OnEndpointsUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[service].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
@ -175,7 +175,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) {
},
},
}
loadBalancer.OnUpdate(endpoints)
loadBalancer.OnEndpointsUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[serviceP].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:1", "endpoint3:3") {
@ -222,7 +222,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
},
},
}
loadBalancer.OnUpdate(endpoints)
loadBalancer.OnEndpointsUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[serviceP].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:2", "endpoint3:3") {
@ -257,7 +257,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
},
},
}
loadBalancer.OnUpdate(endpoints)
loadBalancer.OnEndpointsUpdate(endpoints)
shuffledEndpoints = loadBalancer.services[serviceP].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint4:4", "endpoint5:5") {
@ -279,7 +279,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
// Clear endpoints
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil}
loadBalancer.OnUpdate(endpoints)
loadBalancer.OnEndpointsUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint(serviceP, nil)
if err == nil || len(endpoint) != 0 {
@ -314,7 +314,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
},
},
}
loadBalancer.OnUpdate(endpoints)
loadBalancer.OnEndpointsUpdate(endpoints)
shuffledFooEndpoints := loadBalancer.services[fooServiceP].endpoints
expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[1], nil)
@ -330,7 +330,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil)
// Then update the configuration by removing foo
loadBalancer.OnUpdate(endpoints[1:])
loadBalancer.OnEndpointsUpdate(endpoints[1:])
endpoint, err = loadBalancer.NextEndpoint(fooServiceP, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
@ -351,7 +351,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
t.Errorf("Didn't fail with non-existent service")
}
// Call NewService() before OnUpdate()
// Call NewService() before OnEndpointsUpdate()
loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
@ -362,7 +362,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
{Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, Ports: []api.EndpointPort{{Port: 3}}},
},
}
loadBalancer.OnUpdate(endpoints)
loadBalancer.OnEndpointsUpdate(endpoints)
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
@ -408,7 +408,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) {
t.Errorf("Didn't fail with non-existent service")
}
// Call OnUpdate() before NewService()
// Call OnEndpointsUpdate() before NewService()
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
@ -417,7 +417,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) {
{Addresses: []api.EndpointAddress{{IP: "endpoint2"}}, Ports: []api.EndpointPort{{Port: 2}}},
},
}
loadBalancer.OnUpdate(endpoints)
loadBalancer.OnEndpointsUpdate(endpoints)
loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
@ -481,7 +481,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
},
},
}
loadBalancer.OnUpdate(endpoints)
loadBalancer.OnEndpointsUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
client1Endpoint := shuffledEndpoints[0]
@ -501,7 +501,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
},
},
}
loadBalancer.OnUpdate(endpoints)
loadBalancer.OnEndpointsUpdate(endpoints)
shuffledEndpoints = loadBalancer.services[service].endpoints
if client1Endpoint == "endpoint:3" {
client1Endpoint = shuffledEndpoints[0]
@ -523,7 +523,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
},
},
}
loadBalancer.OnUpdate(endpoints)
loadBalancer.OnEndpointsUpdate(endpoints)
shuffledEndpoints = loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, client1Endpoint, client1)
expectEndpoint(t, loadBalancer, service, client2Endpoint, client2)
@ -555,7 +555,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
},
},
}
loadBalancer.OnUpdate(endpoints)
loadBalancer.OnEndpointsUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
@ -575,7 +575,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
},
},
}
loadBalancer.OnUpdate(endpoints)
loadBalancer.OnEndpointsUpdate(endpoints)
shuffledEndpoints = loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
@ -586,7 +586,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
// Clear endpoints
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil}
loadBalancer.OnUpdate(endpoints)
loadBalancer.OnEndpointsUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 {
@ -626,7 +626,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
},
},
}
loadBalancer.OnUpdate(endpoints)
loadBalancer.OnEndpointsUpdate(endpoints)
shuffledFooEndpoints := loadBalancer.services[fooService].endpoints
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
@ -648,7 +648,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
// Then update the configuration by removing foo
loadBalancer.OnUpdate(endpoints[1:])
loadBalancer.OnEndpointsUpdate(endpoints[1:])
endpoint, err = loadBalancer.NextEndpoint(fooService, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")