mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 04:06:03 +00:00
Introduce event handlers for Services in KubeProxy.
This commit is contained in:
parent
e91bd12b99
commit
e22476fd42
@ -18,7 +18,6 @@ package config
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"reflect"
|
"reflect"
|
||||||
"sort"
|
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -51,40 +50,34 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
|
|||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
defer close(stopCh)
|
defer close(stopCh)
|
||||||
|
|
||||||
ch := make(chan struct{})
|
handler := NewServiceHandlerMock()
|
||||||
handler := newSvcHandler(t, nil, func() { ch <- struct{}{} })
|
|
||||||
|
|
||||||
sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
|
sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
|
||||||
|
|
||||||
serviceConfig := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute)
|
serviceConfig := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute)
|
||||||
serviceConfig.RegisterHandler(handler)
|
serviceConfig.RegisterEventHandler(handler)
|
||||||
go sharedInformers.Start(stopCh)
|
go sharedInformers.Start(stopCh)
|
||||||
go serviceConfig.Run(stopCh)
|
go serviceConfig.Run(stopCh)
|
||||||
|
|
||||||
// Add the first service
|
// Add the first service
|
||||||
handler.expected = []*api.Service{service1v1}
|
|
||||||
fakeWatch.Add(service1v1)
|
fakeWatch.Add(service1v1)
|
||||||
<-ch
|
handler.ValidateServices(t, []*api.Service{service1v1})
|
||||||
|
|
||||||
// Add another service
|
// Add another service
|
||||||
handler.expected = []*api.Service{service1v1, service2}
|
|
||||||
fakeWatch.Add(service2)
|
fakeWatch.Add(service2)
|
||||||
<-ch
|
handler.ValidateServices(t, []*api.Service{service1v1, service2})
|
||||||
|
|
||||||
// Modify service1
|
// Modify service1
|
||||||
handler.expected = []*api.Service{service1v2, service2}
|
|
||||||
fakeWatch.Modify(service1v2)
|
fakeWatch.Modify(service1v2)
|
||||||
<-ch
|
handler.ValidateServices(t, []*api.Service{service1v2, service2})
|
||||||
|
|
||||||
// Delete service1
|
// Delete service1
|
||||||
handler.expected = []*api.Service{service2}
|
|
||||||
fakeWatch.Delete(service1v2)
|
fakeWatch.Delete(service1v2)
|
||||||
<-ch
|
handler.ValidateServices(t, []*api.Service{service2})
|
||||||
|
|
||||||
// Delete service2
|
// Delete service2
|
||||||
handler.expected = []*api.Service{}
|
|
||||||
fakeWatch.Delete(service2)
|
fakeWatch.Delete(service2)
|
||||||
<-ch
|
handler.ValidateServices(t, []*api.Service{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
|
func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
|
||||||
@ -155,22 +148,17 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
|
|||||||
handler.ValidateEndpoints(t, []*api.Endpoints{})
|
handler.ValidateEndpoints(t, []*api.Endpoints{})
|
||||||
}
|
}
|
||||||
|
|
||||||
type svcHandler struct {
|
func newSvcHandler(t *testing.T, svcs []*api.Service, done func()) ServiceHandler {
|
||||||
t *testing.T
|
shm := &ServiceHandlerMock{
|
||||||
expected []*api.Service
|
state: make(map[types.NamespacedName]*api.Service),
|
||||||
done func()
|
|
||||||
}
|
|
||||||
|
|
||||||
func newSvcHandler(t *testing.T, svcs []*api.Service, done func()) *svcHandler {
|
|
||||||
return &svcHandler{t: t, expected: svcs, done: done}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *svcHandler) OnServiceUpdate(services []*api.Service) {
|
|
||||||
defer s.done()
|
|
||||||
sort.Sort(sortedServices(services))
|
|
||||||
if !reflect.DeepEqual(s.expected, services) {
|
|
||||||
s.t.Errorf("Unexpected services: %#v, expected: %#v", services, s.expected)
|
|
||||||
}
|
}
|
||||||
|
shm.process = func(services []*api.Service) {
|
||||||
|
defer done()
|
||||||
|
if !reflect.DeepEqual(services, svcs) {
|
||||||
|
t.Errorf("Unexpected services: %#v, expected: %#v", services, svcs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return shm
|
||||||
}
|
}
|
||||||
|
|
||||||
func newEpsHandler(t *testing.T, eps []*api.Endpoints, done func()) EndpointsHandler {
|
func newEpsHandler(t *testing.T, eps []*api.Endpoints, done func()) EndpointsHandler {
|
||||||
@ -213,7 +201,7 @@ func TestInitialSync(t *testing.T) {
|
|||||||
svcConfig := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), 0)
|
svcConfig := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), 0)
|
||||||
epsConfig := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), 0)
|
epsConfig := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), 0)
|
||||||
svcHandler := newSvcHandler(t, []*api.Service{svc2, svc1}, wg.Done)
|
svcHandler := newSvcHandler(t, []*api.Service{svc2, svc1}, wg.Done)
|
||||||
svcConfig.RegisterHandler(svcHandler)
|
svcConfig.RegisterEventHandler(svcHandler)
|
||||||
epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done)
|
epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done)
|
||||||
epsConfig.RegisterEventHandler(epsHandler)
|
epsConfig.RegisterEventHandler(epsHandler)
|
||||||
|
|
||||||
|
@ -32,6 +32,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services.
|
// ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services.
|
||||||
|
// DEPRECATED: Use ServiceHandler instead - this will be removed soon.
|
||||||
type ServiceConfigHandler interface {
|
type ServiceConfigHandler interface {
|
||||||
// OnServiceUpdate gets called when a service is created, removed or changed
|
// OnServiceUpdate gets called when a service is created, removed or changed
|
||||||
// on any of the configuration sources. An example is when a new service
|
// on any of the configuration sources. An example is when a new service
|
||||||
@ -46,7 +47,24 @@ type ServiceConfigHandler interface {
|
|||||||
OnServiceUpdate(services []*api.Service)
|
OnServiceUpdate(services []*api.Service)
|
||||||
}
|
}
|
||||||
|
|
||||||
// EndpointsHandler is an abstract interface o objects which receive
|
// ServiceHandler is an abstract interface of objects whic receive
|
||||||
|
// notifications about service object changes.
|
||||||
|
type ServiceHandler interface {
|
||||||
|
// OnServiceAdd is called whenever creation of new service object
|
||||||
|
// is observed.
|
||||||
|
OnServiceAdd(service *api.Service)
|
||||||
|
// OnServiceUpdate is called whenever modification of an existing
|
||||||
|
// service object is observed.
|
||||||
|
OnServiceUpdate(oldService, service *api.Service)
|
||||||
|
// OnServiceDelete is called whenever deletion of an existing service
|
||||||
|
// object is observed.
|
||||||
|
OnServiceDelete(service *api.Service)
|
||||||
|
// OnServiceSynced is called once all the initial even handlers were
|
||||||
|
// called and the state is fully propagated to local cache.
|
||||||
|
OnServiceSynced()
|
||||||
|
}
|
||||||
|
|
||||||
|
// EndpointsHandler is an abstract interface of objects which receive
|
||||||
// notifications about endpoints object changes.
|
// notifications about endpoints object changes.
|
||||||
type EndpointsHandler interface {
|
type EndpointsHandler interface {
|
||||||
// OnEndpointsAdd is called whenever creation of new endpoints object
|
// OnEndpointsAdd is called whenever creation of new endpoints object
|
||||||
@ -157,7 +175,7 @@ func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
for i := range c.eventHandlers {
|
for i := range c.eventHandlers {
|
||||||
glog.V(4).Infof("Calling handler.OnEndpointsUpdate")
|
glog.V(4).Infof("Calling handler.OnEndpointsDelete")
|
||||||
c.eventHandlers[i].OnEndpointsDelete(endpoints)
|
c.eventHandlers[i].OnEndpointsDelete(endpoints)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -165,9 +183,11 @@ func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) {
|
|||||||
// ServiceConfig tracks a set of service configurations.
|
// ServiceConfig tracks a set of service configurations.
|
||||||
// It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change.
|
// It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change.
|
||||||
type ServiceConfig struct {
|
type ServiceConfig struct {
|
||||||
lister listers.ServiceLister
|
lister listers.ServiceLister
|
||||||
listerSynced cache.InformerSynced
|
listerSynced cache.InformerSynced
|
||||||
handlers []ServiceConfigHandler
|
eventHandlers []ServiceHandler
|
||||||
|
// TODO: Remove as soon as we migrate everything to event handlers.
|
||||||
|
handlers []ServiceConfigHandler
|
||||||
// updates channel is used to trigger registered handlers
|
// updates channel is used to trigger registered handlers
|
||||||
updates chan struct{}
|
updates chan struct{}
|
||||||
stop chan struct{}
|
stop chan struct{}
|
||||||
@ -199,10 +219,16 @@ func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPerio
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RegisterHandler registers a handler which is called on every services change.
|
// RegisterHandler registers a handler which is called on every services change.
|
||||||
|
// DEPRECATED: Use RegisterEventHandler instead - this will be removed soon.
|
||||||
func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
|
func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
|
||||||
c.handlers = append(c.handlers, handler)
|
c.handlers = append(c.handlers, handler)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RegisterEventHandler registers a handler which is called on every service change.
|
||||||
|
func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) {
|
||||||
|
c.eventHandlers = append(c.eventHandlers, handler)
|
||||||
|
}
|
||||||
|
|
||||||
// Run starts the goroutine responsible for calling
|
// Run starts the goroutine responsible for calling
|
||||||
// registered handlers.
|
// registered handlers.
|
||||||
func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
|
func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
|
||||||
@ -217,6 +243,10 @@ func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
|
|||||||
|
|
||||||
// We have synced informers. Now we can start delivering updates
|
// We have synced informers. Now we can start delivering updates
|
||||||
// to the registered handler.
|
// to the registered handler.
|
||||||
|
for i := range c.eventHandlers {
|
||||||
|
glog.V(3).Infof("Calling handler.OnServiceSynced()")
|
||||||
|
c.eventHandlers[i].OnServiceSynced()
|
||||||
|
}
|
||||||
go func() {
|
go func() {
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
for {
|
for {
|
||||||
@ -241,24 +271,60 @@ func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Close updates channel when stopCh is closed.
|
// Close updates channel when stopCh is closed.
|
||||||
go func() {
|
|
||||||
<-stopCh
|
|
||||||
close(c.stop)
|
|
||||||
}()
|
|
||||||
|
|
||||||
<-stopCh
|
<-stopCh
|
||||||
|
close(c.stop)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ServiceConfig) handleAddService(_ interface{}) {
|
func (c *ServiceConfig) handleAddService(obj interface{}) {
|
||||||
|
service, ok := obj.(*api.Service)
|
||||||
|
if !ok {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for i := range c.eventHandlers {
|
||||||
|
glog.V(4).Infof("Calling handler.OnServiceAdd")
|
||||||
|
c.eventHandlers[i].OnServiceAdd(service)
|
||||||
|
}
|
||||||
c.dispatchUpdate()
|
c.dispatchUpdate()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ServiceConfig) handleUpdateService(_, _ interface{}) {
|
func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) {
|
||||||
|
oldService, ok := oldObj.(*api.Service)
|
||||||
|
if !ok {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
service, ok := newObj.(*api.Service)
|
||||||
|
if !ok {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for i := range c.eventHandlers {
|
||||||
|
glog.V(4).Infof("Calling handler.OnServiceUpdate")
|
||||||
|
c.eventHandlers[i].OnServiceUpdate(oldService, service)
|
||||||
|
}
|
||||||
c.dispatchUpdate()
|
c.dispatchUpdate()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ServiceConfig) handleDeleteService(_ interface{}) {
|
func (c *ServiceConfig) handleDeleteService(obj interface{}) {
|
||||||
|
service, ok := obj.(*api.Service)
|
||||||
|
if !ok {
|
||||||
|
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||||
|
if !ok {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if service, ok = tombstone.Obj.(*api.Service); !ok {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for i := range c.eventHandlers {
|
||||||
|
glog.V(4).Infof("Calling handler.OnServiceDelete")
|
||||||
|
c.eventHandlers[i].OnServiceDelete(service)
|
||||||
|
}
|
||||||
c.dispatchUpdate()
|
c.dispatchUpdate()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -272,12 +338,3 @@ func (c *ServiceConfig) dispatchUpdate() {
|
|||||||
glog.V(4).Infof("Service handler already has a pending interrupt.")
|
glog.V(4).Infof("Service handler already has a pending interrupt.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// watchForUpdates invokes bcaster.Notify() with the latest version of an object
|
|
||||||
// when changes occur.
|
|
||||||
func watchForUpdates(bcaster *config.Broadcaster, accessor config.Accessor, updates <-chan struct{}) {
|
|
||||||
for true {
|
|
||||||
<-updates
|
|
||||||
bcaster.Notify(accessor.MergedState())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -46,16 +46,66 @@ func (s sortedServices) Less(i, j int) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type ServiceHandlerMock struct {
|
type ServiceHandlerMock struct {
|
||||||
|
lock sync.Mutex
|
||||||
|
|
||||||
|
state map[types.NamespacedName]*api.Service
|
||||||
|
synced bool
|
||||||
updated chan []*api.Service
|
updated chan []*api.Service
|
||||||
|
process func([]*api.Service)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewServiceHandlerMock() *ServiceHandlerMock {
|
func NewServiceHandlerMock() *ServiceHandlerMock {
|
||||||
return &ServiceHandlerMock{updated: make(chan []*api.Service, 5)}
|
shm := &ServiceHandlerMock{
|
||||||
|
state: make(map[types.NamespacedName]*api.Service),
|
||||||
|
updated: make(chan []*api.Service, 5),
|
||||||
|
}
|
||||||
|
shm.process = func(services []*api.Service) {
|
||||||
|
shm.updated <- services
|
||||||
|
}
|
||||||
|
return shm
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *ServiceHandlerMock) OnServiceUpdate(services []*api.Service) {
|
func (h *ServiceHandlerMock) OnServiceAdd(service *api.Service) {
|
||||||
|
h.lock.Lock()
|
||||||
|
defer h.lock.Unlock()
|
||||||
|
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||||
|
h.state[namespacedName] = service
|
||||||
|
h.sendServices()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *ServiceHandlerMock) OnServiceUpdate(oldService, service *api.Service) {
|
||||||
|
h.lock.Lock()
|
||||||
|
defer h.lock.Unlock()
|
||||||
|
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||||
|
h.state[namespacedName] = service
|
||||||
|
h.sendServices()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *ServiceHandlerMock) OnServiceDelete(service *api.Service) {
|
||||||
|
h.lock.Lock()
|
||||||
|
defer h.lock.Unlock()
|
||||||
|
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||||
|
delete(h.state, namespacedName)
|
||||||
|
h.sendServices()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *ServiceHandlerMock) OnServiceSynced() {
|
||||||
|
h.lock.Lock()
|
||||||
|
defer h.lock.Unlock()
|
||||||
|
h.synced = true
|
||||||
|
h.sendServices()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *ServiceHandlerMock) sendServices() {
|
||||||
|
if !h.synced {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
services := make([]*api.Service, 0, len(h.state))
|
||||||
|
for _, svc := range h.state {
|
||||||
|
services = append(services, svc)
|
||||||
|
}
|
||||||
sort.Sort(sortedServices(services))
|
sort.Sort(sortedServices(services))
|
||||||
h.updated <- services
|
h.process(services)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []*api.Service) {
|
func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []*api.Service) {
|
||||||
@ -185,7 +235,7 @@ func TestNewServiceAddedAndNotified(t *testing.T) {
|
|||||||
|
|
||||||
config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute)
|
config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute)
|
||||||
handler := NewServiceHandlerMock()
|
handler := NewServiceHandlerMock()
|
||||||
config.RegisterHandler(handler)
|
config.RegisterEventHandler(handler)
|
||||||
go sharedInformers.Start(stopCh)
|
go sharedInformers.Start(stopCh)
|
||||||
go config.Run(stopCh)
|
go config.Run(stopCh)
|
||||||
|
|
||||||
@ -209,7 +259,7 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
|
|||||||
|
|
||||||
config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute)
|
config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute)
|
||||||
handler := NewServiceHandlerMock()
|
handler := NewServiceHandlerMock()
|
||||||
config.RegisterHandler(handler)
|
config.RegisterEventHandler(handler)
|
||||||
go sharedInformers.Start(stopCh)
|
go sharedInformers.Start(stopCh)
|
||||||
go config.Run(stopCh)
|
go config.Run(stopCh)
|
||||||
|
|
||||||
@ -246,8 +296,8 @@ func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) {
|
|||||||
config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute)
|
config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute)
|
||||||
handler := NewServiceHandlerMock()
|
handler := NewServiceHandlerMock()
|
||||||
handler2 := NewServiceHandlerMock()
|
handler2 := NewServiceHandlerMock()
|
||||||
config.RegisterHandler(handler)
|
config.RegisterEventHandler(handler)
|
||||||
config.RegisterHandler(handler2)
|
config.RegisterEventHandler(handler2)
|
||||||
go sharedInformers.Start(stopCh)
|
go sharedInformers.Start(stopCh)
|
||||||
go config.Run(stopCh)
|
go config.Run(stopCh)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user