diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 4ac3a78b963..b06af3a3468 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -308,14 +308,14 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err serviceConfig := proxyconfig.NewServiceConfig() serviceConfig.RegisterHandler(proxier) - endpointsConfig := proxyconfig.NewEndpointsConfig() + endpointsConfig := proxyconfig.NewEndpointsConfig(client.Core().RESTClient(), config.ConfigSyncPeriod) endpointsConfig.RegisterHandler(endpointsHandler) + go endpointsConfig.Run(wait.NeverStop) proxyconfig.NewSourceAPI( client.Core().RESTClient(), config.ConfigSyncPeriod, serviceConfig.Channel("api"), - endpointsConfig.Channel("api"), ) config.NodeRef = &clientv1.ObjectReference{ diff --git a/cmd/kubemark/hollow-node.go b/cmd/kubemark/hollow-node.go index dce53e35912..6295707982e 100644 --- a/cmd/kubemark/hollow-node.go +++ b/cmd/kubemark/hollow-node.go @@ -18,6 +18,7 @@ package main import ( "fmt" + "time" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/util/flag" @@ -138,7 +139,7 @@ func main() { serviceConfig := proxyconfig.NewServiceConfig() serviceConfig.RegisterHandler(&kubemark.FakeProxyHandler{}) - endpointsConfig := proxyconfig.NewEndpointsConfig() + endpointsConfig := proxyconfig.NewEndpointsConfig(internalClientset.Core().RESTClient(), 15*time.Minute) endpointsConfig.RegisterHandler(&kubemark.FakeProxyHandler{}) eventClient, err := clientgoclientset.NewForConfig(clientConfig) diff --git a/pkg/kubemark/BUILD b/pkg/kubemark/BUILD index accdd89e766..6f9315a8d6f 100644 --- a/pkg/kubemark/BUILD +++ b/pkg/kubemark/BUILD @@ -41,6 +41,7 @@ go_library( "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/types", + "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/client-go/kubernetes/typed/core/v1", "//vendor:k8s.io/client-go/pkg/api/v1", "//vendor:k8s.io/client-go/tools/record", diff --git a/pkg/kubemark/hollow_proxy.go b/pkg/kubemark/hollow_proxy.go index 7de8cba4b51..5b0ddf9e66b 100644 --- a/pkg/kubemark/hollow_proxy.go +++ b/pkg/kubemark/hollow_proxy.go @@ -20,6 +20,7 @@ import ( "time" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" v1core "k8s.io/client-go/kubernetes/typed/core/v1" clientv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/record" @@ -71,11 +72,12 @@ func NewHollowProxyOrDie( UID: types.UID(nodeName), Namespace: "", } + + go endpointsConfig.Run(wait.NeverStop) proxyconfig.NewSourceAPI( client.Core().RESTClient(), 15*time.Minute, serviceConfig.Channel("api"), - endpointsConfig.Channel("api"), ) hollowProxy, err := proxyapp.NewProxyServer(client, eventClient, config, iptInterface, &FakeProxier{}, broadcaster, recorder, nil, "fake") diff --git a/pkg/proxy/config/BUILD b/pkg/proxy/config/BUILD index 56e86e24e9d..652099798c0 100644 --- a/pkg/proxy/config/BUILD +++ b/pkg/proxy/config/BUILD @@ -18,11 +18,13 @@ go_library( tags = ["automanaged"], deps = [ "//pkg/api:go_default_library", + "//pkg/client/listers/core/internalversion:go_default_library", "//pkg/util/config:go_default_library", "//vendor:github.com/davecgh/go-spew/spew", "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/fields", + "//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", diff --git a/pkg/proxy/config/api.go b/pkg/proxy/config/api.go index 02aff427768..9c6d629e228 100644 --- a/pkg/proxy/config/api.go +++ b/pkg/proxy/config/api.go @@ -29,31 +29,24 @@ import ( ) // NewSourceAPI creates config source that watches for changes to the services and endpoints. -func NewSourceAPI(c cache.Getter, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) { +func NewSourceAPI(c cache.Getter, period time.Duration, servicesChan chan<- ServiceUpdate) { servicesLW := cache.NewListWatchFromClient(c, "services", metav1.NamespaceAll, fields.Everything()) - endpointsLW := cache.NewListWatchFromClient(c, "endpoints", metav1.NamespaceAll, fields.Everything()) - newSourceAPI(servicesLW, endpointsLW, period, servicesChan, endpointsChan, wait.NeverStop) + newSourceAPI(servicesLW, period, servicesChan, wait.NeverStop) } func newSourceAPI( servicesLW cache.ListerWatcher, - endpointsLW cache.ListerWatcher, period time.Duration, servicesChan chan<- ServiceUpdate, - endpointsChan chan<- EndpointsUpdate, stopCh <-chan struct{}) { serviceController := NewServiceController(servicesLW, period, servicesChan) go serviceController.Run(stopCh) - endpointsController := NewEndpointsController(endpointsLW, period, endpointsChan) - go endpointsController.Run(stopCh) - - if !cache.WaitForCacheSync(stopCh, serviceController.HasSynced, endpointsController.HasSynced) { + if !cache.WaitForCacheSync(stopCh, serviceController.HasSynced) { utilruntime.HandleError(fmt.Errorf("source controllers not synced")) return } servicesChan <- ServiceUpdate{Op: SYNCED} - endpointsChan <- EndpointsUpdate{Op: SYNCED} } func sendAddService(servicesChan chan<- ServiceUpdate) func(obj interface{}) { @@ -99,49 +92,6 @@ func sendDeleteService(servicesChan chan<- ServiceUpdate) func(obj interface{}) } } -func sendAddEndpoints(endpointsChan chan<- EndpointsUpdate) func(obj interface{}) { - return func(obj interface{}) { - endpoints, ok := obj.(*api.Endpoints) - if !ok { - utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", obj)) - return - } - endpointsChan <- EndpointsUpdate{Op: ADD, Endpoints: endpoints} - } -} - -func sendUpdateEndpoints(endpointsChan chan<- EndpointsUpdate) func(oldObj, newObj interface{}) { - return func(_, newObj interface{}) { - endpoints, ok := newObj.(*api.Endpoints) - if !ok { - utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", newObj)) - return - } - endpointsChan <- EndpointsUpdate{Op: UPDATE, Endpoints: endpoints} - } -} - -func sendDeleteEndpoints(endpointsChan chan<- EndpointsUpdate) func(obj interface{}) { - return func(obj interface{}) { - var endpoints *api.Endpoints - switch t := obj.(type) { - case *api.Endpoints: - endpoints = t - case cache.DeletedFinalStateUnknown: - var ok bool - endpoints, ok = t.Obj.(*api.Endpoints) - if !ok { - utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", t.Obj)) - return - } - default: - utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", obj)) - return - } - endpointsChan <- EndpointsUpdate{Op: REMOVE, Endpoints: endpoints} - } -} - // NewServiceController creates a controller that is watching services and sending // updates into ServiceUpdate channel. func NewServiceController(lw cache.ListerWatcher, period time.Duration, ch chan<- ServiceUpdate) cache.Controller { @@ -157,19 +107,3 @@ func NewServiceController(lw cache.ListerWatcher, period time.Duration, ch chan< ) return serviceController } - -// NewEndpointsController creates a controller that is watching endpoints and sending -// updates into EndpointsUpdate channel. -func NewEndpointsController(lw cache.ListerWatcher, period time.Duration, ch chan<- EndpointsUpdate) cache.Controller { - _, endpointsController := cache.NewInformer( - lw, - &api.Endpoints{}, - period, - cache.ResourceEventHandlerFuncs{ - AddFunc: sendAddEndpoints(ch), - UpdateFunc: sendUpdateEndpoints(ch), - DeleteFunc: sendDeleteEndpoints(ch), - }, - ) - return endpointsController -} diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index 4f5cf4c0169..b90e69a2091 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -166,68 +166,40 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { watchResp: fakeWatch, } - ch := make(chan EndpointsUpdate) + stopCh := make(chan struct{}) + defer close(stopCh) - endpointsController := NewEndpointsController(lw, 30*time.Second, ch) - go endpointsController.Run(wait.NeverStop) + ch := make(chan struct{}) + handler := newEpsHandler(t, nil, func() { ch <- struct{}{} }) + + endpointsConfig := newEndpointsConfig(lw, time.Minute) + endpointsConfig.RegisterHandler(handler) + go endpointsConfig.Run(stopCh) // Add the first endpoints + handler.expected = []*api.Endpoints{endpoints1v1} fakeWatch.Add(endpoints1v1) - got, ok := <-ch - if !ok { - t.Errorf("Unable to read from channel when expected") - } - expected := EndpointsUpdate{Op: ADD, Endpoints: endpoints1v1} - if !apiequality.Semantic.DeepEqual(expected, got) { - t.Errorf("Expected %#v; Got %#v", expected, got) - } + <-ch // Add another endpoints + handler.expected = []*api.Endpoints{endpoints1v1, endpoints2} fakeWatch.Add(endpoints2) - got, ok = <-ch - if !ok { - t.Errorf("Unable to read from channel when expected") - } - // Could be sorted either of these two ways: - expected = EndpointsUpdate{Op: ADD, Endpoints: endpoints2} - - if !apiequality.Semantic.DeepEqual(expected, got) { - t.Errorf("Expected %#v, Got %#v", expected, got) - } + <-ch // Modify endpoints1 + handler.expected = []*api.Endpoints{endpoints1v2, endpoints2} fakeWatch.Modify(endpoints1v2) - got, ok = <-ch - if !ok { - t.Errorf("Unable to read from channel when expected") - } - expected = EndpointsUpdate{Op: UPDATE, Endpoints: endpoints1v2} - - if !apiequality.Semantic.DeepEqual(expected, got) { - t.Errorf("Expected %#v, Got %#v", expected, got) - } + <-ch // Delete endpoints1 + handler.expected = []*api.Endpoints{endpoints2} fakeWatch.Delete(endpoints1v2) - got, ok = <-ch - if !ok { - t.Errorf("Unable to read from channel when expected") - } - expected = EndpointsUpdate{Op: REMOVE, Endpoints: endpoints1v2} - if !apiequality.Semantic.DeepEqual(expected, got) { - t.Errorf("Expected %#v, Got %#v", expected, got) - } + <-ch // Delete endpoints2 + handler.expected = []*api.Endpoints{} fakeWatch.Delete(endpoints2) - got, ok = <-ch - if !ok { - t.Errorf("Unable to read from channel when expected") - } - expected = EndpointsUpdate{Op: REMOVE, Endpoints: endpoints2} - if !apiequality.Semantic.DeepEqual(expected, got) { - t.Errorf("Expected %#v, Got %#v", expected, got) - } + <-ch } type svcHandler struct { @@ -286,13 +258,6 @@ func TestInitialSync(t *testing.T) { // Wait for both services and endpoints handler. wg.Add(2) - svcConfig := NewServiceConfig() - epsConfig := NewEndpointsConfig() - svcHandler := newSvcHandler(t, []api.Service{*svc2, *svc1}, wg.Done) - svcConfig.RegisterHandler(svcHandler) - epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done) - epsConfig.RegisterHandler(epsHandler) - // Setup fake api client. fakeSvcWatch := watch.NewFake() svcLW := fakeLW{ @@ -305,8 +270,16 @@ func TestInitialSync(t *testing.T) { watchResp: fakeEpsWatch, } + svcConfig := NewServiceConfig() + epsConfig := newEndpointsConfig(epsLW, time.Minute) + svcHandler := newSvcHandler(t, []api.Service{*svc2, *svc1}, wg.Done) + svcConfig.RegisterHandler(svcHandler) + epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done) + epsConfig.RegisterHandler(epsHandler) + stopCh := make(chan struct{}) defer close(stopCh) - newSourceAPI(svcLW, epsLW, time.Minute, svcConfig.Channel("one"), epsConfig.Channel("two"), stopCh) + go epsConfig.Run(stopCh) + newSourceAPI(svcLW, time.Minute, svcConfig.Channel("one"), stopCh) wg.Wait() } diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 2487f579513..6cb454a92d1 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -17,12 +17,20 @@ limitations under the License. package config import ( + "fmt" "sync" + "time" "github.com/davecgh/go-spew/spew" "github.com/golang/glog" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api" + listers "k8s.io/kubernetes/pkg/client/listers/core/internalversion" "k8s.io/kubernetes/pkg/util/config" ) @@ -76,102 +84,100 @@ type EndpointsConfigHandler interface { // EndpointsConfig tracks a set of endpoints configurations. // It accepts "set", "add" and "remove" operations of endpoints via channels, and invokes registered handlers on change. type EndpointsConfig struct { - mux *config.Mux - bcaster *config.Broadcaster - store *endpointsStore + informer cache.Controller + lister listers.EndpointsLister + handlers []EndpointsConfigHandler + // updates channel is used to trigger registered handlers. + updates chan struct{} } // NewEndpointsConfig creates a new EndpointsConfig. -// It immediately runs the created EndpointsConfig. -func NewEndpointsConfig() *EndpointsConfig { - // The updates channel is used to send interrupts to the Endpoints handler. - // It's buffered because we never want to block for as long as there is a - // pending interrupt, but don't want to drop them if the handler is doing - // work. - updates := make(chan struct{}, 1) - store := &endpointsStore{updates: updates, endpoints: make(map[string]map[types.NamespacedName]*api.Endpoints)} - mux := config.NewMux(store) - bcaster := config.NewBroadcaster() - go watchForUpdates(bcaster, store, updates) - return &EndpointsConfig{mux, bcaster, store} +func NewEndpointsConfig(c cache.Getter, period time.Duration) *EndpointsConfig { + endpointsLW := cache.NewListWatchFromClient(c, "endpoints", metav1.NamespaceAll, fields.Everything()) + return newEndpointsConfig(endpointsLW, period) +} + +func newEndpointsConfig(lw cache.ListerWatcher, period time.Duration) *EndpointsConfig { + result := &EndpointsConfig{} + + store, informer := cache.NewIndexerInformer( + lw, + &api.Endpoints{}, + period, + cache.ResourceEventHandlerFuncs{ + AddFunc: result.handleAddEndpoints, + UpdateFunc: result.handleUpdateEndpoints, + DeleteFunc: result.handleDeleteEndpoints, + }, + cache.Indexers{}, + ) + result.informer = informer + result.lister = listers.NewEndpointsLister(store) + return result } // RegisterHandler registers a handler which is called on every endpoints change. func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) { - c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { - glog.V(3).Infof("Calling handler.OnEndpointsUpdate()") - handler.OnEndpointsUpdate(instance.([]*api.Endpoints)) - })) + c.handlers = append(c.handlers, handler) } -// Channel returns a channel to which endpoints updates should be delivered. -func (c *EndpointsConfig) Channel(source string) chan EndpointsUpdate { - ch := c.mux.Channel(source) - endpointsCh := make(chan EndpointsUpdate) +func (c *EndpointsConfig) Run(stopCh <-chan struct{}) { + // The updates channel is used to send interrupts to the Endpoints handler. + // It's buffered because we never want to block for as long as there is a + // pending interrupt, but don't want to drop them if the handler is doing + // work. + c.updates = make(chan struct{}, 1) + go c.informer.Run(stopCh) + if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) { + utilruntime.HandleError(fmt.Errorf("endpoint controller not synced")) + return + } + + // We have synced informers. Now we can start delivering updates + // to the registered handler. go func() { - for update := range endpointsCh { - ch <- update + for range c.updates { + endpoints, err := c.lister.List(labels.Everything()) + if err != nil { + glog.Errorf("Error while listing endpoints from cache: %v", err) + // This will cause a retry (if there isn't any other trigger in-flight). + c.dispatchUpdate() + continue + } + if endpoints == nil { + endpoints = []*api.Endpoints{} + } + for i := range c.handlers { + glog.V(3).Infof("Calling handler.OnEndpointsUpdate()") + c.handlers[i].OnEndpointsUpdate(endpoints) + } } }() - return endpointsCh + // Close updates channel when stopCh is closed. + go func() { + <-stopCh + close(c.updates) + }() } -// Config returns list of all endpoints from underlying store. -func (c *EndpointsConfig) Config() []api.Endpoints { - return c.store.MergedState().([]api.Endpoints) +func (c *EndpointsConfig) handleAddEndpoints(_ interface{}) { + c.dispatchUpdate() } -type endpointsStore struct { - endpointLock sync.RWMutex - endpoints map[string]map[types.NamespacedName]*api.Endpoints - synced bool - updates chan<- struct{} +func (c *EndpointsConfig) handleUpdateEndpoints(_, _ interface{}) { + c.dispatchUpdate() } -func (s *endpointsStore) Merge(source string, change interface{}) error { - s.endpointLock.Lock() - endpoints := s.endpoints[source] - if endpoints == nil { - endpoints = make(map[types.NamespacedName]*api.Endpoints) - } - update := change.(EndpointsUpdate) - switch update.Op { - case ADD, UPDATE: - glog.V(5).Infof("Adding new endpoint from source %s : %s", source, spew.Sdump(update.Endpoints)) - name := types.NamespacedName{Namespace: update.Endpoints.Namespace, Name: update.Endpoints.Name} - endpoints[name] = update.Endpoints - case REMOVE: - glog.V(5).Infof("Removing an endpoint %s", spew.Sdump(update.Endpoints)) - name := types.NamespacedName{Namespace: update.Endpoints.Namespace, Name: update.Endpoints.Name} - delete(endpoints, name) - case SYNCED: - s.synced = true +func (c *EndpointsConfig) handleDeleteEndpoints(_ interface{}) { + c.dispatchUpdate() +} + +func (c *EndpointsConfig) dispatchUpdate() { + select { + case c.updates <- struct{}{}: default: - glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update)) + glog.V(4).Infof("Endpoints handler already has a pending interrupt.") } - s.endpoints[source] = endpoints - synced := s.synced - s.endpointLock.Unlock() - if s.updates != nil && synced { - select { - case s.updates <- struct{}{}: - default: - glog.V(4).Infof("Endpoints handler already has a pending interrupt.") - } - } - return nil -} - -func (s *endpointsStore) MergedState() interface{} { - s.endpointLock.RLock() - defer s.endpointLock.RUnlock() - endpoints := make([]*api.Endpoints, 0) - for _, sourceEndpoints := range s.endpoints { - for _, value := range sourceEndpoints { - endpoints = append(endpoints, value) - } - } - return endpoints } // ServiceConfig tracks a set of service configurations. diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index 6be9c378554..9eaf672524b 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -24,6 +24,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" "k8s.io/kubernetes/pkg/api" ) @@ -228,98 +229,110 @@ func TestNewMultipleSourcesServicesMultipleHandlersAddedAndNotified(t *testing.T handler2.ValidateServices(t, services) } -func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing.T) { - config := NewEndpointsConfig() - config.store.synced = true - channelOne := config.Channel("one") - channelTwo := config.Channel("two") +func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) { + fakeWatch := watch.NewFake() + lw := fakeLW{ + listResp: &api.EndpointsList{Items: []api.Endpoints{}}, + watchResp: fakeWatch, + } + stopCh := make(chan struct{}) + defer close(stopCh) + + config := newEndpointsConfig(lw, time.Minute) handler := NewEndpointsHandlerMock() handler2 := NewEndpointsHandlerMock() config.RegisterHandler(handler) config.RegisterHandler(handler2) - endpointsUpdate1 := CreateEndpointsUpdate(ADD, &api.Endpoints{ + go config.Run(stopCh) + + endpoints1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}}, Ports: []api.EndpointPort{{Port: 80}}, }}, - }) - endpointsUpdate2 := CreateEndpointsUpdate(ADD, &api.Endpoints{ + } + endpoints2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}}, Ports: []api.EndpointPort{{Port: 80}}, }}, - }) - channelOne <- endpointsUpdate1 - channelTwo <- endpointsUpdate2 + } + fakeWatch.Add(endpoints1) + fakeWatch.Add(endpoints2) - endpoints := []*api.Endpoints{endpointsUpdate2.Endpoints, endpointsUpdate1.Endpoints} + endpoints := []*api.Endpoints{endpoints2, endpoints1} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) } -func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) { - config := NewEndpointsConfig() - config.store.synced = true - channelOne := config.Channel("one") - channelTwo := config.Channel("two") +func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) { + fakeWatch := watch.NewFake() + lw := fakeLW{ + listResp: &api.EndpointsList{Items: []api.Endpoints{}}, + watchResp: fakeWatch, + } + stopCh := make(chan struct{}) + defer close(stopCh) + + config := newEndpointsConfig(lw, time.Minute) handler := NewEndpointsHandlerMock() handler2 := NewEndpointsHandlerMock() config.RegisterHandler(handler) config.RegisterHandler(handler2) - endpointsUpdate1 := CreateEndpointsUpdate(ADD, &api.Endpoints{ + go config.Run(stopCh) + + endpoints1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}}, Ports: []api.EndpointPort{{Port: 80}}, }}, - }) - endpointsUpdate2 := CreateEndpointsUpdate(ADD, &api.Endpoints{ + } + endpoints2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}}, Ports: []api.EndpointPort{{Port: 80}}, }}, - }) - channelOne <- endpointsUpdate1 - channelTwo <- endpointsUpdate2 + } + fakeWatch.Add(endpoints1) + fakeWatch.Add(endpoints2) - endpoints := []*api.Endpoints{endpointsUpdate2.Endpoints, endpointsUpdate1.Endpoints} + endpoints := []*api.Endpoints{endpoints2, endpoints1} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) // Add one more - endpointsUpdate3 := CreateEndpointsUpdate(ADD, &api.Endpoints{ + endpoints3 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foobar"}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "5.5.5.5"}, {IP: "6.6.6.6"}}, Ports: []api.EndpointPort{{Port: 80}}, }}, - }) - channelTwo <- endpointsUpdate3 - endpoints = []*api.Endpoints{endpointsUpdate2.Endpoints, endpointsUpdate1.Endpoints, endpointsUpdate3.Endpoints} + } + fakeWatch.Add(endpoints3) + endpoints = []*api.Endpoints{endpoints2, endpoints1, endpoints3} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) // Update the "foo" service with new endpoints - endpointsUpdate1 = CreateEndpointsUpdate(ADD, &api.Endpoints{ + endpoints1v2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "7.7.7.7"}}, Ports: []api.EndpointPort{{Port: 80}}, }}, - }) - channelOne <- endpointsUpdate1 - endpoints = []*api.Endpoints{endpointsUpdate2.Endpoints, endpointsUpdate1.Endpoints, endpointsUpdate3.Endpoints} + } + fakeWatch.Modify(endpoints1v2) + endpoints = []*api.Endpoints{endpoints2, endpoints1v2, endpoints3} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) - // Remove "bar" service - endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}}) - channelTwo <- endpointsUpdate2 - - endpoints = []*api.Endpoints{endpointsUpdate1.Endpoints, endpointsUpdate3.Endpoints} + // Remove "bar" endpoints + fakeWatch.Delete(endpoints2) + endpoints = []*api.Endpoints{endpoints1v2, endpoints3} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) }