From 7ce368ccd2566781586f6f2ea8fc33a95f6ed9fe Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Fri, 17 Mar 2017 15:15:51 +0100 Subject: [PATCH] Simplify proxy config for Services by removing Mux. --- cmd/kube-proxy/app/server.go | 9 +- cmd/kubemark/hollow-node.go | 2 +- pkg/kubemark/hollow_proxy.go | 8 +- pkg/proxy/config/BUILD | 5 -- pkg/proxy/config/api.go | 109 ---------------------- pkg/proxy/config/api_test.go | 70 +++++---------- pkg/proxy/config/config.go | 154 ++++++++++++++++---------------- pkg/proxy/config/config_test.go | 119 ++++++++++++------------ 8 files changed, 157 insertions(+), 319 deletions(-) delete mode 100644 pkg/proxy/config/api.go diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index b06af3a3468..6272b23d88d 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -305,19 +305,14 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err // Note: RegisterHandler() calls need to happen before creation of Sources because sources // only notify on changes, and the initial update (on process start) may be lost if no handlers // are registered yet. - serviceConfig := proxyconfig.NewServiceConfig() + serviceConfig := proxyconfig.NewServiceConfig(client.Core().RESTClient(), config.ConfigSyncPeriod) serviceConfig.RegisterHandler(proxier) + go serviceConfig.Run(wait.NeverStop) endpointsConfig := proxyconfig.NewEndpointsConfig(client.Core().RESTClient(), config.ConfigSyncPeriod) endpointsConfig.RegisterHandler(endpointsHandler) go endpointsConfig.Run(wait.NeverStop) - proxyconfig.NewSourceAPI( - client.Core().RESTClient(), - config.ConfigSyncPeriod, - serviceConfig.Channel("api"), - ) - config.NodeRef = &clientv1.ObjectReference{ Kind: "Node", Name: hostname, diff --git a/cmd/kubemark/hollow-node.go b/cmd/kubemark/hollow-node.go index 6295707982e..d55fe31a592 100644 --- a/cmd/kubemark/hollow-node.go +++ b/cmd/kubemark/hollow-node.go @@ -136,7 +136,7 @@ func main() { iptInterface := fakeiptables.NewFake() - serviceConfig := proxyconfig.NewServiceConfig() + serviceConfig := proxyconfig.NewServiceConfig(internalClientset.Core().RESTClient(), 15*time.Minute) serviceConfig.RegisterHandler(&kubemark.FakeProxyHandler{}) endpointsConfig := proxyconfig.NewEndpointsConfig(internalClientset.Core().RESTClient(), 15*time.Minute) diff --git a/pkg/kubemark/hollow_proxy.go b/pkg/kubemark/hollow_proxy.go index 5b0ddf9e66b..6cee72f2b40 100644 --- a/pkg/kubemark/hollow_proxy.go +++ b/pkg/kubemark/hollow_proxy.go @@ -17,8 +17,6 @@ limitations under the License. package kubemark import ( - "time" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" v1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -74,11 +72,7 @@ func NewHollowProxyOrDie( } go endpointsConfig.Run(wait.NeverStop) - proxyconfig.NewSourceAPI( - client.Core().RESTClient(), - 15*time.Minute, - serviceConfig.Channel("api"), - ) + go serviceConfig.Run(wait.NeverStop) hollowProxy, err := proxyapp.NewProxyServer(client, eventClient, config, iptInterface, &FakeProxier{}, broadcaster, recorder, nil, "fake") if err != nil { diff --git a/pkg/proxy/config/BUILD b/pkg/proxy/config/BUILD index 652099798c0..52be6161d04 100644 --- a/pkg/proxy/config/BUILD +++ b/pkg/proxy/config/BUILD @@ -11,7 +11,6 @@ load( go_library( name = "go_default_library", srcs = [ - "api.go", "config.go", "doc.go", ], @@ -20,14 +19,11 @@ go_library( "//pkg/api:go_default_library", "//pkg/client/listers/core/internalversion:go_default_library", "//pkg/util/config:go_default_library", - "//vendor:github.com/davecgh/go-spew/spew", "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/fields", "//vendor:k8s.io/apimachinery/pkg/labels", - "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/util/runtime", - "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/client-go/tools/cache", ], ) @@ -42,7 +38,6 @@ go_test( tags = ["automanaged"], deps = [ "//pkg/api:go_default_library", - "//vendor:k8s.io/apimachinery/pkg/api/equality", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", diff --git a/pkg/proxy/config/api.go b/pkg/proxy/config/api.go deleted file mode 100644 index 9c6d629e228..00000000000 --- a/pkg/proxy/config/api.go +++ /dev/null @@ -1,109 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package config - -import ( - "fmt" - "time" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/tools/cache" - "k8s.io/kubernetes/pkg/api" -) - -// NewSourceAPI creates config source that watches for changes to the services and endpoints. -func NewSourceAPI(c cache.Getter, period time.Duration, servicesChan chan<- ServiceUpdate) { - servicesLW := cache.NewListWatchFromClient(c, "services", metav1.NamespaceAll, fields.Everything()) - newSourceAPI(servicesLW, period, servicesChan, wait.NeverStop) -} - -func newSourceAPI( - servicesLW cache.ListerWatcher, - period time.Duration, - servicesChan chan<- ServiceUpdate, - stopCh <-chan struct{}) { - serviceController := NewServiceController(servicesLW, period, servicesChan) - go serviceController.Run(stopCh) - - if !cache.WaitForCacheSync(stopCh, serviceController.HasSynced) { - utilruntime.HandleError(fmt.Errorf("source controllers not synced")) - return - } - servicesChan <- ServiceUpdate{Op: SYNCED} -} - -func sendAddService(servicesChan chan<- ServiceUpdate) func(obj interface{}) { - return func(obj interface{}) { - service, ok := obj.(*api.Service) - if !ok { - utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", obj)) - return - } - servicesChan <- ServiceUpdate{Op: ADD, Service: service} - } -} - -func sendUpdateService(servicesChan chan<- ServiceUpdate) func(oldObj, newObj interface{}) { - return func(_, newObj interface{}) { - service, ok := newObj.(*api.Service) - if !ok { - utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", newObj)) - return - } - servicesChan <- ServiceUpdate{Op: UPDATE, Service: service} - } -} - -func sendDeleteService(servicesChan chan<- ServiceUpdate) func(obj interface{}) { - return func(obj interface{}) { - var service *api.Service - switch t := obj.(type) { - case *api.Service: - service = t - case cache.DeletedFinalStateUnknown: - var ok bool - service, ok = t.Obj.(*api.Service) - if !ok { - utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", t.Obj)) - return - } - default: - utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", t)) - return - } - servicesChan <- ServiceUpdate{Op: REMOVE, Service: service} - } -} - -// NewServiceController creates a controller that is watching services and sending -// updates into ServiceUpdate channel. -func NewServiceController(lw cache.ListerWatcher, period time.Duration, ch chan<- ServiceUpdate) cache.Controller { - _, serviceController := cache.NewInformer( - lw, - &api.Service{}, - period, - cache.ResourceEventHandlerFuncs{ - AddFunc: sendAddService(ch), - UpdateFunc: sendUpdateService(ch), - DeleteFunc: sendDeleteService(ch), - }, - ) - return serviceController -} diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index b90e69a2091..bc55748b9cf 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -23,10 +23,8 @@ import ( "testing" "time" - apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api" @@ -65,68 +63,40 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) { watchResp: fakeWatch, } - ch := make(chan ServiceUpdate) + stopCh := make(chan struct{}) + defer close(stopCh) - serviceController := NewServiceController(lw, 30*time.Second, ch) - go serviceController.Run(wait.NeverStop) + ch := make(chan struct{}) + handler := newSvcHandler(t, nil, func() { ch <- struct{}{} }) + + serviceConfig := newServiceConfig(lw, time.Minute) + serviceConfig.RegisterHandler(handler) + go serviceConfig.Run(stopCh) // Add the first service + handler.expected = []api.Service{*service1v1} fakeWatch.Add(service1v1) - got, ok := <-ch - if !ok { - t.Errorf("Unable to read from channel when expected") - } - expected := ServiceUpdate{Op: ADD, Service: service1v1} - if !apiequality.Semantic.DeepEqual(expected, got) { - t.Errorf("Expected %#v; Got %#v", expected, got) - } + <-ch // Add another service + handler.expected = []api.Service{*service1v1, *service2} fakeWatch.Add(service2) - got, ok = <-ch - if !ok { - t.Errorf("Unable to read from channel when expected") - } - // Could be sorted either of these two ways: - expected = ServiceUpdate{Op: ADD, Service: service2} - - if !apiequality.Semantic.DeepEqual(expected, got) { - t.Errorf("Expected %#v, Got %#v", expected, got) - } + <-ch // Modify service1 + handler.expected = []api.Service{*service1v2, *service2} fakeWatch.Modify(service1v2) - got, ok = <-ch - if !ok { - t.Errorf("Unable to read from channel when expected") - } - expected = ServiceUpdate{Op: UPDATE, Service: service1v2} - - if !apiequality.Semantic.DeepEqual(expected, got) { - t.Errorf("Expected %#v, Got %#v", expected, got) - } + <-ch // Delete service1 + handler.expected = []api.Service{*service2} fakeWatch.Delete(service1v2) - got, ok = <-ch - if !ok { - t.Errorf("Unable to read from channel when expected") - } - expected = ServiceUpdate{Op: REMOVE, Service: service1v2} - if !apiequality.Semantic.DeepEqual(expected, got) { - t.Errorf("Expected %#v, Got %#v", expected, got) - } + <-ch // Delete service2 + handler.expected = []api.Service{} fakeWatch.Delete(service2) - got, ok = <-ch - if !ok { - t.Errorf("Unable to read from channel when expected") - } - expected = ServiceUpdate{Op: REMOVE, Service: service2} - if !apiequality.Semantic.DeepEqual(expected, got) { - t.Errorf("Expected %#v, Got %#v", expected, got) - } + <-ch } func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { @@ -270,7 +240,7 @@ func TestInitialSync(t *testing.T) { watchResp: fakeEpsWatch, } - svcConfig := NewServiceConfig() + svcConfig := newServiceConfig(svcLW, time.Minute) epsConfig := newEndpointsConfig(epsLW, time.Minute) svcHandler := newSvcHandler(t, []api.Service{*svc2, *svc1}, wg.Done) svcConfig.RegisterHandler(svcHandler) @@ -279,7 +249,7 @@ func TestInitialSync(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) + go svcConfig.Run(stopCh) go epsConfig.Run(stopCh) - newSourceAPI(svcLW, time.Minute, svcConfig.Channel("one"), stopCh) wg.Wait() } diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 6cb454a92d1..2dddefa0e25 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -18,15 +18,12 @@ package config import ( "fmt" - "sync" "time" - "github.com/davecgh/go-spew/spew" "github.com/golang/glog" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api" @@ -183,102 +180,101 @@ func (c *EndpointsConfig) dispatchUpdate() { // ServiceConfig tracks a set of service configurations. // It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change. type ServiceConfig struct { - mux *config.Mux - bcaster *config.Broadcaster - store *serviceStore + informer cache.Controller + lister listers.ServiceLister + handlers []ServiceConfigHandler + // updates channel is used to trigger registered handlers + updates chan struct{} } // NewServiceConfig creates a new ServiceConfig. -// It immediately runs the created ServiceConfig. -func NewServiceConfig() *ServiceConfig { - // The updates channel is used to send interrupts to the Services handler. - // It's buffered because we never want to block for as long as there is a - // pending interrupt, but don't want to drop them if the handler is doing - // work. - updates := make(chan struct{}, 1) - store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]*api.Service)} - mux := config.NewMux(store) - bcaster := config.NewBroadcaster() - go watchForUpdates(bcaster, store, updates) - return &ServiceConfig{mux, bcaster, store} +func NewServiceConfig(c cache.Getter, period time.Duration) *ServiceConfig { + servicesLW := cache.NewListWatchFromClient(c, "services", metav1.NamespaceAll, fields.Everything()) + return newServiceConfig(servicesLW, period) +} + +func newServiceConfig(lw cache.ListerWatcher, period time.Duration) *ServiceConfig { + result := &ServiceConfig{} + + store, informer := cache.NewIndexerInformer( + lw, + &api.Service{}, + period, + cache.ResourceEventHandlerFuncs{ + AddFunc: result.handleAddService, + UpdateFunc: result.handleUpdateService, + DeleteFunc: result.handleDeleteService, + }, + cache.Indexers{}, + ) + result.informer = informer + result.lister = listers.NewServiceLister(store) + return result } // RegisterHandler registers a handler which is called on every services change. func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) { - c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { - glog.V(3).Infof("Calling handler.OnServiceUpdate()") - handler.OnServiceUpdate(instance.([]api.Service)) - })) + c.handlers = append(c.handlers, handler) } -// Channel returns a channel to which services updates should be delivered. -func (c *ServiceConfig) Channel(source string) chan ServiceUpdate { - ch := c.mux.Channel(source) - serviceCh := make(chan ServiceUpdate) +func (c *ServiceConfig) Run(stopCh <-chan struct{}) { + // The updates channel is used to send interrupts to the Services handler. + // It's buffered because we never want to block for as long as there is a + // pending interrupt, but don't want to drop them if the handler is doing + // work. + c.updates = make(chan struct{}, 1) + go c.informer.Run(stopCh) + if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) { + utilruntime.HandleError(fmt.Errorf("service controller not synced")) + return + } + + // We hanve synced informers. Now we can start delivering updates + // to the registered handler. go func() { - for update := range serviceCh { - ch <- update + for range c.updates { + services, err := c.lister.List(labels.Everything()) + if err != nil { + glog.Errorf("Error while listing services from cache: %v", err) + // This will cause a retry (if there isnt' any other trigger in-flight). + c.dispatchUpdate() + continue + } + svcs := make([]api.Service, 0, len(services)) + for i := range services { + svcs = append(svcs, *services[i]) + } + for i := range c.handlers { + glog.V(3).Infof("Calling handler.OnServiceUpdate()") + c.handlers[i].OnServiceUpdate(svcs) + } } }() - return serviceCh + // Close updates channel when stopCh is closed. + go func() { + <-stopCh + close(c.updates) + }() } -// Config returns list of all services from underlying store. -func (c *ServiceConfig) Config() []api.Service { - return c.store.MergedState().([]api.Service) +func (c *ServiceConfig) handleAddService(_ interface{}) { + c.dispatchUpdate() } -type serviceStore struct { - serviceLock sync.RWMutex - services map[string]map[types.NamespacedName]*api.Service - synced bool - updates chan<- struct{} +func (c *ServiceConfig) handleUpdateService(_, _ interface{}) { + c.dispatchUpdate() } -func (s *serviceStore) Merge(source string, change interface{}) error { - s.serviceLock.Lock() - services := s.services[source] - if services == nil { - services = make(map[types.NamespacedName]*api.Service) - } - update := change.(ServiceUpdate) - switch update.Op { - case ADD, UPDATE: - glog.V(5).Infof("Adding new service from source %s : %s", source, spew.Sdump(update.Service)) - name := types.NamespacedName{Namespace: update.Service.Namespace, Name: update.Service.Name} - services[name] = update.Service - case REMOVE: - glog.V(5).Infof("Removing a service %s", spew.Sdump(update.Service)) - name := types.NamespacedName{Namespace: update.Service.Namespace, Name: update.Service.Name} - delete(services, name) - case SYNCED: - s.synced = true +func (c *ServiceConfig) handleDeleteService(_ interface{}) { + c.dispatchUpdate() +} + +func (c *ServiceConfig) dispatchUpdate() { + select { + case c.updates <- struct{}{}: default: - glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update)) + glog.V(4).Infof("Service handler alread has a pending interrupt.") } - s.services[source] = services - synced := s.synced - s.serviceLock.Unlock() - if s.updates != nil && synced { - select { - case s.updates <- struct{}{}: - default: - glog.V(4).Infof("Service handler already has a pending interrupt.") - } - } - return nil -} - -func (s *serviceStore) MergedState() interface{} { - s.serviceLock.RLock() - defer s.serviceLock.RUnlock() - services := make([]api.Service, 0) - for _, sourceServices := range s.services { - for _, value := range sourceServices { - services = append(services, *value) - } - } - return services } // watchForUpdates invokes bcaster.Notify() with the latest version of an object diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index 9eaf672524b..9f852a67814 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -139,92 +139,89 @@ func CreateEndpointsUpdate(op Operation, endpoints *api.Endpoints) EndpointsUpda } func TestNewServiceAddedAndNotified(t *testing.T) { - config := NewServiceConfig() - config.store.synced = true - channel := config.Channel("one") + fakeWatch := watch.NewFake() + lw := fakeLW{ + listResp: &api.ServiceList{Items: []api.Service{}}, + watchResp: fakeWatch, + } + stopCh := make(chan struct{}) + defer close(stopCh) + + config := newServiceConfig(lw, time.Minute) handler := NewServiceHandlerMock() config.RegisterHandler(handler) - serviceUpdate := CreateServiceUpdate(ADD, &api.Service{ + go config.Run(stopCh) + + service := &api.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}}, - }) - channel <- serviceUpdate - handler.ValidateServices(t, []api.Service{*serviceUpdate.Service}) + } + fakeWatch.Add(service) + handler.ValidateServices(t, []api.Service{*service}) } func TestServiceAddedRemovedSetAndNotified(t *testing.T) { - config := NewServiceConfig() - config.store.synced = true - channel := config.Channel("one") - handler := NewServiceHandlerMock() - config.RegisterHandler(handler) - serviceUpdate := CreateServiceUpdate(ADD, &api.Service{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, - Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}}, - }) - channel <- serviceUpdate - handler.ValidateServices(t, []api.Service{*serviceUpdate.Service}) - - serviceUpdate2 := CreateServiceUpdate(ADD, &api.Service{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, - Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}}, - }) - channel <- serviceUpdate2 - services := []api.Service{*serviceUpdate2.Service, *serviceUpdate.Service} - handler.ValidateServices(t, services) - - serviceUpdate3 := CreateServiceUpdate(REMOVE, &api.Service{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, - }) - channel <- serviceUpdate3 - services = []api.Service{*serviceUpdate2.Service} - handler.ValidateServices(t, services) -} - -func TestNewMultipleSourcesServicesAddedAndNotified(t *testing.T) { - config := NewServiceConfig() - config.store.synced = true - channelOne := config.Channel("one") - channelTwo := config.Channel("two") - if channelOne == channelTwo { - t.Error("Same channel handed back for one and two") + fakeWatch := watch.NewFake() + lw := fakeLW{ + listResp: &api.ServiceList{Items: []api.Service{}}, + watchResp: fakeWatch, } + stopCh := make(chan struct{}) + defer close(stopCh) + + config := newServiceConfig(lw, time.Minute) handler := NewServiceHandlerMock() config.RegisterHandler(handler) - serviceUpdate1 := CreateServiceUpdate(ADD, &api.Service{ + go config.Run(stopCh) + + service1 := &api.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}}, - }) - serviceUpdate2 := CreateServiceUpdate(ADD, &api.Service{ + } + fakeWatch.Add(service1) + handler.ValidateServices(t, []api.Service{*service1}) + + service2 := &api.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}}, - }) - channelOne <- serviceUpdate1 - channelTwo <- serviceUpdate2 - services := []api.Service{*serviceUpdate2.Service, *serviceUpdate1.Service} + } + fakeWatch.Add(service2) + services := []api.Service{*service2, *service1} + handler.ValidateServices(t, services) + + fakeWatch.Delete(service1) + services = []api.Service{*service2} handler.ValidateServices(t, services) } -func TestNewMultipleSourcesServicesMultipleHandlersAddedAndNotified(t *testing.T) { - config := NewServiceConfig() - config.store.synced = true - channelOne := config.Channel("one") - channelTwo := config.Channel("two") +func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) { + fakeWatch := watch.NewFake() + lw := fakeLW{ + listResp: &api.ServiceList{Items: []api.Service{}}, + watchResp: fakeWatch, + } + stopCh := make(chan struct{}) + defer close(stopCh) + + config := newServiceConfig(lw, time.Minute) handler := NewServiceHandlerMock() handler2 := NewServiceHandlerMock() config.RegisterHandler(handler) config.RegisterHandler(handler2) - serviceUpdate1 := CreateServiceUpdate(ADD, &api.Service{ + go config.Run(stopCh) + + service1 := &api.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}}, - }) - serviceUpdate2 := CreateServiceUpdate(ADD, &api.Service{ + } + service2 := &api.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}}, - }) - channelOne <- serviceUpdate1 - channelTwo <- serviceUpdate2 - services := []api.Service{*serviceUpdate2.Service, *serviceUpdate1.Service} + } + fakeWatch.Add(service1) + fakeWatch.Add(service2) + + services := []api.Service{*service2, *service1} handler.ValidateServices(t, services) handler2.ValidateServices(t, services) }