diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index 455c8f14836..422f4f03f16 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -22,6 +22,7 @@ import ( "time" v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" @@ -29,6 +30,7 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" ktesting "k8s.io/client-go/testing" + utilpointer "k8s.io/utils/pointer" ) func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) { @@ -81,71 +83,84 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) { } func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { - endpoints1v1 := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{ - {IP: "1.2.3.4"}, + tcp := v1.ProtocolTCP + endpoints1v1 := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"}, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{{ + Addresses: []string{ + "1.2.3.4", }, - Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + Ports: []discoveryv1.EndpointPort{{ + Port: utilpointer.Int32(8080), + Protocol: &tcp, }}, } - endpoints1v2 := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{ - {IP: "1.2.3.4"}, - {IP: "4.3.2.1"}, + endpoints1v2 := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"}, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{{ + Addresses: []string{ + "1.2.3.4", + "4.3.2.1", }, - Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + Ports: []discoveryv1.EndpointPort{{ + Port: utilpointer.Int32(8080), + Protocol: &tcp, }}, } - endpoints2 := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e2"}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{ - {IP: "5.6.7.8"}, + endpoints2 := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e2"}, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{{ + Addresses: []string{ + "5.6.7.8", }, - Ports: []v1.EndpointPort{{Port: 80, Protocol: "TCP"}}, + }}, + Ports: []discoveryv1.EndpointPort{{ + Port: utilpointer.Int32(8080), + Protocol: &tcp, }}, } // Setup fake api client. client := fake.NewSimpleClientset() fakeWatch := watch.NewFake() - client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil)) + client.PrependWatchReactor("endpointslices", ktesting.DefaultWatchReactor(fakeWatch, nil)) stopCh := make(chan struct{}) defer close(stopCh) - handler := NewEndpointsHandlerMock() + handler := NewEndpointSliceHandlerMock() sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) - endpointsConfig := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), time.Minute) - endpointsConfig.RegisterEventHandler(handler) + endpointsliceConfig := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), time.Minute) + endpointsliceConfig.RegisterEventHandler(handler) go sharedInformers.Start(stopCh) - go endpointsConfig.Run(stopCh) + go endpointsliceConfig.Run(stopCh) // Add the first endpoints fakeWatch.Add(endpoints1v1) - handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints1v1}) + handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{endpoints1v1}) // Add another endpoints fakeWatch.Add(endpoints2) - handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints1v1, endpoints2}) + handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{endpoints1v1, endpoints2}) // Modify endpoints1 fakeWatch.Modify(endpoints1v2) - handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints1v2, endpoints2}) + handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{endpoints1v2, endpoints2}) // Delete endpoints1 fakeWatch.Delete(endpoints1v2) - handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints2}) + handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{endpoints2}) // Delete endpoints2 fakeWatch.Delete(endpoints2) - handler.ValidateEndpoints(t, []*v1.Endpoints{}) + handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{}) } func TestInitialSync(t *testing.T) { @@ -157,10 +172,10 @@ func TestInitialSync(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}}, } - eps1 := &v1.Endpoints{ + eps1 := &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, } - eps2 := &v1.Endpoints{ + eps2 := &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, } @@ -168,7 +183,7 @@ func TestInitialSync(t *testing.T) { {Name: svc1.Name, Namespace: svc1.Namespace}: svc1, {Name: svc2.Name, Namespace: svc2.Namespace}: svc2, } - expectedEpsState := map[types.NamespacedName]*v1.Endpoints{ + expectedEpsState := map[types.NamespacedName]*discoveryv1.EndpointSlice{ {Name: eps1.Name, Namespace: eps1.Namespace}: eps1, {Name: eps2.Name, Namespace: eps2.Namespace}: eps2, } @@ -181,8 +196,8 @@ func TestInitialSync(t *testing.T) { svcHandler := NewServiceHandlerMock() svcConfig.RegisterEventHandler(svcHandler) - epsConfig := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), 0) - epsHandler := NewEndpointsHandlerMock() + epsConfig := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), 0) + epsHandler := NewEndpointSliceHandlerMock() epsConfig.RegisterEventHandler(epsHandler) stopCh := make(chan struct{}) @@ -226,7 +241,7 @@ func TestInitialSync(t *testing.T) { } gotEps := <-epsHandler.updated - gotEpsState := make(map[types.NamespacedName]*v1.Endpoints, len(gotEps)) + gotEpsState := make(map[types.NamespacedName]*discoveryv1.EndpointSlice, len(gotEps)) for _, eps := range gotEps { gotEpsState[types.NamespacedName{Namespace: eps.Namespace, Name: eps.Name}] = eps } diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index 531b3b1a089..0d91cbd8152 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -24,6 +24,7 @@ import ( "time" "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" @@ -31,6 +32,7 @@ import ( informers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" ktesting "k8s.io/client-go/testing" + utilpointer "k8s.io/utils/pointer" ) type sortedServices []*v1.Service @@ -128,96 +130,96 @@ func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []* } } -type sortedEndpoints []*v1.Endpoints +type sortedEndpointSlices []*discoveryv1.EndpointSlice -func (s sortedEndpoints) Len() int { +func (s sortedEndpointSlices) Len() int { return len(s) } -func (s sortedEndpoints) Swap(i, j int) { +func (s sortedEndpointSlices) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func (s sortedEndpoints) Less(i, j int) bool { +func (s sortedEndpointSlices) Less(i, j int) bool { return s[i].Name < s[j].Name } -type EndpointsHandlerMock struct { +type EndpointSliceHandlerMock struct { lock sync.Mutex - state map[types.NamespacedName]*v1.Endpoints + state map[types.NamespacedName]*discoveryv1.EndpointSlice synced bool - updated chan []*v1.Endpoints - process func([]*v1.Endpoints) + updated chan []*discoveryv1.EndpointSlice + process func([]*discoveryv1.EndpointSlice) } -func NewEndpointsHandlerMock() *EndpointsHandlerMock { - ehm := &EndpointsHandlerMock{ - state: make(map[types.NamespacedName]*v1.Endpoints), - updated: make(chan []*v1.Endpoints, 5), +func NewEndpointSliceHandlerMock() *EndpointSliceHandlerMock { + ehm := &EndpointSliceHandlerMock{ + state: make(map[types.NamespacedName]*discoveryv1.EndpointSlice), + updated: make(chan []*discoveryv1.EndpointSlice, 5), } - ehm.process = func(endpoints []*v1.Endpoints) { + ehm.process = func(endpoints []*discoveryv1.EndpointSlice) { ehm.updated <- endpoints } return ehm } -func (h *EndpointsHandlerMock) OnEndpointsAdd(endpoints *v1.Endpoints) { +func (h *EndpointSliceHandlerMock) OnEndpointSliceAdd(slice *discoveryv1.EndpointSlice) { h.lock.Lock() defer h.lock.Unlock() - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - h.state[namespacedName] = endpoints - h.sendEndpoints() + namespacedName := types.NamespacedName{Namespace: slice.Namespace, Name: slice.Name} + h.state[namespacedName] = slice + h.sendEndpointSlices() } -func (h *EndpointsHandlerMock) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { +func (h *EndpointSliceHandlerMock) OnEndpointSliceUpdate(oldSlice, slice *discoveryv1.EndpointSlice) { h.lock.Lock() defer h.lock.Unlock() - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - h.state[namespacedName] = endpoints - h.sendEndpoints() + namespacedName := types.NamespacedName{Namespace: slice.Namespace, Name: slice.Name} + h.state[namespacedName] = slice + h.sendEndpointSlices() } -func (h *EndpointsHandlerMock) OnEndpointsDelete(endpoints *v1.Endpoints) { +func (h *EndpointSliceHandlerMock) OnEndpointSliceDelete(slice *discoveryv1.EndpointSlice) { h.lock.Lock() defer h.lock.Unlock() - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} + namespacedName := types.NamespacedName{Namespace: slice.Namespace, Name: slice.Name} delete(h.state, namespacedName) - h.sendEndpoints() + h.sendEndpointSlices() } -func (h *EndpointsHandlerMock) OnEndpointsSynced() { +func (h *EndpointSliceHandlerMock) OnEndpointSlicesSynced() { h.lock.Lock() defer h.lock.Unlock() h.synced = true - h.sendEndpoints() + h.sendEndpointSlices() } -func (h *EndpointsHandlerMock) sendEndpoints() { +func (h *EndpointSliceHandlerMock) sendEndpointSlices() { if !h.synced { return } - endpoints := make([]*v1.Endpoints, 0, len(h.state)) + slices := make([]*discoveryv1.EndpointSlice, 0, len(h.state)) for _, eps := range h.state { - endpoints = append(endpoints, eps) + slices = append(slices, eps) } - sort.Sort(sortedEndpoints(endpoints)) - h.process(endpoints) + sort.Sort(sortedEndpointSlices(slices)) + h.process(slices) } -func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints []*v1.Endpoints) { - // We might get 1 or more updates for N endpoint updates, because we - // over write older snapshots of endpoints from the producer go-routine +func (h *EndpointSliceHandlerMock) ValidateEndpointSlices(t *testing.T, expectedSlices []*discoveryv1.EndpointSlice) { + // We might get 1 or more updates for N endpointslice updates, because we + // over write older snapshots of endpointslices from the producer go-routine // if the consumer falls behind. Unittests will hard timeout in 5m. - var endpoints []*v1.Endpoints + var slices []*discoveryv1.EndpointSlice for { select { - case endpoints = <-h.updated: - if reflect.DeepEqual(endpoints, expectedEndpoints) { + case slices = <-h.updated: + if reflect.DeepEqual(slices, expectedSlices) { return } // Unittests will hard timeout in 5m with a stack trace, prevent that // and surface a clearer reason for failure. case <-time.After(wait.ForeverTestTimeout): - t.Errorf("Timed out. Expected %#v, Got %#v", expectedEndpoints, endpoints) + t.Errorf("Timed out. Expected %#v, Got %#v", expectedSlices, slices) return } } @@ -320,113 +322,129 @@ func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) { func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) { client := fake.NewSimpleClientset() fakeWatch := watch.NewFake() - client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil)) + client.PrependWatchReactor("endpointslices", ktesting.DefaultWatchReactor(fakeWatch, nil)) stopCh := make(chan struct{}) defer close(stopCh) sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) - config := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), time.Minute) - handler := NewEndpointsHandlerMock() - handler2 := NewEndpointsHandlerMock() + config := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), time.Minute) + handler := NewEndpointSliceHandlerMock() + handler2 := NewEndpointSliceHandlerMock() config.RegisterEventHandler(handler) config.RegisterEventHandler(handler2) go sharedInformers.Start(stopCh) go config.Run(stopCh) - endpoints1 := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}}, - Ports: []v1.EndpointPort{{Port: 80}}, + endpoints1 := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + }, { + Addresses: []string{"2.2.2.2"}, }}, + Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}}, } - endpoints2 := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}}, - Ports: []v1.EndpointPort{{Port: 80}}, + endpoints2 := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{{ + Addresses: []string{"3.3.3.3"}, + }, { + Addresses: []string{"4.4.4.4"}, }}, + Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}}, } fakeWatch.Add(endpoints1) fakeWatch.Add(endpoints2) - endpoints := []*v1.Endpoints{endpoints2, endpoints1} - handler.ValidateEndpoints(t, endpoints) - handler2.ValidateEndpoints(t, endpoints) + endpoints := []*discoveryv1.EndpointSlice{endpoints2, endpoints1} + handler.ValidateEndpointSlices(t, endpoints) + handler2.ValidateEndpointSlices(t, endpoints) } func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) { client := fake.NewSimpleClientset() fakeWatch := watch.NewFake() - client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil)) + client.PrependWatchReactor("endpointslices", ktesting.DefaultWatchReactor(fakeWatch, nil)) stopCh := make(chan struct{}) defer close(stopCh) sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) - config := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), time.Minute) - handler := NewEndpointsHandlerMock() - handler2 := NewEndpointsHandlerMock() + config := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), time.Minute) + handler := NewEndpointSliceHandlerMock() + handler2 := NewEndpointSliceHandlerMock() config.RegisterEventHandler(handler) config.RegisterEventHandler(handler2) go sharedInformers.Start(stopCh) go config.Run(stopCh) - endpoints1 := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}}, - Ports: []v1.EndpointPort{{Port: 80}}, + endpoints1 := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + }, { + Addresses: []string{"2.2.2.2"}, }}, + Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}}, } - endpoints2 := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}}, - Ports: []v1.EndpointPort{{Port: 80}}, + endpoints2 := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{{ + Addresses: []string{"3.3.3.3"}, + }, { + Addresses: []string{"4.4.4.4"}, }}, + Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}}, } fakeWatch.Add(endpoints1) fakeWatch.Add(endpoints2) - endpoints := []*v1.Endpoints{endpoints2, endpoints1} - handler.ValidateEndpoints(t, endpoints) - handler2.ValidateEndpoints(t, endpoints) + endpoints := []*discoveryv1.EndpointSlice{endpoints2, endpoints1} + handler.ValidateEndpointSlices(t, endpoints) + handler2.ValidateEndpointSlices(t, endpoints) // Add one more - endpoints3 := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foobar"}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "5.5.5.5"}, {IP: "6.6.6.6"}}, - Ports: []v1.EndpointPort{{Port: 80}}, + endpoints3 := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foobar"}, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{{ + Addresses: []string{"5.5.5.5"}, + }, { + Addresses: []string{"6.6.6.6"}, }}, + Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}}, } fakeWatch.Add(endpoints3) - endpoints = []*v1.Endpoints{endpoints2, endpoints1, endpoints3} - handler.ValidateEndpoints(t, endpoints) - handler2.ValidateEndpoints(t, endpoints) + endpoints = []*discoveryv1.EndpointSlice{endpoints2, endpoints1, endpoints3} + handler.ValidateEndpointSlices(t, endpoints) + handler2.ValidateEndpointSlices(t, endpoints) // Update the "foo" service with new endpoints - endpoints1v2 := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "7.7.7.7"}}, - Ports: []v1.EndpointPort{{Port: 80}}, + endpoints1v2 := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{{ + Addresses: []string{"7.7.7.7"}, }}, + Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}}, } fakeWatch.Modify(endpoints1v2) - endpoints = []*v1.Endpoints{endpoints2, endpoints1v2, endpoints3} - handler.ValidateEndpoints(t, endpoints) - handler2.ValidateEndpoints(t, endpoints) + endpoints = []*discoveryv1.EndpointSlice{endpoints2, endpoints1v2, endpoints3} + handler.ValidateEndpointSlices(t, endpoints) + handler2.ValidateEndpointSlices(t, endpoints) // Remove "bar" endpoints fakeWatch.Delete(endpoints2) - endpoints = []*v1.Endpoints{endpoints1v2, endpoints3} - handler.ValidateEndpoints(t, endpoints) - handler2.ValidateEndpoints(t, endpoints) + endpoints = []*discoveryv1.EndpointSlice{endpoints1v2, endpoints3} + handler.ValidateEndpointSlices(t, endpoints) + handler2.ValidateEndpointSlices(t, endpoints) } // TODO: Add a unittest for interrupts getting processed in a timely manner.