diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index 6d8d0a68f0b..afc42817d65 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -18,7 +18,6 @@ package config import ( "reflect" - "sort" "sync" "testing" "time" @@ -51,40 +50,34 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - ch := make(chan struct{}) - handler := newSvcHandler(t, nil, func() { ch <- struct{}{} }) + handler := NewServiceHandlerMock() sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) serviceConfig := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute) - serviceConfig.RegisterHandler(handler) + serviceConfig.RegisterEventHandler(handler) go sharedInformers.Start(stopCh) go serviceConfig.Run(stopCh) // Add the first service - handler.expected = []*api.Service{service1v1} fakeWatch.Add(service1v1) - <-ch + handler.ValidateServices(t, []*api.Service{service1v1}) // Add another service - handler.expected = []*api.Service{service1v1, service2} fakeWatch.Add(service2) - <-ch + handler.ValidateServices(t, []*api.Service{service1v1, service2}) // Modify service1 - handler.expected = []*api.Service{service1v2, service2} fakeWatch.Modify(service1v2) - <-ch + handler.ValidateServices(t, []*api.Service{service1v2, service2}) // Delete service1 - handler.expected = []*api.Service{service2} fakeWatch.Delete(service1v2) - <-ch + handler.ValidateServices(t, []*api.Service{service2}) // Delete service2 - handler.expected = []*api.Service{} fakeWatch.Delete(service2) - <-ch + handler.ValidateServices(t, []*api.Service{}) } func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { @@ -155,22 +148,17 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { handler.ValidateEndpoints(t, []*api.Endpoints{}) } -type svcHandler struct { - t *testing.T - expected []*api.Service - done func() -} - -func newSvcHandler(t *testing.T, svcs []*api.Service, done func()) *svcHandler { - return &svcHandler{t: t, expected: svcs, done: done} -} - -func (s *svcHandler) OnServiceUpdate(services []*api.Service) { - defer s.done() - sort.Sort(sortedServices(services)) - if !reflect.DeepEqual(s.expected, services) { - s.t.Errorf("Unexpected services: %#v, expected: %#v", services, s.expected) +func newSvcHandler(t *testing.T, svcs []*api.Service, done func()) ServiceHandler { + shm := &ServiceHandlerMock{ + state: make(map[types.NamespacedName]*api.Service), } + shm.process = func(services []*api.Service) { + defer done() + if !reflect.DeepEqual(services, svcs) { + t.Errorf("Unexpected services: %#v, expected: %#v", services, svcs) + } + } + return shm } func newEpsHandler(t *testing.T, eps []*api.Endpoints, done func()) EndpointsHandler { @@ -213,7 +201,7 @@ func TestInitialSync(t *testing.T) { svcConfig := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), 0) epsConfig := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), 0) svcHandler := newSvcHandler(t, []*api.Service{svc2, svc1}, wg.Done) - svcConfig.RegisterHandler(svcHandler) + svcConfig.RegisterEventHandler(svcHandler) epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done) epsConfig.RegisterEventHandler(epsHandler) diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 89cc3c1b639..fbc1d41785e 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -32,6 +32,7 @@ import ( ) // ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services. +// DEPRECATED: Use ServiceHandler instead - this will be removed soon. type ServiceConfigHandler interface { // OnServiceUpdate gets called when a service is created, removed or changed // on any of the configuration sources. An example is when a new service @@ -46,7 +47,24 @@ type ServiceConfigHandler interface { OnServiceUpdate(services []*api.Service) } -// EndpointsHandler is an abstract interface o objects which receive +// ServiceHandler is an abstract interface of objects whic receive +// notifications about service object changes. +type ServiceHandler interface { + // OnServiceAdd is called whenever creation of new service object + // is observed. + OnServiceAdd(service *api.Service) + // OnServiceUpdate is called whenever modification of an existing + // service object is observed. + OnServiceUpdate(oldService, service *api.Service) + // OnServiceDelete is called whenever deletion of an existing service + // object is observed. + OnServiceDelete(service *api.Service) + // OnServiceSynced is called once all the initial even handlers were + // called and the state is fully propagated to local cache. + OnServiceSynced() +} + +// EndpointsHandler is an abstract interface of objects which receive // notifications about endpoints object changes. type EndpointsHandler interface { // OnEndpointsAdd is called whenever creation of new endpoints object @@ -157,7 +175,7 @@ func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) { } } for i := range c.eventHandlers { - glog.V(4).Infof("Calling handler.OnEndpointsUpdate") + glog.V(4).Infof("Calling handler.OnEndpointsDelete") c.eventHandlers[i].OnEndpointsDelete(endpoints) } } @@ -165,9 +183,11 @@ func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) { // ServiceConfig tracks a set of service configurations. // It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change. type ServiceConfig struct { - lister listers.ServiceLister - listerSynced cache.InformerSynced - handlers []ServiceConfigHandler + lister listers.ServiceLister + listerSynced cache.InformerSynced + eventHandlers []ServiceHandler + // TODO: Remove as soon as we migrate everything to event handlers. + handlers []ServiceConfigHandler // updates channel is used to trigger registered handlers updates chan struct{} stop chan struct{} @@ -199,10 +219,16 @@ func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPerio } // RegisterHandler registers a handler which is called on every services change. +// DEPRECATED: Use RegisterEventHandler instead - this will be removed soon. func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) { c.handlers = append(c.handlers, handler) } +// RegisterEventHandler registers a handler which is called on every service change. +func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) { + c.eventHandlers = append(c.eventHandlers, handler) +} + // Run starts the goroutine responsible for calling // registered handlers. func (c *ServiceConfig) Run(stopCh <-chan struct{}) { @@ -217,6 +243,10 @@ func (c *ServiceConfig) Run(stopCh <-chan struct{}) { // We have synced informers. Now we can start delivering updates // to the registered handler. + for i := range c.eventHandlers { + glog.V(3).Infof("Calling handler.OnServiceSynced()") + c.eventHandlers[i].OnServiceSynced() + } go func() { defer utilruntime.HandleCrash() for { @@ -241,24 +271,60 @@ func (c *ServiceConfig) Run(stopCh <-chan struct{}) { } } }() + // Close updates channel when stopCh is closed. - go func() { - <-stopCh - close(c.stop) - }() - <-stopCh + close(c.stop) } -func (c *ServiceConfig) handleAddService(_ interface{}) { +func (c *ServiceConfig) handleAddService(obj interface{}) { + service, ok := obj.(*api.Service) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) + return + } + for i := range c.eventHandlers { + glog.V(4).Infof("Calling handler.OnServiceAdd") + c.eventHandlers[i].OnServiceAdd(service) + } c.dispatchUpdate() } -func (c *ServiceConfig) handleUpdateService(_, _ interface{}) { +func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) { + oldService, ok := oldObj.(*api.Service) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj)) + return + } + service, ok := newObj.(*api.Service) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj)) + return + } + for i := range c.eventHandlers { + glog.V(4).Infof("Calling handler.OnServiceUpdate") + c.eventHandlers[i].OnServiceUpdate(oldService, service) + } c.dispatchUpdate() } -func (c *ServiceConfig) handleDeleteService(_ interface{}) { +func (c *ServiceConfig) handleDeleteService(obj interface{}) { + service, ok := obj.(*api.Service) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) + return + } + if service, ok = tombstone.Obj.(*api.Service); !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) + return + } + } + for i := range c.eventHandlers { + glog.V(4).Infof("Calling handler.OnServiceDelete") + c.eventHandlers[i].OnServiceDelete(service) + } c.dispatchUpdate() } @@ -272,12 +338,3 @@ func (c *ServiceConfig) dispatchUpdate() { glog.V(4).Infof("Service handler already has a pending interrupt.") } } - -// watchForUpdates invokes bcaster.Notify() with the latest version of an object -// when changes occur. -func watchForUpdates(bcaster *config.Broadcaster, accessor config.Accessor, updates <-chan struct{}) { - for true { - <-updates - bcaster.Notify(accessor.MergedState()) - } -} diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index d4a9a370e07..c64072d2345 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -46,16 +46,66 @@ func (s sortedServices) Less(i, j int) bool { } type ServiceHandlerMock struct { + lock sync.Mutex + + state map[types.NamespacedName]*api.Service + synced bool updated chan []*api.Service + process func([]*api.Service) } func NewServiceHandlerMock() *ServiceHandlerMock { - return &ServiceHandlerMock{updated: make(chan []*api.Service, 5)} + shm := &ServiceHandlerMock{ + state: make(map[types.NamespacedName]*api.Service), + updated: make(chan []*api.Service, 5), + } + shm.process = func(services []*api.Service) { + shm.updated <- services + } + return shm } -func (h *ServiceHandlerMock) OnServiceUpdate(services []*api.Service) { +func (h *ServiceHandlerMock) OnServiceAdd(service *api.Service) { + h.lock.Lock() + defer h.lock.Unlock() + namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + h.state[namespacedName] = service + h.sendServices() +} + +func (h *ServiceHandlerMock) OnServiceUpdate(oldService, service *api.Service) { + h.lock.Lock() + defer h.lock.Unlock() + namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + h.state[namespacedName] = service + h.sendServices() +} + +func (h *ServiceHandlerMock) OnServiceDelete(service *api.Service) { + h.lock.Lock() + defer h.lock.Unlock() + namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + delete(h.state, namespacedName) + h.sendServices() +} + +func (h *ServiceHandlerMock) OnServiceSynced() { + h.lock.Lock() + defer h.lock.Unlock() + h.synced = true + h.sendServices() +} + +func (h *ServiceHandlerMock) sendServices() { + if !h.synced { + return + } + services := make([]*api.Service, 0, len(h.state)) + for _, svc := range h.state { + services = append(services, svc) + } sort.Sort(sortedServices(services)) - h.updated <- services + h.process(services) } func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []*api.Service) { @@ -185,7 +235,7 @@ func TestNewServiceAddedAndNotified(t *testing.T) { config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute) handler := NewServiceHandlerMock() - config.RegisterHandler(handler) + config.RegisterEventHandler(handler) go sharedInformers.Start(stopCh) go config.Run(stopCh) @@ -209,7 +259,7 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) { config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute) handler := NewServiceHandlerMock() - config.RegisterHandler(handler) + config.RegisterEventHandler(handler) go sharedInformers.Start(stopCh) go config.Run(stopCh) @@ -246,8 +296,8 @@ func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) { config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute) handler := NewServiceHandlerMock() handler2 := NewServiceHandlerMock() - config.RegisterHandler(handler) - config.RegisterHandler(handler2) + config.RegisterEventHandler(handler) + config.RegisterEventHandler(handler2) go sharedInformers.Start(stopCh) go config.Run(stopCh)