Switch kube-proxy to informers

This commit is contained in:
Wojciech Tyczynski
2017-02-25 09:50:35 +01:00
parent 46dda7e32a
commit c789704e8e
6 changed files with 222 additions and 190 deletions

View File

@@ -130,22 +130,12 @@ func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints
}
}
func CreateServiceUpdate(op Operation, services ...api.Service) ServiceUpdate {
ret := ServiceUpdate{Op: op}
ret.Services = make([]api.Service, len(services))
for i, value := range services {
ret.Services[i] = value
}
return ret
func CreateServiceUpdate(op Operation, service *api.Service) ServiceUpdate {
return ServiceUpdate{Op: op, Service: service}
}
func CreateEndpointsUpdate(op Operation, endpoints ...api.Endpoints) EndpointsUpdate {
ret := EndpointsUpdate{Op: op}
ret.Endpoints = make([]api.Endpoints, len(endpoints))
for i, value := range endpoints {
ret.Endpoints[i] = value
}
return ret
func CreateEndpointsUpdate(op Operation, endpoints *api.Endpoints) EndpointsUpdate {
return EndpointsUpdate{Op: op, Endpoints: endpoints}
}
func TestNewServiceAddedAndNotified(t *testing.T) {
@@ -153,13 +143,12 @@ func TestNewServiceAddedAndNotified(t *testing.T) {
channel := config.Channel("one")
handler := NewServiceHandlerMock()
config.RegisterHandler(handler)
serviceUpdate := CreateServiceUpdate(ADD, api.Service{
serviceUpdate := CreateServiceUpdate(ADD, &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
})
channel <- serviceUpdate
handler.ValidateServices(t, serviceUpdate.Services)
handler.ValidateServices(t, []api.Service{*serviceUpdate.Service})
}
func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
@@ -167,34 +156,26 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
channel := config.Channel("one")
handler := NewServiceHandlerMock()
config.RegisterHandler(handler)
serviceUpdate := CreateServiceUpdate(ADD, api.Service{
serviceUpdate := CreateServiceUpdate(ADD, &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
})
channel <- serviceUpdate
handler.ValidateServices(t, serviceUpdate.Services)
handler.ValidateServices(t, []api.Service{*serviceUpdate.Service})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{
serviceUpdate2 := CreateServiceUpdate(ADD, &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}},
})
channel <- serviceUpdate2
services := []api.Service{serviceUpdate2.Services[0], serviceUpdate.Services[0]}
services := []api.Service{*serviceUpdate2.Service, *serviceUpdate.Service}
handler.ValidateServices(t, services)
serviceUpdate3 := CreateServiceUpdate(REMOVE, api.Service{
serviceUpdate3 := CreateServiceUpdate(REMOVE, &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
})
channel <- serviceUpdate3
services = []api.Service{serviceUpdate2.Services[0]}
handler.ValidateServices(t, services)
serviceUpdate4 := CreateServiceUpdate(SET, api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foobar"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 99}}},
})
channel <- serviceUpdate4
services = []api.Service{serviceUpdate4.Services[0]}
services = []api.Service{*serviceUpdate2.Service}
handler.ValidateServices(t, services)
}
@@ -207,17 +188,17 @@ func TestNewMultipleSourcesServicesAddedAndNotified(t *testing.T) {
}
handler := NewServiceHandlerMock()
config.RegisterHandler(handler)
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{
serviceUpdate1 := CreateServiceUpdate(ADD, &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{
serviceUpdate2 := CreateServiceUpdate(ADD, &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}},
})
channelOne <- serviceUpdate1
channelTwo <- serviceUpdate2
services := []api.Service{serviceUpdate2.Services[0], serviceUpdate1.Services[0]}
services := []api.Service{*serviceUpdate2.Service, *serviceUpdate1.Service}
handler.ValidateServices(t, services)
}
@@ -229,17 +210,17 @@ func TestNewMultipleSourcesServicesMultipleHandlersAddedAndNotified(t *testing.T
handler2 := NewServiceHandlerMock()
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{
serviceUpdate1 := CreateServiceUpdate(ADD, &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{
serviceUpdate2 := CreateServiceUpdate(ADD, &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}},
})
channelOne <- serviceUpdate1
channelTwo <- serviceUpdate2
services := []api.Service{serviceUpdate2.Services[0], serviceUpdate1.Services[0]}
services := []api.Service{*serviceUpdate2.Service, *serviceUpdate1.Service}
handler.ValidateServices(t, services)
handler2.ValidateServices(t, services)
}
@@ -252,14 +233,14 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing.
handler2 := NewEndpointsHandlerMock()
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{
endpointsUpdate1 := CreateEndpointsUpdate(ADD, &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}},
Ports: []api.EndpointPort{{Port: 80}},
}},
})
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{
endpointsUpdate2 := CreateEndpointsUpdate(ADD, &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}},
@@ -269,7 +250,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing.
channelOne <- endpointsUpdate1
channelTwo <- endpointsUpdate2
endpoints := []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0]}
endpoints := []api.Endpoints{*endpointsUpdate2.Endpoints, *endpointsUpdate1.Endpoints}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
}
@@ -282,14 +263,14 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
handler2 := NewEndpointsHandlerMock()
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{
endpointsUpdate1 := CreateEndpointsUpdate(ADD, &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}},
Ports: []api.EndpointPort{{Port: 80}},
}},
})
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{
endpointsUpdate2 := CreateEndpointsUpdate(ADD, &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}},
@@ -299,12 +280,12 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
channelOne <- endpointsUpdate1
channelTwo <- endpointsUpdate2
endpoints := []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0]}
endpoints := []api.Endpoints{*endpointsUpdate2.Endpoints, *endpointsUpdate1.Endpoints}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
// Add one more
endpointsUpdate3 := CreateEndpointsUpdate(ADD, api.Endpoints{
endpointsUpdate3 := CreateEndpointsUpdate(ADD, &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foobar"},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "5.5.5.5"}, {IP: "6.6.6.6"}},
@@ -312,12 +293,12 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
}},
})
channelTwo <- endpointsUpdate3
endpoints = []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0], endpointsUpdate3.Endpoints[0]}
endpoints = []api.Endpoints{*endpointsUpdate2.Endpoints, *endpointsUpdate1.Endpoints, *endpointsUpdate3.Endpoints}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
// Update the "foo" service with new endpoints
endpointsUpdate1 = CreateEndpointsUpdate(ADD, api.Endpoints{
endpointsUpdate1 = CreateEndpointsUpdate(ADD, &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "7.7.7.7"}},
@@ -325,15 +306,15 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
}},
})
channelOne <- endpointsUpdate1
endpoints = []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0], endpointsUpdate3.Endpoints[0]}
endpoints = []api.Endpoints{*endpointsUpdate2.Endpoints, *endpointsUpdate1.Endpoints, *endpointsUpdate3.Endpoints}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
// Remove "bar" service
endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, api.Endpoints{ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}})
endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}})
channelTwo <- endpointsUpdate2
endpoints = []api.Endpoints{endpointsUpdate1.Endpoints[0], endpointsUpdate3.Endpoints[0]}
endpoints = []api.Endpoints{*endpointsUpdate1.Endpoints, *endpointsUpdate3.Endpoints}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
}