From df7e4a9432695e0543428321b94fb57fa35b5faa Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Fri, 30 Dec 2022 19:51:35 -0500 Subject: [PATCH 1/3] Belatedly port pkg/proxy/config Endpoints unit tests to EndpointSlice --- pkg/proxy/config/api_test.go | 85 ++++++++------ pkg/proxy/config/config_test.go | 200 +++++++++++++++++--------------- 2 files changed, 159 insertions(+), 126 deletions(-) diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index 455c8f14836..422f4f03f16 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -22,6 +22,7 @@ import ( "time" v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" @@ -29,6 +30,7 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" ktesting "k8s.io/client-go/testing" + utilpointer "k8s.io/utils/pointer" ) func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) { @@ -81,71 +83,84 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) { } func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { - endpoints1v1 := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{ - {IP: "1.2.3.4"}, + tcp := v1.ProtocolTCP + endpoints1v1 := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"}, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{{ + Addresses: []string{ + "1.2.3.4", }, - Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + Ports: []discoveryv1.EndpointPort{{ + Port: utilpointer.Int32(8080), + Protocol: &tcp, }}, } - endpoints1v2 := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{ - {IP: "1.2.3.4"}, - {IP: "4.3.2.1"}, + endpoints1v2 := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"}, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{{ + Addresses: []string{ + "1.2.3.4", + "4.3.2.1", }, - Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + Ports: []discoveryv1.EndpointPort{{ + Port: utilpointer.Int32(8080), + Protocol: &tcp, }}, } - endpoints2 := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e2"}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{ - {IP: "5.6.7.8"}, + endpoints2 := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e2"}, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{{ + Addresses: []string{ + "5.6.7.8", }, - Ports: []v1.EndpointPort{{Port: 80, Protocol: "TCP"}}, + }}, + Ports: []discoveryv1.EndpointPort{{ + Port: utilpointer.Int32(8080), + Protocol: &tcp, }}, } // Setup fake api client. client := fake.NewSimpleClientset() fakeWatch := watch.NewFake() - client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil)) + client.PrependWatchReactor("endpointslices", ktesting.DefaultWatchReactor(fakeWatch, nil)) stopCh := make(chan struct{}) defer close(stopCh) - handler := NewEndpointsHandlerMock() + handler := NewEndpointSliceHandlerMock() sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) - endpointsConfig := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), time.Minute) - endpointsConfig.RegisterEventHandler(handler) + endpointsliceConfig := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), time.Minute) + endpointsliceConfig.RegisterEventHandler(handler) go sharedInformers.Start(stopCh) - go endpointsConfig.Run(stopCh) + go endpointsliceConfig.Run(stopCh) // Add the first endpoints fakeWatch.Add(endpoints1v1) - handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints1v1}) + handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{endpoints1v1}) // Add another endpoints fakeWatch.Add(endpoints2) - handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints1v1, endpoints2}) + handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{endpoints1v1, endpoints2}) // Modify endpoints1 fakeWatch.Modify(endpoints1v2) - handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints1v2, endpoints2}) + handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{endpoints1v2, endpoints2}) // Delete endpoints1 fakeWatch.Delete(endpoints1v2) - handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints2}) + handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{endpoints2}) // Delete endpoints2 fakeWatch.Delete(endpoints2) - handler.ValidateEndpoints(t, []*v1.Endpoints{}) + handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{}) } func TestInitialSync(t *testing.T) { @@ -157,10 +172,10 @@ func TestInitialSync(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}}, } - eps1 := &v1.Endpoints{ + eps1 := &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, } - eps2 := &v1.Endpoints{ + eps2 := &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, } @@ -168,7 +183,7 @@ func TestInitialSync(t *testing.T) { {Name: svc1.Name, Namespace: svc1.Namespace}: svc1, {Name: svc2.Name, Namespace: svc2.Namespace}: svc2, } - expectedEpsState := map[types.NamespacedName]*v1.Endpoints{ + expectedEpsState := map[types.NamespacedName]*discoveryv1.EndpointSlice{ {Name: eps1.Name, Namespace: eps1.Namespace}: eps1, {Name: eps2.Name, Namespace: eps2.Namespace}: eps2, } @@ -181,8 +196,8 @@ func TestInitialSync(t *testing.T) { svcHandler := NewServiceHandlerMock() svcConfig.RegisterEventHandler(svcHandler) - epsConfig := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), 0) - epsHandler := NewEndpointsHandlerMock() + epsConfig := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), 0) + epsHandler := NewEndpointSliceHandlerMock() epsConfig.RegisterEventHandler(epsHandler) stopCh := make(chan struct{}) @@ -226,7 +241,7 @@ func TestInitialSync(t *testing.T) { } gotEps := <-epsHandler.updated - gotEpsState := make(map[types.NamespacedName]*v1.Endpoints, len(gotEps)) + gotEpsState := make(map[types.NamespacedName]*discoveryv1.EndpointSlice, len(gotEps)) for _, eps := range gotEps { gotEpsState[types.NamespacedName{Namespace: eps.Namespace, Name: eps.Name}] = eps } diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index 531b3b1a089..0d91cbd8152 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -24,6 +24,7 @@ import ( "time" "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" @@ -31,6 +32,7 @@ import ( informers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" ktesting "k8s.io/client-go/testing" + utilpointer "k8s.io/utils/pointer" ) type sortedServices []*v1.Service @@ -128,96 +130,96 @@ func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []* } } -type sortedEndpoints []*v1.Endpoints +type sortedEndpointSlices []*discoveryv1.EndpointSlice -func (s sortedEndpoints) Len() int { +func (s sortedEndpointSlices) Len() int { return len(s) } -func (s sortedEndpoints) Swap(i, j int) { +func (s sortedEndpointSlices) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func (s sortedEndpoints) Less(i, j int) bool { +func (s sortedEndpointSlices) Less(i, j int) bool { return s[i].Name < s[j].Name } -type EndpointsHandlerMock struct { +type EndpointSliceHandlerMock struct { lock sync.Mutex - state map[types.NamespacedName]*v1.Endpoints + state map[types.NamespacedName]*discoveryv1.EndpointSlice synced bool - updated chan []*v1.Endpoints - process func([]*v1.Endpoints) + updated chan []*discoveryv1.EndpointSlice + process func([]*discoveryv1.EndpointSlice) } -func NewEndpointsHandlerMock() *EndpointsHandlerMock { - ehm := &EndpointsHandlerMock{ - state: make(map[types.NamespacedName]*v1.Endpoints), - updated: make(chan []*v1.Endpoints, 5), +func NewEndpointSliceHandlerMock() *EndpointSliceHandlerMock { + ehm := &EndpointSliceHandlerMock{ + state: make(map[types.NamespacedName]*discoveryv1.EndpointSlice), + updated: make(chan []*discoveryv1.EndpointSlice, 5), } - ehm.process = func(endpoints []*v1.Endpoints) { + ehm.process = func(endpoints []*discoveryv1.EndpointSlice) { ehm.updated <- endpoints } return ehm } -func (h *EndpointsHandlerMock) OnEndpointsAdd(endpoints *v1.Endpoints) { +func (h *EndpointSliceHandlerMock) OnEndpointSliceAdd(slice *discoveryv1.EndpointSlice) { h.lock.Lock() defer h.lock.Unlock() - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - h.state[namespacedName] = endpoints - h.sendEndpoints() + namespacedName := types.NamespacedName{Namespace: slice.Namespace, Name: slice.Name} + h.state[namespacedName] = slice + h.sendEndpointSlices() } -func (h *EndpointsHandlerMock) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { +func (h *EndpointSliceHandlerMock) OnEndpointSliceUpdate(oldSlice, slice *discoveryv1.EndpointSlice) { h.lock.Lock() defer h.lock.Unlock() - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - h.state[namespacedName] = endpoints - h.sendEndpoints() + namespacedName := types.NamespacedName{Namespace: slice.Namespace, Name: slice.Name} + h.state[namespacedName] = slice + h.sendEndpointSlices() } -func (h *EndpointsHandlerMock) OnEndpointsDelete(endpoints *v1.Endpoints) { +func (h *EndpointSliceHandlerMock) OnEndpointSliceDelete(slice *discoveryv1.EndpointSlice) { h.lock.Lock() defer h.lock.Unlock() - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} + namespacedName := types.NamespacedName{Namespace: slice.Namespace, Name: slice.Name} delete(h.state, namespacedName) - h.sendEndpoints() + h.sendEndpointSlices() } -func (h *EndpointsHandlerMock) OnEndpointsSynced() { +func (h *EndpointSliceHandlerMock) OnEndpointSlicesSynced() { h.lock.Lock() defer h.lock.Unlock() h.synced = true - h.sendEndpoints() + h.sendEndpointSlices() } -func (h *EndpointsHandlerMock) sendEndpoints() { +func (h *EndpointSliceHandlerMock) sendEndpointSlices() { if !h.synced { return } - endpoints := make([]*v1.Endpoints, 0, len(h.state)) + slices := make([]*discoveryv1.EndpointSlice, 0, len(h.state)) for _, eps := range h.state { - endpoints = append(endpoints, eps) + slices = append(slices, eps) } - sort.Sort(sortedEndpoints(endpoints)) - h.process(endpoints) + sort.Sort(sortedEndpointSlices(slices)) + h.process(slices) } -func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints []*v1.Endpoints) { - // We might get 1 or more updates for N endpoint updates, because we - // over write older snapshots of endpoints from the producer go-routine +func (h *EndpointSliceHandlerMock) ValidateEndpointSlices(t *testing.T, expectedSlices []*discoveryv1.EndpointSlice) { + // We might get 1 or more updates for N endpointslice updates, because we + // over write older snapshots of endpointslices from the producer go-routine // if the consumer falls behind. Unittests will hard timeout in 5m. - var endpoints []*v1.Endpoints + var slices []*discoveryv1.EndpointSlice for { select { - case endpoints = <-h.updated: - if reflect.DeepEqual(endpoints, expectedEndpoints) { + case slices = <-h.updated: + if reflect.DeepEqual(slices, expectedSlices) { return } // Unittests will hard timeout in 5m with a stack trace, prevent that // and surface a clearer reason for failure. case <-time.After(wait.ForeverTestTimeout): - t.Errorf("Timed out. Expected %#v, Got %#v", expectedEndpoints, endpoints) + t.Errorf("Timed out. Expected %#v, Got %#v", expectedSlices, slices) return } } @@ -320,113 +322,129 @@ func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) { func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) { client := fake.NewSimpleClientset() fakeWatch := watch.NewFake() - client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil)) + client.PrependWatchReactor("endpointslices", ktesting.DefaultWatchReactor(fakeWatch, nil)) stopCh := make(chan struct{}) defer close(stopCh) sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) - config := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), time.Minute) - handler := NewEndpointsHandlerMock() - handler2 := NewEndpointsHandlerMock() + config := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), time.Minute) + handler := NewEndpointSliceHandlerMock() + handler2 := NewEndpointSliceHandlerMock() config.RegisterEventHandler(handler) config.RegisterEventHandler(handler2) go sharedInformers.Start(stopCh) go config.Run(stopCh) - endpoints1 := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}}, - Ports: []v1.EndpointPort{{Port: 80}}, + endpoints1 := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + }, { + Addresses: []string{"2.2.2.2"}, }}, + Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}}, } - endpoints2 := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}}, - Ports: []v1.EndpointPort{{Port: 80}}, + endpoints2 := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{{ + Addresses: []string{"3.3.3.3"}, + }, { + Addresses: []string{"4.4.4.4"}, }}, + Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}}, } fakeWatch.Add(endpoints1) fakeWatch.Add(endpoints2) - endpoints := []*v1.Endpoints{endpoints2, endpoints1} - handler.ValidateEndpoints(t, endpoints) - handler2.ValidateEndpoints(t, endpoints) + endpoints := []*discoveryv1.EndpointSlice{endpoints2, endpoints1} + handler.ValidateEndpointSlices(t, endpoints) + handler2.ValidateEndpointSlices(t, endpoints) } func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) { client := fake.NewSimpleClientset() fakeWatch := watch.NewFake() - client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil)) + client.PrependWatchReactor("endpointslices", ktesting.DefaultWatchReactor(fakeWatch, nil)) stopCh := make(chan struct{}) defer close(stopCh) sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) - config := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), time.Minute) - handler := NewEndpointsHandlerMock() - handler2 := NewEndpointsHandlerMock() + config := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), time.Minute) + handler := NewEndpointSliceHandlerMock() + handler2 := NewEndpointSliceHandlerMock() config.RegisterEventHandler(handler) config.RegisterEventHandler(handler2) go sharedInformers.Start(stopCh) go config.Run(stopCh) - endpoints1 := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}}, - Ports: []v1.EndpointPort{{Port: 80}}, + endpoints1 := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + }, { + Addresses: []string{"2.2.2.2"}, }}, + Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}}, } - endpoints2 := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}}, - Ports: []v1.EndpointPort{{Port: 80}}, + endpoints2 := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{{ + Addresses: []string{"3.3.3.3"}, + }, { + Addresses: []string{"4.4.4.4"}, }}, + Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}}, } fakeWatch.Add(endpoints1) fakeWatch.Add(endpoints2) - endpoints := []*v1.Endpoints{endpoints2, endpoints1} - handler.ValidateEndpoints(t, endpoints) - handler2.ValidateEndpoints(t, endpoints) + endpoints := []*discoveryv1.EndpointSlice{endpoints2, endpoints1} + handler.ValidateEndpointSlices(t, endpoints) + handler2.ValidateEndpointSlices(t, endpoints) // Add one more - endpoints3 := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foobar"}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "5.5.5.5"}, {IP: "6.6.6.6"}}, - Ports: []v1.EndpointPort{{Port: 80}}, + endpoints3 := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foobar"}, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{{ + Addresses: []string{"5.5.5.5"}, + }, { + Addresses: []string{"6.6.6.6"}, }}, + Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}}, } fakeWatch.Add(endpoints3) - endpoints = []*v1.Endpoints{endpoints2, endpoints1, endpoints3} - handler.ValidateEndpoints(t, endpoints) - handler2.ValidateEndpoints(t, endpoints) + endpoints = []*discoveryv1.EndpointSlice{endpoints2, endpoints1, endpoints3} + handler.ValidateEndpointSlices(t, endpoints) + handler2.ValidateEndpointSlices(t, endpoints) // Update the "foo" service with new endpoints - endpoints1v2 := &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "7.7.7.7"}}, - Ports: []v1.EndpointPort{{Port: 80}}, + endpoints1v2 := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{{ + Addresses: []string{"7.7.7.7"}, }}, + Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}}, } fakeWatch.Modify(endpoints1v2) - endpoints = []*v1.Endpoints{endpoints2, endpoints1v2, endpoints3} - handler.ValidateEndpoints(t, endpoints) - handler2.ValidateEndpoints(t, endpoints) + endpoints = []*discoveryv1.EndpointSlice{endpoints2, endpoints1v2, endpoints3} + handler.ValidateEndpointSlices(t, endpoints) + handler2.ValidateEndpointSlices(t, endpoints) // Remove "bar" endpoints fakeWatch.Delete(endpoints2) - endpoints = []*v1.Endpoints{endpoints1v2, endpoints3} - handler.ValidateEndpoints(t, endpoints) - handler2.ValidateEndpoints(t, endpoints) + endpoints = []*discoveryv1.EndpointSlice{endpoints1v2, endpoints3} + handler.ValidateEndpointSlices(t, endpoints) + handler2.ValidateEndpointSlices(t, endpoints) } // TODO: Add a unittest for interrupts getting processed in a timely manner. From 3da93e1fe44e331093ada6df73b7b3be902d3cc7 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Sat, 31 Dec 2022 10:06:55 -0500 Subject: [PATCH 2/3] "Port" HollowProxy from Endpoints to EndpointSlice --- pkg/kubemark/hollow_proxy.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/kubemark/hollow_proxy.go b/pkg/kubemark/hollow_proxy.go index bbd4bd04d5d..4b179a1e414 100644 --- a/pkg/kubemark/hollow_proxy.go +++ b/pkg/kubemark/hollow_proxy.go @@ -21,6 +21,7 @@ import ( "time" v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/types" clientset "k8s.io/client-go/kubernetes" v1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -45,7 +46,6 @@ type HollowProxy struct { } type FakeProxier struct { - proxyconfig.NoopEndpointSliceHandler proxyconfig.NoopNodeHandler } @@ -53,14 +53,14 @@ func (*FakeProxier) Sync() {} func (*FakeProxier) SyncLoop() { select {} } -func (*FakeProxier) OnServiceAdd(service *v1.Service) {} -func (*FakeProxier) OnServiceUpdate(oldService, service *v1.Service) {} -func (*FakeProxier) OnServiceDelete(service *v1.Service) {} -func (*FakeProxier) OnServiceSynced() {} -func (*FakeProxier) OnEndpointsAdd(endpoints *v1.Endpoints) {} -func (*FakeProxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {} -func (*FakeProxier) OnEndpointsDelete(endpoints *v1.Endpoints) {} -func (*FakeProxier) OnEndpointsSynced() {} +func (*FakeProxier) OnServiceAdd(service *v1.Service) {} +func (*FakeProxier) OnServiceUpdate(oldService, service *v1.Service) {} +func (*FakeProxier) OnServiceDelete(service *v1.Service) {} +func (*FakeProxier) OnServiceSynced() {} +func (*FakeProxier) OnEndpointSliceAdd(slice *discoveryv1.EndpointSlice) {} +func (*FakeProxier) OnEndpointSliceUpdate(oldSlice, slice *discoveryv1.EndpointSlice) {} +func (*FakeProxier) OnEndpointSliceDelete(slice *discoveryv1.EndpointSlice) {} +func (*FakeProxier) OnEndpointSlicesSynced() {} func NewHollowProxyOrDie( nodeName string, From 2ea105df63ab0e1d0ec4d94652e32990fc06f66a Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Fri, 30 Dec 2022 19:53:33 -0500 Subject: [PATCH 3/3] Drop unused EndpointsHandler / EndpointsConfig from pkg/proxy/config (Also NoopEndpointSliceHandler since it's no longer possible for a proxy implementation to no-op EndpointSlice handling.) --- pkg/proxy/config/config.go | 129 ------------------------------------- 1 file changed, 129 deletions(-) diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 065b499c84b..090062c46ba 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -46,25 +46,6 @@ type ServiceHandler interface { OnServiceSynced() } -// EndpointsHandler is an abstract interface of objects which receive -// notifications about endpoints object changes. This is not a required -// sub-interface of proxy.Provider, and proxy implementations should -// not implement it unless they can't handle EndpointSlices. -type EndpointsHandler interface { - // OnEndpointsAdd is called whenever creation of new endpoints object - // is observed. - OnEndpointsAdd(endpoints *v1.Endpoints) - // OnEndpointsUpdate is called whenever modification of an existing - // endpoints object is observed. - OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) - // OnEndpointsDelete is called whenever deletion of an existing endpoints - // object is observed. - OnEndpointsDelete(endpoints *v1.Endpoints) - // OnEndpointsSynced is called once all the initial event handlers were - // called and the state is fully propagated to local cache. - OnEndpointsSynced() -} - // EndpointSliceHandler is an abstract interface of objects which receive // notifications about endpoint slice object changes. type EndpointSliceHandler interface { @@ -82,116 +63,6 @@ type EndpointSliceHandler interface { OnEndpointSlicesSynced() } -// NoopEndpointSliceHandler is a noop handler for proxiers that have not yet -// implemented a full EndpointSliceHandler. -type NoopEndpointSliceHandler struct{} - -// OnEndpointSliceAdd is a noop handler for EndpointSlice creates. -func (*NoopEndpointSliceHandler) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {} - -// OnEndpointSliceUpdate is a noop handler for EndpointSlice updates. -func (*NoopEndpointSliceHandler) OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *discovery.EndpointSlice) { -} - -// OnEndpointSliceDelete is a noop handler for EndpointSlice deletes. -func (*NoopEndpointSliceHandler) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {} - -// OnEndpointSlicesSynced is a noop handler for EndpointSlice syncs. -func (*NoopEndpointSliceHandler) OnEndpointSlicesSynced() {} - -var _ EndpointSliceHandler = &NoopEndpointSliceHandler{} - -// EndpointsConfig tracks a set of endpoints configurations. -type EndpointsConfig struct { - listerSynced cache.InformerSynced - eventHandlers []EndpointsHandler -} - -// NewEndpointsConfig creates a new EndpointsConfig. -func NewEndpointsConfig(endpointsInformer coreinformers.EndpointsInformer, resyncPeriod time.Duration) *EndpointsConfig { - result := &EndpointsConfig{ - listerSynced: endpointsInformer.Informer().HasSynced, - } - - endpointsInformer.Informer().AddEventHandlerWithResyncPeriod( - cache.ResourceEventHandlerFuncs{ - AddFunc: result.handleAddEndpoints, - UpdateFunc: result.handleUpdateEndpoints, - DeleteFunc: result.handleDeleteEndpoints, - }, - resyncPeriod, - ) - - return result -} - -// RegisterEventHandler registers a handler which is called on every endpoints change. -func (c *EndpointsConfig) RegisterEventHandler(handler EndpointsHandler) { - c.eventHandlers = append(c.eventHandlers, handler) -} - -// Run waits for cache synced and invokes handlers after syncing. -func (c *EndpointsConfig) Run(stopCh <-chan struct{}) { - klog.InfoS("Starting endpoints config controller") - - if !cache.WaitForNamedCacheSync("endpoints config", stopCh, c.listerSynced) { - return - } - - for i := range c.eventHandlers { - klog.V(3).InfoS("Calling handler.OnEndpointsSynced()") - c.eventHandlers[i].OnEndpointsSynced() - } -} - -func (c *EndpointsConfig) handleAddEndpoints(obj interface{}) { - endpoints, ok := obj.(*v1.Endpoints) - if !ok { - utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) - return - } - for i := range c.eventHandlers { - klog.V(4).InfoS("Calling handler.OnEndpointsAdd") - c.eventHandlers[i].OnEndpointsAdd(endpoints) - } -} - -func (c *EndpointsConfig) handleUpdateEndpoints(oldObj, newObj interface{}) { - oldEndpoints, ok := oldObj.(*v1.Endpoints) - if !ok { - utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj)) - return - } - endpoints, ok := newObj.(*v1.Endpoints) - if !ok { - utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj)) - return - } - for i := range c.eventHandlers { - klog.V(4).InfoS("Calling handler.OnEndpointsUpdate") - c.eventHandlers[i].OnEndpointsUpdate(oldEndpoints, endpoints) - } -} - -func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) { - endpoints, ok := obj.(*v1.Endpoints) - if !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) - return - } - if endpoints, ok = tombstone.Obj.(*v1.Endpoints); !ok { - utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) - return - } - } - for i := range c.eventHandlers { - klog.V(4).InfoS("Calling handler.OnEndpointsDelete") - c.eventHandlers[i].OnEndpointsDelete(endpoints) - } -} - // EndpointSliceConfig tracks a set of endpoints configurations. type EndpointSliceConfig struct { listerSynced cache.InformerSynced