Belatedly port pkg/proxy/config Endpoints unit tests to EndpointSlice

This commit is contained in:
Dan Winship 2022-12-30 19:51:35 -05:00
parent 00aae4c10c
commit df7e4a9432
2 changed files with 159 additions and 126 deletions

View File

@ -22,6 +22,7 @@ import (
"time"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
@ -29,6 +30,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
ktesting "k8s.io/client-go/testing"
utilpointer "k8s.io/utils/pointer"
)
func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
@ -81,71 +83,84 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
}
func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
endpoints1v1 := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{
{IP: "1.2.3.4"},
tcp := v1.ProtocolTCP
endpoints1v1 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"},
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{{
Addresses: []string{
"1.2.3.4",
},
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
Ports: []discoveryv1.EndpointPort{{
Port: utilpointer.Int32(8080),
Protocol: &tcp,
}},
}
endpoints1v2 := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{
{IP: "1.2.3.4"},
{IP: "4.3.2.1"},
endpoints1v2 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"},
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{{
Addresses: []string{
"1.2.3.4",
"4.3.2.1",
},
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
Ports: []discoveryv1.EndpointPort{{
Port: utilpointer.Int32(8080),
Protocol: &tcp,
}},
}
endpoints2 := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e2"},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{
{IP: "5.6.7.8"},
endpoints2 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e2"},
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{{
Addresses: []string{
"5.6.7.8",
},
Ports: []v1.EndpointPort{{Port: 80, Protocol: "TCP"}},
}},
Ports: []discoveryv1.EndpointPort{{
Port: utilpointer.Int32(8080),
Protocol: &tcp,
}},
}
// Setup fake api client.
client := fake.NewSimpleClientset()
fakeWatch := watch.NewFake()
client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil))
client.PrependWatchReactor("endpointslices", ktesting.DefaultWatchReactor(fakeWatch, nil))
stopCh := make(chan struct{})
defer close(stopCh)
handler := NewEndpointsHandlerMock()
handler := NewEndpointSliceHandlerMock()
sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
endpointsConfig := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), time.Minute)
endpointsConfig.RegisterEventHandler(handler)
endpointsliceConfig := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), time.Minute)
endpointsliceConfig.RegisterEventHandler(handler)
go sharedInformers.Start(stopCh)
go endpointsConfig.Run(stopCh)
go endpointsliceConfig.Run(stopCh)
// Add the first endpoints
fakeWatch.Add(endpoints1v1)
handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints1v1})
handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{endpoints1v1})
// Add another endpoints
fakeWatch.Add(endpoints2)
handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints1v1, endpoints2})
handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{endpoints1v1, endpoints2})
// Modify endpoints1
fakeWatch.Modify(endpoints1v2)
handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints1v2, endpoints2})
handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{endpoints1v2, endpoints2})
// Delete endpoints1
fakeWatch.Delete(endpoints1v2)
handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints2})
handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{endpoints2})
// Delete endpoints2
fakeWatch.Delete(endpoints2)
handler.ValidateEndpoints(t, []*v1.Endpoints{})
handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{})
}
func TestInitialSync(t *testing.T) {
@ -157,10 +172,10 @@ func TestInitialSync(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}},
}
eps1 := &v1.Endpoints{
eps1 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
}
eps2 := &v1.Endpoints{
eps2 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
}
@ -168,7 +183,7 @@ func TestInitialSync(t *testing.T) {
{Name: svc1.Name, Namespace: svc1.Namespace}: svc1,
{Name: svc2.Name, Namespace: svc2.Namespace}: svc2,
}
expectedEpsState := map[types.NamespacedName]*v1.Endpoints{
expectedEpsState := map[types.NamespacedName]*discoveryv1.EndpointSlice{
{Name: eps1.Name, Namespace: eps1.Namespace}: eps1,
{Name: eps2.Name, Namespace: eps2.Namespace}: eps2,
}
@ -181,8 +196,8 @@ func TestInitialSync(t *testing.T) {
svcHandler := NewServiceHandlerMock()
svcConfig.RegisterEventHandler(svcHandler)
epsConfig := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), 0)
epsHandler := NewEndpointsHandlerMock()
epsConfig := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), 0)
epsHandler := NewEndpointSliceHandlerMock()
epsConfig.RegisterEventHandler(epsHandler)
stopCh := make(chan struct{})
@ -226,7 +241,7 @@ func TestInitialSync(t *testing.T) {
}
gotEps := <-epsHandler.updated
gotEpsState := make(map[types.NamespacedName]*v1.Endpoints, len(gotEps))
gotEpsState := make(map[types.NamespacedName]*discoveryv1.EndpointSlice, len(gotEps))
for _, eps := range gotEps {
gotEpsState[types.NamespacedName{Namespace: eps.Namespace, Name: eps.Name}] = eps
}

View File

@ -24,6 +24,7 @@ import (
"time"
"k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
@ -31,6 +32,7 @@ import (
informers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
ktesting "k8s.io/client-go/testing"
utilpointer "k8s.io/utils/pointer"
)
type sortedServices []*v1.Service
@ -128,96 +130,96 @@ func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []*
}
}
type sortedEndpoints []*v1.Endpoints
type sortedEndpointSlices []*discoveryv1.EndpointSlice
func (s sortedEndpoints) Len() int {
func (s sortedEndpointSlices) Len() int {
return len(s)
}
func (s sortedEndpoints) Swap(i, j int) {
func (s sortedEndpointSlices) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s sortedEndpoints) Less(i, j int) bool {
func (s sortedEndpointSlices) Less(i, j int) bool {
return s[i].Name < s[j].Name
}
type EndpointsHandlerMock struct {
type EndpointSliceHandlerMock struct {
lock sync.Mutex
state map[types.NamespacedName]*v1.Endpoints
state map[types.NamespacedName]*discoveryv1.EndpointSlice
synced bool
updated chan []*v1.Endpoints
process func([]*v1.Endpoints)
updated chan []*discoveryv1.EndpointSlice
process func([]*discoveryv1.EndpointSlice)
}
func NewEndpointsHandlerMock() *EndpointsHandlerMock {
ehm := &EndpointsHandlerMock{
state: make(map[types.NamespacedName]*v1.Endpoints),
updated: make(chan []*v1.Endpoints, 5),
func NewEndpointSliceHandlerMock() *EndpointSliceHandlerMock {
ehm := &EndpointSliceHandlerMock{
state: make(map[types.NamespacedName]*discoveryv1.EndpointSlice),
updated: make(chan []*discoveryv1.EndpointSlice, 5),
}
ehm.process = func(endpoints []*v1.Endpoints) {
ehm.process = func(endpoints []*discoveryv1.EndpointSlice) {
ehm.updated <- endpoints
}
return ehm
}
func (h *EndpointsHandlerMock) OnEndpointsAdd(endpoints *v1.Endpoints) {
func (h *EndpointSliceHandlerMock) OnEndpointSliceAdd(slice *discoveryv1.EndpointSlice) {
h.lock.Lock()
defer h.lock.Unlock()
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
h.state[namespacedName] = endpoints
h.sendEndpoints()
namespacedName := types.NamespacedName{Namespace: slice.Namespace, Name: slice.Name}
h.state[namespacedName] = slice
h.sendEndpointSlices()
}
func (h *EndpointsHandlerMock) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
func (h *EndpointSliceHandlerMock) OnEndpointSliceUpdate(oldSlice, slice *discoveryv1.EndpointSlice) {
h.lock.Lock()
defer h.lock.Unlock()
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
h.state[namespacedName] = endpoints
h.sendEndpoints()
namespacedName := types.NamespacedName{Namespace: slice.Namespace, Name: slice.Name}
h.state[namespacedName] = slice
h.sendEndpointSlices()
}
func (h *EndpointsHandlerMock) OnEndpointsDelete(endpoints *v1.Endpoints) {
func (h *EndpointSliceHandlerMock) OnEndpointSliceDelete(slice *discoveryv1.EndpointSlice) {
h.lock.Lock()
defer h.lock.Unlock()
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
namespacedName := types.NamespacedName{Namespace: slice.Namespace, Name: slice.Name}
delete(h.state, namespacedName)
h.sendEndpoints()
h.sendEndpointSlices()
}
func (h *EndpointsHandlerMock) OnEndpointsSynced() {
func (h *EndpointSliceHandlerMock) OnEndpointSlicesSynced() {
h.lock.Lock()
defer h.lock.Unlock()
h.synced = true
h.sendEndpoints()
h.sendEndpointSlices()
}
func (h *EndpointsHandlerMock) sendEndpoints() {
func (h *EndpointSliceHandlerMock) sendEndpointSlices() {
if !h.synced {
return
}
endpoints := make([]*v1.Endpoints, 0, len(h.state))
slices := make([]*discoveryv1.EndpointSlice, 0, len(h.state))
for _, eps := range h.state {
endpoints = append(endpoints, eps)
slices = append(slices, eps)
}
sort.Sort(sortedEndpoints(endpoints))
h.process(endpoints)
sort.Sort(sortedEndpointSlices(slices))
h.process(slices)
}
func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints []*v1.Endpoints) {
// We might get 1 or more updates for N endpoint updates, because we
// over write older snapshots of endpoints from the producer go-routine
func (h *EndpointSliceHandlerMock) ValidateEndpointSlices(t *testing.T, expectedSlices []*discoveryv1.EndpointSlice) {
// We might get 1 or more updates for N endpointslice updates, because we
// over write older snapshots of endpointslices from the producer go-routine
// if the consumer falls behind. Unittests will hard timeout in 5m.
var endpoints []*v1.Endpoints
var slices []*discoveryv1.EndpointSlice
for {
select {
case endpoints = <-h.updated:
if reflect.DeepEqual(endpoints, expectedEndpoints) {
case slices = <-h.updated:
if reflect.DeepEqual(slices, expectedSlices) {
return
}
// Unittests will hard timeout in 5m with a stack trace, prevent that
// and surface a clearer reason for failure.
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Timed out. Expected %#v, Got %#v", expectedEndpoints, endpoints)
t.Errorf("Timed out. Expected %#v, Got %#v", expectedSlices, slices)
return
}
}
@ -320,113 +322,129 @@ func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) {
func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) {
client := fake.NewSimpleClientset()
fakeWatch := watch.NewFake()
client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil))
client.PrependWatchReactor("endpointslices", ktesting.DefaultWatchReactor(fakeWatch, nil))
stopCh := make(chan struct{})
defer close(stopCh)
sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
config := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), time.Minute)
handler := NewEndpointsHandlerMock()
handler2 := NewEndpointsHandlerMock()
config := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), time.Minute)
handler := NewEndpointSliceHandlerMock()
handler2 := NewEndpointSliceHandlerMock()
config.RegisterEventHandler(handler)
config.RegisterEventHandler(handler2)
go sharedInformers.Start(stopCh)
go config.Run(stopCh)
endpoints1 := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}},
Ports: []v1.EndpointPort{{Port: 80}},
endpoints1 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{{
Addresses: []string{"1.1.1.1"},
}, {
Addresses: []string{"2.2.2.2"},
}},
Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}},
}
endpoints2 := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}},
Ports: []v1.EndpointPort{{Port: 80}},
endpoints2 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{{
Addresses: []string{"3.3.3.3"},
}, {
Addresses: []string{"4.4.4.4"},
}},
Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}},
}
fakeWatch.Add(endpoints1)
fakeWatch.Add(endpoints2)
endpoints := []*v1.Endpoints{endpoints2, endpoints1}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
endpoints := []*discoveryv1.EndpointSlice{endpoints2, endpoints1}
handler.ValidateEndpointSlices(t, endpoints)
handler2.ValidateEndpointSlices(t, endpoints)
}
func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) {
client := fake.NewSimpleClientset()
fakeWatch := watch.NewFake()
client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil))
client.PrependWatchReactor("endpointslices", ktesting.DefaultWatchReactor(fakeWatch, nil))
stopCh := make(chan struct{})
defer close(stopCh)
sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
config := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), time.Minute)
handler := NewEndpointsHandlerMock()
handler2 := NewEndpointsHandlerMock()
config := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), time.Minute)
handler := NewEndpointSliceHandlerMock()
handler2 := NewEndpointSliceHandlerMock()
config.RegisterEventHandler(handler)
config.RegisterEventHandler(handler2)
go sharedInformers.Start(stopCh)
go config.Run(stopCh)
endpoints1 := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}},
Ports: []v1.EndpointPort{{Port: 80}},
endpoints1 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{{
Addresses: []string{"1.1.1.1"},
}, {
Addresses: []string{"2.2.2.2"},
}},
Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}},
}
endpoints2 := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}},
Ports: []v1.EndpointPort{{Port: 80}},
endpoints2 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{{
Addresses: []string{"3.3.3.3"},
}, {
Addresses: []string{"4.4.4.4"},
}},
Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}},
}
fakeWatch.Add(endpoints1)
fakeWatch.Add(endpoints2)
endpoints := []*v1.Endpoints{endpoints2, endpoints1}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
endpoints := []*discoveryv1.EndpointSlice{endpoints2, endpoints1}
handler.ValidateEndpointSlices(t, endpoints)
handler2.ValidateEndpointSlices(t, endpoints)
// Add one more
endpoints3 := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foobar"},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "5.5.5.5"}, {IP: "6.6.6.6"}},
Ports: []v1.EndpointPort{{Port: 80}},
endpoints3 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foobar"},
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{{
Addresses: []string{"5.5.5.5"},
}, {
Addresses: []string{"6.6.6.6"},
}},
Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}},
}
fakeWatch.Add(endpoints3)
endpoints = []*v1.Endpoints{endpoints2, endpoints1, endpoints3}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
endpoints = []*discoveryv1.EndpointSlice{endpoints2, endpoints1, endpoints3}
handler.ValidateEndpointSlices(t, endpoints)
handler2.ValidateEndpointSlices(t, endpoints)
// Update the "foo" service with new endpoints
endpoints1v2 := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "7.7.7.7"}},
Ports: []v1.EndpointPort{{Port: 80}},
endpoints1v2 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{{
Addresses: []string{"7.7.7.7"},
}},
Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}},
}
fakeWatch.Modify(endpoints1v2)
endpoints = []*v1.Endpoints{endpoints2, endpoints1v2, endpoints3}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
endpoints = []*discoveryv1.EndpointSlice{endpoints2, endpoints1v2, endpoints3}
handler.ValidateEndpointSlices(t, endpoints)
handler2.ValidateEndpointSlices(t, endpoints)
// Remove "bar" endpoints
fakeWatch.Delete(endpoints2)
endpoints = []*v1.Endpoints{endpoints1v2, endpoints3}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
endpoints = []*discoveryv1.EndpointSlice{endpoints1v2, endpoints3}
handler.ValidateEndpointSlices(t, endpoints)
handler2.ValidateEndpointSlices(t, endpoints)
}
// TODO: Add a unittest for interrupts getting processed in a timely manner.