Merge pull request #114749 from danwinship/drop-endpointshandler

Drop unused EndpointsHandler / EndpointsConfig from pkg/proxy/config
This commit is contained in:
Kubernetes Prow Robot 2023-01-05 03:58:09 -08:00 committed by GitHub
commit d9af380f91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 168 additions and 264 deletions

View File

@ -21,6 +21,7 @@ import (
"time" "time"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1" v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@ -45,7 +46,6 @@ type HollowProxy struct {
} }
type FakeProxier struct { type FakeProxier struct {
proxyconfig.NoopEndpointSliceHandler
proxyconfig.NoopNodeHandler proxyconfig.NoopNodeHandler
} }
@ -57,10 +57,10 @@ func (*FakeProxier) OnServiceAdd(service *v1.Service) {}
func (*FakeProxier) OnServiceUpdate(oldService, service *v1.Service) {} func (*FakeProxier) OnServiceUpdate(oldService, service *v1.Service) {}
func (*FakeProxier) OnServiceDelete(service *v1.Service) {} func (*FakeProxier) OnServiceDelete(service *v1.Service) {}
func (*FakeProxier) OnServiceSynced() {} func (*FakeProxier) OnServiceSynced() {}
func (*FakeProxier) OnEndpointsAdd(endpoints *v1.Endpoints) {} func (*FakeProxier) OnEndpointSliceAdd(slice *discoveryv1.EndpointSlice) {}
func (*FakeProxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {} func (*FakeProxier) OnEndpointSliceUpdate(oldSlice, slice *discoveryv1.EndpointSlice) {}
func (*FakeProxier) OnEndpointsDelete(endpoints *v1.Endpoints) {} func (*FakeProxier) OnEndpointSliceDelete(slice *discoveryv1.EndpointSlice) {}
func (*FakeProxier) OnEndpointsSynced() {} func (*FakeProxier) OnEndpointSlicesSynced() {}
func NewHollowProxyOrDie( func NewHollowProxyOrDie(
nodeName string, nodeName string,

View File

@ -22,6 +22,7 @@ import (
"time" "time"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
@ -29,6 +30,7 @@ import (
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
ktesting "k8s.io/client-go/testing" ktesting "k8s.io/client-go/testing"
utilpointer "k8s.io/utils/pointer"
) )
func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) { func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
@ -81,71 +83,84 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
} }
func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
endpoints1v1 := &v1.Endpoints{ tcp := v1.ProtocolTCP
endpoints1v1 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"},
Subsets: []v1.EndpointSubset{{ AddressType: discoveryv1.AddressTypeIPv4,
Addresses: []v1.EndpointAddress{ Endpoints: []discoveryv1.Endpoint{{
{IP: "1.2.3.4"}, Addresses: []string{
"1.2.3.4",
}, },
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }},
Ports: []discoveryv1.EndpointPort{{
Port: utilpointer.Int32(8080),
Protocol: &tcp,
}}, }},
} }
endpoints1v2 := &v1.Endpoints{ endpoints1v2 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"}, ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"},
Subsets: []v1.EndpointSubset{{ AddressType: discoveryv1.AddressTypeIPv4,
Addresses: []v1.EndpointAddress{ Endpoints: []discoveryv1.Endpoint{{
{IP: "1.2.3.4"}, Addresses: []string{
{IP: "4.3.2.1"}, "1.2.3.4",
"4.3.2.1",
}, },
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }},
Ports: []discoveryv1.EndpointPort{{
Port: utilpointer.Int32(8080),
Protocol: &tcp,
}}, }},
} }
endpoints2 := &v1.Endpoints{ endpoints2 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e2"}, ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e2"},
Subsets: []v1.EndpointSubset{{ AddressType: discoveryv1.AddressTypeIPv4,
Addresses: []v1.EndpointAddress{ Endpoints: []discoveryv1.Endpoint{{
{IP: "5.6.7.8"}, Addresses: []string{
"5.6.7.8",
}, },
Ports: []v1.EndpointPort{{Port: 80, Protocol: "TCP"}}, }},
Ports: []discoveryv1.EndpointPort{{
Port: utilpointer.Int32(8080),
Protocol: &tcp,
}}, }},
} }
// Setup fake api client. // Setup fake api client.
client := fake.NewSimpleClientset() client := fake.NewSimpleClientset()
fakeWatch := watch.NewFake() fakeWatch := watch.NewFake()
client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil)) client.PrependWatchReactor("endpointslices", ktesting.DefaultWatchReactor(fakeWatch, nil))
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
handler := NewEndpointsHandlerMock() handler := NewEndpointSliceHandlerMock()
sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
endpointsConfig := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), time.Minute) endpointsliceConfig := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), time.Minute)
endpointsConfig.RegisterEventHandler(handler) endpointsliceConfig.RegisterEventHandler(handler)
go sharedInformers.Start(stopCh) go sharedInformers.Start(stopCh)
go endpointsConfig.Run(stopCh) go endpointsliceConfig.Run(stopCh)
// Add the first endpoints // Add the first endpoints
fakeWatch.Add(endpoints1v1) fakeWatch.Add(endpoints1v1)
handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints1v1}) handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{endpoints1v1})
// Add another endpoints // Add another endpoints
fakeWatch.Add(endpoints2) fakeWatch.Add(endpoints2)
handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints1v1, endpoints2}) handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{endpoints1v1, endpoints2})
// Modify endpoints1 // Modify endpoints1
fakeWatch.Modify(endpoints1v2) fakeWatch.Modify(endpoints1v2)
handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints1v2, endpoints2}) handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{endpoints1v2, endpoints2})
// Delete endpoints1 // Delete endpoints1
fakeWatch.Delete(endpoints1v2) fakeWatch.Delete(endpoints1v2)
handler.ValidateEndpoints(t, []*v1.Endpoints{endpoints2}) handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{endpoints2})
// Delete endpoints2 // Delete endpoints2
fakeWatch.Delete(endpoints2) fakeWatch.Delete(endpoints2)
handler.ValidateEndpoints(t, []*v1.Endpoints{}) handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{})
} }
func TestInitialSync(t *testing.T) { func TestInitialSync(t *testing.T) {
@ -157,10 +172,10 @@ func TestInitialSync(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}}, Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}},
} }
eps1 := &v1.Endpoints{ eps1 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
} }
eps2 := &v1.Endpoints{ eps2 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
} }
@ -168,7 +183,7 @@ func TestInitialSync(t *testing.T) {
{Name: svc1.Name, Namespace: svc1.Namespace}: svc1, {Name: svc1.Name, Namespace: svc1.Namespace}: svc1,
{Name: svc2.Name, Namespace: svc2.Namespace}: svc2, {Name: svc2.Name, Namespace: svc2.Namespace}: svc2,
} }
expectedEpsState := map[types.NamespacedName]*v1.Endpoints{ expectedEpsState := map[types.NamespacedName]*discoveryv1.EndpointSlice{
{Name: eps1.Name, Namespace: eps1.Namespace}: eps1, {Name: eps1.Name, Namespace: eps1.Namespace}: eps1,
{Name: eps2.Name, Namespace: eps2.Namespace}: eps2, {Name: eps2.Name, Namespace: eps2.Namespace}: eps2,
} }
@ -181,8 +196,8 @@ func TestInitialSync(t *testing.T) {
svcHandler := NewServiceHandlerMock() svcHandler := NewServiceHandlerMock()
svcConfig.RegisterEventHandler(svcHandler) svcConfig.RegisterEventHandler(svcHandler)
epsConfig := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), 0) epsConfig := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), 0)
epsHandler := NewEndpointsHandlerMock() epsHandler := NewEndpointSliceHandlerMock()
epsConfig.RegisterEventHandler(epsHandler) epsConfig.RegisterEventHandler(epsHandler)
stopCh := make(chan struct{}) stopCh := make(chan struct{})
@ -226,7 +241,7 @@ func TestInitialSync(t *testing.T) {
} }
gotEps := <-epsHandler.updated gotEps := <-epsHandler.updated
gotEpsState := make(map[types.NamespacedName]*v1.Endpoints, len(gotEps)) gotEpsState := make(map[types.NamespacedName]*discoveryv1.EndpointSlice, len(gotEps))
for _, eps := range gotEps { for _, eps := range gotEps {
gotEpsState[types.NamespacedName{Namespace: eps.Namespace, Name: eps.Name}] = eps gotEpsState[types.NamespacedName{Namespace: eps.Namespace, Name: eps.Name}] = eps
} }

View File

@ -46,25 +46,6 @@ type ServiceHandler interface {
OnServiceSynced() OnServiceSynced()
} }
// EndpointsHandler is an abstract interface of objects which receive
// notifications about endpoints object changes. This is not a required
// sub-interface of proxy.Provider, and proxy implementations should
// not implement it unless they can't handle EndpointSlices.
type EndpointsHandler interface {
// OnEndpointsAdd is called whenever creation of new endpoints object
// is observed.
OnEndpointsAdd(endpoints *v1.Endpoints)
// OnEndpointsUpdate is called whenever modification of an existing
// endpoints object is observed.
OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints)
// OnEndpointsDelete is called whenever deletion of an existing endpoints
// object is observed.
OnEndpointsDelete(endpoints *v1.Endpoints)
// OnEndpointsSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache.
OnEndpointsSynced()
}
// EndpointSliceHandler is an abstract interface of objects which receive // EndpointSliceHandler is an abstract interface of objects which receive
// notifications about endpoint slice object changes. // notifications about endpoint slice object changes.
type EndpointSliceHandler interface { type EndpointSliceHandler interface {
@ -82,116 +63,6 @@ type EndpointSliceHandler interface {
OnEndpointSlicesSynced() OnEndpointSlicesSynced()
} }
// NoopEndpointSliceHandler is a noop handler for proxiers that have not yet
// implemented a full EndpointSliceHandler.
type NoopEndpointSliceHandler struct{}
// OnEndpointSliceAdd is a noop handler for EndpointSlice creates.
func (*NoopEndpointSliceHandler) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {}
// OnEndpointSliceUpdate is a noop handler for EndpointSlice updates.
func (*NoopEndpointSliceHandler) OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *discovery.EndpointSlice) {
}
// OnEndpointSliceDelete is a noop handler for EndpointSlice deletes.
func (*NoopEndpointSliceHandler) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {}
// OnEndpointSlicesSynced is a noop handler for EndpointSlice syncs.
func (*NoopEndpointSliceHandler) OnEndpointSlicesSynced() {}
var _ EndpointSliceHandler = &NoopEndpointSliceHandler{}
// EndpointsConfig tracks a set of endpoints configurations.
type EndpointsConfig struct {
listerSynced cache.InformerSynced
eventHandlers []EndpointsHandler
}
// NewEndpointsConfig creates a new EndpointsConfig.
func NewEndpointsConfig(endpointsInformer coreinformers.EndpointsInformer, resyncPeriod time.Duration) *EndpointsConfig {
result := &EndpointsConfig{
listerSynced: endpointsInformer.Informer().HasSynced,
}
endpointsInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: result.handleAddEndpoints,
UpdateFunc: result.handleUpdateEndpoints,
DeleteFunc: result.handleDeleteEndpoints,
},
resyncPeriod,
)
return result
}
// RegisterEventHandler registers a handler which is called on every endpoints change.
func (c *EndpointsConfig) RegisterEventHandler(handler EndpointsHandler) {
c.eventHandlers = append(c.eventHandlers, handler)
}
// Run waits for cache synced and invokes handlers after syncing.
func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
klog.InfoS("Starting endpoints config controller")
if !cache.WaitForNamedCacheSync("endpoints config", stopCh, c.listerSynced) {
return
}
for i := range c.eventHandlers {
klog.V(3).InfoS("Calling handler.OnEndpointsSynced()")
c.eventHandlers[i].OnEndpointsSynced()
}
}
func (c *EndpointsConfig) handleAddEndpoints(obj interface{}) {
endpoints, ok := obj.(*v1.Endpoints)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
return
}
for i := range c.eventHandlers {
klog.V(4).InfoS("Calling handler.OnEndpointsAdd")
c.eventHandlers[i].OnEndpointsAdd(endpoints)
}
}
func (c *EndpointsConfig) handleUpdateEndpoints(oldObj, newObj interface{}) {
oldEndpoints, ok := oldObj.(*v1.Endpoints)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
return
}
endpoints, ok := newObj.(*v1.Endpoints)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
return
}
for i := range c.eventHandlers {
klog.V(4).InfoS("Calling handler.OnEndpointsUpdate")
c.eventHandlers[i].OnEndpointsUpdate(oldEndpoints, endpoints)
}
}
func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) {
endpoints, ok := obj.(*v1.Endpoints)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
return
}
if endpoints, ok = tombstone.Obj.(*v1.Endpoints); !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
return
}
}
for i := range c.eventHandlers {
klog.V(4).InfoS("Calling handler.OnEndpointsDelete")
c.eventHandlers[i].OnEndpointsDelete(endpoints)
}
}
// EndpointSliceConfig tracks a set of endpoints configurations. // EndpointSliceConfig tracks a set of endpoints configurations.
type EndpointSliceConfig struct { type EndpointSliceConfig struct {
listerSynced cache.InformerSynced listerSynced cache.InformerSynced

View File

@ -24,6 +24,7 @@ import (
"time" "time"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
@ -31,6 +32,7 @@ import (
informers "k8s.io/client-go/informers" informers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
ktesting "k8s.io/client-go/testing" ktesting "k8s.io/client-go/testing"
utilpointer "k8s.io/utils/pointer"
) )
type sortedServices []*v1.Service type sortedServices []*v1.Service
@ -128,96 +130,96 @@ func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []*
} }
} }
type sortedEndpoints []*v1.Endpoints type sortedEndpointSlices []*discoveryv1.EndpointSlice
func (s sortedEndpoints) Len() int { func (s sortedEndpointSlices) Len() int {
return len(s) return len(s)
} }
func (s sortedEndpoints) Swap(i, j int) { func (s sortedEndpointSlices) Swap(i, j int) {
s[i], s[j] = s[j], s[i] s[i], s[j] = s[j], s[i]
} }
func (s sortedEndpoints) Less(i, j int) bool { func (s sortedEndpointSlices) Less(i, j int) bool {
return s[i].Name < s[j].Name return s[i].Name < s[j].Name
} }
type EndpointsHandlerMock struct { type EndpointSliceHandlerMock struct {
lock sync.Mutex lock sync.Mutex
state map[types.NamespacedName]*v1.Endpoints state map[types.NamespacedName]*discoveryv1.EndpointSlice
synced bool synced bool
updated chan []*v1.Endpoints updated chan []*discoveryv1.EndpointSlice
process func([]*v1.Endpoints) process func([]*discoveryv1.EndpointSlice)
} }
func NewEndpointsHandlerMock() *EndpointsHandlerMock { func NewEndpointSliceHandlerMock() *EndpointSliceHandlerMock {
ehm := &EndpointsHandlerMock{ ehm := &EndpointSliceHandlerMock{
state: make(map[types.NamespacedName]*v1.Endpoints), state: make(map[types.NamespacedName]*discoveryv1.EndpointSlice),
updated: make(chan []*v1.Endpoints, 5), updated: make(chan []*discoveryv1.EndpointSlice, 5),
} }
ehm.process = func(endpoints []*v1.Endpoints) { ehm.process = func(endpoints []*discoveryv1.EndpointSlice) {
ehm.updated <- endpoints ehm.updated <- endpoints
} }
return ehm return ehm
} }
func (h *EndpointsHandlerMock) OnEndpointsAdd(endpoints *v1.Endpoints) { func (h *EndpointSliceHandlerMock) OnEndpointSliceAdd(slice *discoveryv1.EndpointSlice) {
h.lock.Lock() h.lock.Lock()
defer h.lock.Unlock() defer h.lock.Unlock()
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} namespacedName := types.NamespacedName{Namespace: slice.Namespace, Name: slice.Name}
h.state[namespacedName] = endpoints h.state[namespacedName] = slice
h.sendEndpoints() h.sendEndpointSlices()
} }
func (h *EndpointsHandlerMock) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { func (h *EndpointSliceHandlerMock) OnEndpointSliceUpdate(oldSlice, slice *discoveryv1.EndpointSlice) {
h.lock.Lock() h.lock.Lock()
defer h.lock.Unlock() defer h.lock.Unlock()
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} namespacedName := types.NamespacedName{Namespace: slice.Namespace, Name: slice.Name}
h.state[namespacedName] = endpoints h.state[namespacedName] = slice
h.sendEndpoints() h.sendEndpointSlices()
} }
func (h *EndpointsHandlerMock) OnEndpointsDelete(endpoints *v1.Endpoints) { func (h *EndpointSliceHandlerMock) OnEndpointSliceDelete(slice *discoveryv1.EndpointSlice) {
h.lock.Lock() h.lock.Lock()
defer h.lock.Unlock() defer h.lock.Unlock()
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} namespacedName := types.NamespacedName{Namespace: slice.Namespace, Name: slice.Name}
delete(h.state, namespacedName) delete(h.state, namespacedName)
h.sendEndpoints() h.sendEndpointSlices()
} }
func (h *EndpointsHandlerMock) OnEndpointsSynced() { func (h *EndpointSliceHandlerMock) OnEndpointSlicesSynced() {
h.lock.Lock() h.lock.Lock()
defer h.lock.Unlock() defer h.lock.Unlock()
h.synced = true h.synced = true
h.sendEndpoints() h.sendEndpointSlices()
} }
func (h *EndpointsHandlerMock) sendEndpoints() { func (h *EndpointSliceHandlerMock) sendEndpointSlices() {
if !h.synced { if !h.synced {
return return
} }
endpoints := make([]*v1.Endpoints, 0, len(h.state)) slices := make([]*discoveryv1.EndpointSlice, 0, len(h.state))
for _, eps := range h.state { for _, eps := range h.state {
endpoints = append(endpoints, eps) slices = append(slices, eps)
} }
sort.Sort(sortedEndpoints(endpoints)) sort.Sort(sortedEndpointSlices(slices))
h.process(endpoints) h.process(slices)
} }
func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints []*v1.Endpoints) { func (h *EndpointSliceHandlerMock) ValidateEndpointSlices(t *testing.T, expectedSlices []*discoveryv1.EndpointSlice) {
// We might get 1 or more updates for N endpoint updates, because we // We might get 1 or more updates for N endpointslice updates, because we
// over write older snapshots of endpoints from the producer go-routine // over write older snapshots of endpointslices from the producer go-routine
// if the consumer falls behind. Unittests will hard timeout in 5m. // if the consumer falls behind. Unittests will hard timeout in 5m.
var endpoints []*v1.Endpoints var slices []*discoveryv1.EndpointSlice
for { for {
select { select {
case endpoints = <-h.updated: case slices = <-h.updated:
if reflect.DeepEqual(endpoints, expectedEndpoints) { if reflect.DeepEqual(slices, expectedSlices) {
return return
} }
// Unittests will hard timeout in 5m with a stack trace, prevent that // Unittests will hard timeout in 5m with a stack trace, prevent that
// and surface a clearer reason for failure. // and surface a clearer reason for failure.
case <-time.After(wait.ForeverTestTimeout): case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Timed out. Expected %#v, Got %#v", expectedEndpoints, endpoints) t.Errorf("Timed out. Expected %#v, Got %#v", expectedSlices, slices)
return return
} }
} }
@ -320,113 +322,129 @@ func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) {
func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) { func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) {
client := fake.NewSimpleClientset() client := fake.NewSimpleClientset()
fakeWatch := watch.NewFake() fakeWatch := watch.NewFake()
client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil)) client.PrependWatchReactor("endpointslices", ktesting.DefaultWatchReactor(fakeWatch, nil))
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
config := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), time.Minute) config := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), time.Minute)
handler := NewEndpointsHandlerMock() handler := NewEndpointSliceHandlerMock()
handler2 := NewEndpointsHandlerMock() handler2 := NewEndpointSliceHandlerMock()
config.RegisterEventHandler(handler) config.RegisterEventHandler(handler)
config.RegisterEventHandler(handler2) config.RegisterEventHandler(handler2)
go sharedInformers.Start(stopCh) go sharedInformers.Start(stopCh)
go config.Run(stopCh) go config.Run(stopCh)
endpoints1 := &v1.Endpoints{ endpoints1 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Subsets: []v1.EndpointSubset{{ AddressType: discoveryv1.AddressTypeIPv4,
Addresses: []v1.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}}, Endpoints: []discoveryv1.Endpoint{{
Ports: []v1.EndpointPort{{Port: 80}}, Addresses: []string{"1.1.1.1"},
}, {
Addresses: []string{"2.2.2.2"},
}}, }},
Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}},
} }
endpoints2 := &v1.Endpoints{ endpoints2 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Subsets: []v1.EndpointSubset{{ AddressType: discoveryv1.AddressTypeIPv4,
Addresses: []v1.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}}, Endpoints: []discoveryv1.Endpoint{{
Ports: []v1.EndpointPort{{Port: 80}}, Addresses: []string{"3.3.3.3"},
}, {
Addresses: []string{"4.4.4.4"},
}}, }},
Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}},
} }
fakeWatch.Add(endpoints1) fakeWatch.Add(endpoints1)
fakeWatch.Add(endpoints2) fakeWatch.Add(endpoints2)
endpoints := []*v1.Endpoints{endpoints2, endpoints1} endpoints := []*discoveryv1.EndpointSlice{endpoints2, endpoints1}
handler.ValidateEndpoints(t, endpoints) handler.ValidateEndpointSlices(t, endpoints)
handler2.ValidateEndpoints(t, endpoints) handler2.ValidateEndpointSlices(t, endpoints)
} }
func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) { func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) {
client := fake.NewSimpleClientset() client := fake.NewSimpleClientset()
fakeWatch := watch.NewFake() fakeWatch := watch.NewFake()
client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil)) client.PrependWatchReactor("endpointslices", ktesting.DefaultWatchReactor(fakeWatch, nil))
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
config := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), time.Minute) config := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), time.Minute)
handler := NewEndpointsHandlerMock() handler := NewEndpointSliceHandlerMock()
handler2 := NewEndpointsHandlerMock() handler2 := NewEndpointSliceHandlerMock()
config.RegisterEventHandler(handler) config.RegisterEventHandler(handler)
config.RegisterEventHandler(handler2) config.RegisterEventHandler(handler2)
go sharedInformers.Start(stopCh) go sharedInformers.Start(stopCh)
go config.Run(stopCh) go config.Run(stopCh)
endpoints1 := &v1.Endpoints{ endpoints1 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Subsets: []v1.EndpointSubset{{ AddressType: discoveryv1.AddressTypeIPv4,
Addresses: []v1.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}}, Endpoints: []discoveryv1.Endpoint{{
Ports: []v1.EndpointPort{{Port: 80}}, Addresses: []string{"1.1.1.1"},
}, {
Addresses: []string{"2.2.2.2"},
}}, }},
Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}},
} }
endpoints2 := &v1.Endpoints{ endpoints2 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Subsets: []v1.EndpointSubset{{ AddressType: discoveryv1.AddressTypeIPv4,
Addresses: []v1.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}}, Endpoints: []discoveryv1.Endpoint{{
Ports: []v1.EndpointPort{{Port: 80}}, Addresses: []string{"3.3.3.3"},
}, {
Addresses: []string{"4.4.4.4"},
}}, }},
Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}},
} }
fakeWatch.Add(endpoints1) fakeWatch.Add(endpoints1)
fakeWatch.Add(endpoints2) fakeWatch.Add(endpoints2)
endpoints := []*v1.Endpoints{endpoints2, endpoints1} endpoints := []*discoveryv1.EndpointSlice{endpoints2, endpoints1}
handler.ValidateEndpoints(t, endpoints) handler.ValidateEndpointSlices(t, endpoints)
handler2.ValidateEndpoints(t, endpoints) handler2.ValidateEndpointSlices(t, endpoints)
// Add one more // Add one more
endpoints3 := &v1.Endpoints{ endpoints3 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foobar"}, ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foobar"},
Subsets: []v1.EndpointSubset{{ AddressType: discoveryv1.AddressTypeIPv4,
Addresses: []v1.EndpointAddress{{IP: "5.5.5.5"}, {IP: "6.6.6.6"}}, Endpoints: []discoveryv1.Endpoint{{
Ports: []v1.EndpointPort{{Port: 80}}, Addresses: []string{"5.5.5.5"},
}, {
Addresses: []string{"6.6.6.6"},
}}, }},
Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}},
} }
fakeWatch.Add(endpoints3) fakeWatch.Add(endpoints3)
endpoints = []*v1.Endpoints{endpoints2, endpoints1, endpoints3} endpoints = []*discoveryv1.EndpointSlice{endpoints2, endpoints1, endpoints3}
handler.ValidateEndpoints(t, endpoints) handler.ValidateEndpointSlices(t, endpoints)
handler2.ValidateEndpoints(t, endpoints) handler2.ValidateEndpointSlices(t, endpoints)
// Update the "foo" service with new endpoints // Update the "foo" service with new endpoints
endpoints1v2 := &v1.Endpoints{ endpoints1v2 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Subsets: []v1.EndpointSubset{{ AddressType: discoveryv1.AddressTypeIPv4,
Addresses: []v1.EndpointAddress{{IP: "7.7.7.7"}}, Endpoints: []discoveryv1.Endpoint{{
Ports: []v1.EndpointPort{{Port: 80}}, Addresses: []string{"7.7.7.7"},
}}, }},
Ports: []discoveryv1.EndpointPort{{Port: utilpointer.Int32(80)}},
} }
fakeWatch.Modify(endpoints1v2) fakeWatch.Modify(endpoints1v2)
endpoints = []*v1.Endpoints{endpoints2, endpoints1v2, endpoints3} endpoints = []*discoveryv1.EndpointSlice{endpoints2, endpoints1v2, endpoints3}
handler.ValidateEndpoints(t, endpoints) handler.ValidateEndpointSlices(t, endpoints)
handler2.ValidateEndpoints(t, endpoints) handler2.ValidateEndpointSlices(t, endpoints)
// Remove "bar" endpoints // Remove "bar" endpoints
fakeWatch.Delete(endpoints2) fakeWatch.Delete(endpoints2)
endpoints = []*v1.Endpoints{endpoints1v2, endpoints3} endpoints = []*discoveryv1.EndpointSlice{endpoints1v2, endpoints3}
handler.ValidateEndpoints(t, endpoints) handler.ValidateEndpointSlices(t, endpoints)
handler2.ValidateEndpoints(t, endpoints) handler2.ValidateEndpointSlices(t, endpoints)
} }
// TODO: Add a unittest for interrupts getting processed in a timely manner. // TODO: Add a unittest for interrupts getting processed in a timely manner.