diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 4ac3a78b963..6272b23d88d 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -305,18 +305,13 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err // Note: RegisterHandler() calls need to happen before creation of Sources because sources // only notify on changes, and the initial update (on process start) may be lost if no handlers // are registered yet. - serviceConfig := proxyconfig.NewServiceConfig() + serviceConfig := proxyconfig.NewServiceConfig(client.Core().RESTClient(), config.ConfigSyncPeriod) serviceConfig.RegisterHandler(proxier) + go serviceConfig.Run(wait.NeverStop) - endpointsConfig := proxyconfig.NewEndpointsConfig() + endpointsConfig := proxyconfig.NewEndpointsConfig(client.Core().RESTClient(), config.ConfigSyncPeriod) endpointsConfig.RegisterHandler(endpointsHandler) - - proxyconfig.NewSourceAPI( - client.Core().RESTClient(), - config.ConfigSyncPeriod, - serviceConfig.Channel("api"), - endpointsConfig.Channel("api"), - ) + go endpointsConfig.Run(wait.NeverStop) config.NodeRef = &clientv1.ObjectReference{ Kind: "Node", diff --git a/cmd/kubemark/hollow-node.go b/cmd/kubemark/hollow-node.go index dce53e35912..d55fe31a592 100644 --- a/cmd/kubemark/hollow-node.go +++ b/cmd/kubemark/hollow-node.go @@ -18,6 +18,7 @@ package main import ( "fmt" + "time" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/util/flag" @@ -135,10 +136,10 @@ func main() { iptInterface := fakeiptables.NewFake() - serviceConfig := proxyconfig.NewServiceConfig() + serviceConfig := proxyconfig.NewServiceConfig(internalClientset.Core().RESTClient(), 15*time.Minute) serviceConfig.RegisterHandler(&kubemark.FakeProxyHandler{}) - endpointsConfig := proxyconfig.NewEndpointsConfig() + endpointsConfig := proxyconfig.NewEndpointsConfig(internalClientset.Core().RESTClient(), 15*time.Minute) endpointsConfig.RegisterHandler(&kubemark.FakeProxyHandler{}) eventClient, err := clientgoclientset.NewForConfig(clientConfig) diff --git a/pkg/kubemark/BUILD b/pkg/kubemark/BUILD index accdd89e766..6f9315a8d6f 100644 --- a/pkg/kubemark/BUILD +++ b/pkg/kubemark/BUILD @@ -41,6 +41,7 @@ go_library( "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/types", + "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/client-go/kubernetes/typed/core/v1", "//vendor:k8s.io/client-go/pkg/api/v1", "//vendor:k8s.io/client-go/tools/record", diff --git a/pkg/kubemark/hollow_proxy.go b/pkg/kubemark/hollow_proxy.go index 7de8cba4b51..6cee72f2b40 100644 --- a/pkg/kubemark/hollow_proxy.go +++ b/pkg/kubemark/hollow_proxy.go @@ -17,9 +17,8 @@ limitations under the License. package kubemark import ( - "time" - "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" v1core "k8s.io/client-go/kubernetes/typed/core/v1" clientv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/record" @@ -71,12 +70,9 @@ func NewHollowProxyOrDie( UID: types.UID(nodeName), Namespace: "", } - proxyconfig.NewSourceAPI( - client.Core().RESTClient(), - 15*time.Minute, - serviceConfig.Channel("api"), - endpointsConfig.Channel("api"), - ) + + go endpointsConfig.Run(wait.NeverStop) + go serviceConfig.Run(wait.NeverStop) hollowProxy, err := proxyapp.NewProxyServer(client, eventClient, config, iptInterface, &FakeProxier{}, broadcaster, recorder, nil, "fake") if err != nil { diff --git a/pkg/proxy/config/BUILD b/pkg/proxy/config/BUILD index 56e86e24e9d..52be6161d04 100644 --- a/pkg/proxy/config/BUILD +++ b/pkg/proxy/config/BUILD @@ -11,21 +11,19 @@ load( go_library( name = "go_default_library", srcs = [ - "api.go", "config.go", "doc.go", ], tags = ["automanaged"], deps = [ "//pkg/api:go_default_library", + "//pkg/client/listers/core/internalversion:go_default_library", "//pkg/util/config:go_default_library", - "//vendor:github.com/davecgh/go-spew/spew", "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/fields", - "//vendor:k8s.io/apimachinery/pkg/types", + "//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/util/runtime", - "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/client-go/tools/cache", ], ) @@ -40,7 +38,6 @@ go_test( tags = ["automanaged"], deps = [ "//pkg/api:go_default_library", - "//vendor:k8s.io/apimachinery/pkg/api/equality", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", diff --git a/pkg/proxy/config/api.go b/pkg/proxy/config/api.go deleted file mode 100644 index 02aff427768..00000000000 --- a/pkg/proxy/config/api.go +++ /dev/null @@ -1,175 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package config - -import ( - "fmt" - "time" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/tools/cache" - "k8s.io/kubernetes/pkg/api" -) - -// NewSourceAPI creates config source that watches for changes to the services and endpoints. -func NewSourceAPI(c cache.Getter, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) { - servicesLW := cache.NewListWatchFromClient(c, "services", metav1.NamespaceAll, fields.Everything()) - endpointsLW := cache.NewListWatchFromClient(c, "endpoints", metav1.NamespaceAll, fields.Everything()) - newSourceAPI(servicesLW, endpointsLW, period, servicesChan, endpointsChan, wait.NeverStop) -} - -func newSourceAPI( - servicesLW cache.ListerWatcher, - endpointsLW cache.ListerWatcher, - period time.Duration, - servicesChan chan<- ServiceUpdate, - endpointsChan chan<- EndpointsUpdate, - stopCh <-chan struct{}) { - serviceController := NewServiceController(servicesLW, period, servicesChan) - go serviceController.Run(stopCh) - - endpointsController := NewEndpointsController(endpointsLW, period, endpointsChan) - go endpointsController.Run(stopCh) - - if !cache.WaitForCacheSync(stopCh, serviceController.HasSynced, endpointsController.HasSynced) { - utilruntime.HandleError(fmt.Errorf("source controllers not synced")) - return - } - servicesChan <- ServiceUpdate{Op: SYNCED} - endpointsChan <- EndpointsUpdate{Op: SYNCED} -} - -func sendAddService(servicesChan chan<- ServiceUpdate) func(obj interface{}) { - return func(obj interface{}) { - service, ok := obj.(*api.Service) - if !ok { - utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", obj)) - return - } - servicesChan <- ServiceUpdate{Op: ADD, Service: service} - } -} - -func sendUpdateService(servicesChan chan<- ServiceUpdate) func(oldObj, newObj interface{}) { - return func(_, newObj interface{}) { - service, ok := newObj.(*api.Service) - if !ok { - utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", newObj)) - return - } - servicesChan <- ServiceUpdate{Op: UPDATE, Service: service} - } -} - -func sendDeleteService(servicesChan chan<- ServiceUpdate) func(obj interface{}) { - return func(obj interface{}) { - var service *api.Service - switch t := obj.(type) { - case *api.Service: - service = t - case cache.DeletedFinalStateUnknown: - var ok bool - service, ok = t.Obj.(*api.Service) - if !ok { - utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", t.Obj)) - return - } - default: - utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", t)) - return - } - servicesChan <- ServiceUpdate{Op: REMOVE, Service: service} - } -} - -func sendAddEndpoints(endpointsChan chan<- EndpointsUpdate) func(obj interface{}) { - return func(obj interface{}) { - endpoints, ok := obj.(*api.Endpoints) - if !ok { - utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", obj)) - return - } - endpointsChan <- EndpointsUpdate{Op: ADD, Endpoints: endpoints} - } -} - -func sendUpdateEndpoints(endpointsChan chan<- EndpointsUpdate) func(oldObj, newObj interface{}) { - return func(_, newObj interface{}) { - endpoints, ok := newObj.(*api.Endpoints) - if !ok { - utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", newObj)) - return - } - endpointsChan <- EndpointsUpdate{Op: UPDATE, Endpoints: endpoints} - } -} - -func sendDeleteEndpoints(endpointsChan chan<- EndpointsUpdate) func(obj interface{}) { - return func(obj interface{}) { - var endpoints *api.Endpoints - switch t := obj.(type) { - case *api.Endpoints: - endpoints = t - case cache.DeletedFinalStateUnknown: - var ok bool - endpoints, ok = t.Obj.(*api.Endpoints) - if !ok { - utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", t.Obj)) - return - } - default: - utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", obj)) - return - } - endpointsChan <- EndpointsUpdate{Op: REMOVE, Endpoints: endpoints} - } -} - -// NewServiceController creates a controller that is watching services and sending -// updates into ServiceUpdate channel. -func NewServiceController(lw cache.ListerWatcher, period time.Duration, ch chan<- ServiceUpdate) cache.Controller { - _, serviceController := cache.NewInformer( - lw, - &api.Service{}, - period, - cache.ResourceEventHandlerFuncs{ - AddFunc: sendAddService(ch), - UpdateFunc: sendUpdateService(ch), - DeleteFunc: sendDeleteService(ch), - }, - ) - return serviceController -} - -// NewEndpointsController creates a controller that is watching endpoints and sending -// updates into EndpointsUpdate channel. -func NewEndpointsController(lw cache.ListerWatcher, period time.Duration, ch chan<- EndpointsUpdate) cache.Controller { - _, endpointsController := cache.NewInformer( - lw, - &api.Endpoints{}, - period, - cache.ResourceEventHandlerFuncs{ - AddFunc: sendAddEndpoints(ch), - UpdateFunc: sendUpdateEndpoints(ch), - DeleteFunc: sendDeleteEndpoints(ch), - }, - ) - return endpointsController -} diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index 4f5cf4c0169..bc55748b9cf 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -23,10 +23,8 @@ import ( "testing" "time" - apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api" @@ -65,68 +63,40 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) { watchResp: fakeWatch, } - ch := make(chan ServiceUpdate) + stopCh := make(chan struct{}) + defer close(stopCh) - serviceController := NewServiceController(lw, 30*time.Second, ch) - go serviceController.Run(wait.NeverStop) + ch := make(chan struct{}) + handler := newSvcHandler(t, nil, func() { ch <- struct{}{} }) + + serviceConfig := newServiceConfig(lw, time.Minute) + serviceConfig.RegisterHandler(handler) + go serviceConfig.Run(stopCh) // Add the first service + handler.expected = []api.Service{*service1v1} fakeWatch.Add(service1v1) - got, ok := <-ch - if !ok { - t.Errorf("Unable to read from channel when expected") - } - expected := ServiceUpdate{Op: ADD, Service: service1v1} - if !apiequality.Semantic.DeepEqual(expected, got) { - t.Errorf("Expected %#v; Got %#v", expected, got) - } + <-ch // Add another service + handler.expected = []api.Service{*service1v1, *service2} fakeWatch.Add(service2) - got, ok = <-ch - if !ok { - t.Errorf("Unable to read from channel when expected") - } - // Could be sorted either of these two ways: - expected = ServiceUpdate{Op: ADD, Service: service2} - - if !apiequality.Semantic.DeepEqual(expected, got) { - t.Errorf("Expected %#v, Got %#v", expected, got) - } + <-ch // Modify service1 + handler.expected = []api.Service{*service1v2, *service2} fakeWatch.Modify(service1v2) - got, ok = <-ch - if !ok { - t.Errorf("Unable to read from channel when expected") - } - expected = ServiceUpdate{Op: UPDATE, Service: service1v2} - - if !apiequality.Semantic.DeepEqual(expected, got) { - t.Errorf("Expected %#v, Got %#v", expected, got) - } + <-ch // Delete service1 + handler.expected = []api.Service{*service2} fakeWatch.Delete(service1v2) - got, ok = <-ch - if !ok { - t.Errorf("Unable to read from channel when expected") - } - expected = ServiceUpdate{Op: REMOVE, Service: service1v2} - if !apiequality.Semantic.DeepEqual(expected, got) { - t.Errorf("Expected %#v, Got %#v", expected, got) - } + <-ch // Delete service2 + handler.expected = []api.Service{} fakeWatch.Delete(service2) - got, ok = <-ch - if !ok { - t.Errorf("Unable to read from channel when expected") - } - expected = ServiceUpdate{Op: REMOVE, Service: service2} - if !apiequality.Semantic.DeepEqual(expected, got) { - t.Errorf("Expected %#v, Got %#v", expected, got) - } + <-ch } func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { @@ -166,68 +136,40 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { watchResp: fakeWatch, } - ch := make(chan EndpointsUpdate) + stopCh := make(chan struct{}) + defer close(stopCh) - endpointsController := NewEndpointsController(lw, 30*time.Second, ch) - go endpointsController.Run(wait.NeverStop) + ch := make(chan struct{}) + handler := newEpsHandler(t, nil, func() { ch <- struct{}{} }) + + endpointsConfig := newEndpointsConfig(lw, time.Minute) + endpointsConfig.RegisterHandler(handler) + go endpointsConfig.Run(stopCh) // Add the first endpoints + handler.expected = []*api.Endpoints{endpoints1v1} fakeWatch.Add(endpoints1v1) - got, ok := <-ch - if !ok { - t.Errorf("Unable to read from channel when expected") - } - expected := EndpointsUpdate{Op: ADD, Endpoints: endpoints1v1} - if !apiequality.Semantic.DeepEqual(expected, got) { - t.Errorf("Expected %#v; Got %#v", expected, got) - } + <-ch // Add another endpoints + handler.expected = []*api.Endpoints{endpoints1v1, endpoints2} fakeWatch.Add(endpoints2) - got, ok = <-ch - if !ok { - t.Errorf("Unable to read from channel when expected") - } - // Could be sorted either of these two ways: - expected = EndpointsUpdate{Op: ADD, Endpoints: endpoints2} - - if !apiequality.Semantic.DeepEqual(expected, got) { - t.Errorf("Expected %#v, Got %#v", expected, got) - } + <-ch // Modify endpoints1 + handler.expected = []*api.Endpoints{endpoints1v2, endpoints2} fakeWatch.Modify(endpoints1v2) - got, ok = <-ch - if !ok { - t.Errorf("Unable to read from channel when expected") - } - expected = EndpointsUpdate{Op: UPDATE, Endpoints: endpoints1v2} - - if !apiequality.Semantic.DeepEqual(expected, got) { - t.Errorf("Expected %#v, Got %#v", expected, got) - } + <-ch // Delete endpoints1 + handler.expected = []*api.Endpoints{endpoints2} fakeWatch.Delete(endpoints1v2) - got, ok = <-ch - if !ok { - t.Errorf("Unable to read from channel when expected") - } - expected = EndpointsUpdate{Op: REMOVE, Endpoints: endpoints1v2} - if !apiequality.Semantic.DeepEqual(expected, got) { - t.Errorf("Expected %#v, Got %#v", expected, got) - } + <-ch // Delete endpoints2 + handler.expected = []*api.Endpoints{} fakeWatch.Delete(endpoints2) - got, ok = <-ch - if !ok { - t.Errorf("Unable to read from channel when expected") - } - expected = EndpointsUpdate{Op: REMOVE, Endpoints: endpoints2} - if !apiequality.Semantic.DeepEqual(expected, got) { - t.Errorf("Expected %#v, Got %#v", expected, got) - } + <-ch } type svcHandler struct { @@ -286,13 +228,6 @@ func TestInitialSync(t *testing.T) { // Wait for both services and endpoints handler. wg.Add(2) - svcConfig := NewServiceConfig() - epsConfig := NewEndpointsConfig() - svcHandler := newSvcHandler(t, []api.Service{*svc2, *svc1}, wg.Done) - svcConfig.RegisterHandler(svcHandler) - epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done) - epsConfig.RegisterHandler(epsHandler) - // Setup fake api client. fakeSvcWatch := watch.NewFake() svcLW := fakeLW{ @@ -305,8 +240,16 @@ func TestInitialSync(t *testing.T) { watchResp: fakeEpsWatch, } + svcConfig := newServiceConfig(svcLW, time.Minute) + epsConfig := newEndpointsConfig(epsLW, time.Minute) + svcHandler := newSvcHandler(t, []api.Service{*svc2, *svc1}, wg.Done) + svcConfig.RegisterHandler(svcHandler) + epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done) + epsConfig.RegisterHandler(epsHandler) + stopCh := make(chan struct{}) defer close(stopCh) - newSourceAPI(svcLW, epsLW, time.Minute, svcConfig.Channel("one"), epsConfig.Channel("two"), stopCh) + go svcConfig.Run(stopCh) + go epsConfig.Run(stopCh) wg.Wait() } diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 2487f579513..bb558316f73 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -17,40 +17,20 @@ limitations under the License. package config import ( - "sync" + "fmt" + "time" - "github.com/davecgh/go-spew/spew" "github.com/golang/glog" - "k8s.io/apimachinery/pkg/types" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api" + listers "k8s.io/kubernetes/pkg/client/listers/core/internalversion" "k8s.io/kubernetes/pkg/util/config" ) -// Operation is a type of operation of services or endpoints. -type Operation int - -// These are the available operation types. -const ( - ADD Operation = iota - UPDATE - REMOVE - SYNCED -) - -// ServiceUpdate describes an operation of services, sent on the channel. -// You can add, update or remove single service by setting Op == ADD|UPDATE|REMOVE. -type ServiceUpdate struct { - Service *api.Service - Op Operation -} - -// EndpointsUpdate describes an operation of endpoints, sent on the channel. -// You can add, update or remove single endpoints by setting Op == ADD|UPDATE|REMOVE. -type EndpointsUpdate struct { - Endpoints *api.Endpoints - Op Operation -} - // ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services. type ServiceConfigHandler interface { // OnServiceUpdate gets called when a configuration has been changed by one of the sources. @@ -76,203 +56,204 @@ type EndpointsConfigHandler interface { // EndpointsConfig tracks a set of endpoints configurations. // It accepts "set", "add" and "remove" operations of endpoints via channels, and invokes registered handlers on change. type EndpointsConfig struct { - mux *config.Mux - bcaster *config.Broadcaster - store *endpointsStore + informer cache.Controller + lister listers.EndpointsLister + handlers []EndpointsConfigHandler + // updates channel is used to trigger registered handlers. + updates chan struct{} } // NewEndpointsConfig creates a new EndpointsConfig. -// It immediately runs the created EndpointsConfig. -func NewEndpointsConfig() *EndpointsConfig { - // The updates channel is used to send interrupts to the Endpoints handler. - // It's buffered because we never want to block for as long as there is a - // pending interrupt, but don't want to drop them if the handler is doing - // work. - updates := make(chan struct{}, 1) - store := &endpointsStore{updates: updates, endpoints: make(map[string]map[types.NamespacedName]*api.Endpoints)} - mux := config.NewMux(store) - bcaster := config.NewBroadcaster() - go watchForUpdates(bcaster, store, updates) - return &EndpointsConfig{mux, bcaster, store} +func NewEndpointsConfig(c cache.Getter, period time.Duration) *EndpointsConfig { + endpointsLW := cache.NewListWatchFromClient(c, "endpoints", metav1.NamespaceAll, fields.Everything()) + return newEndpointsConfig(endpointsLW, period) +} + +func newEndpointsConfig(lw cache.ListerWatcher, period time.Duration) *EndpointsConfig { + result := &EndpointsConfig{} + + store, informer := cache.NewIndexerInformer( + lw, + &api.Endpoints{}, + period, + cache.ResourceEventHandlerFuncs{ + AddFunc: result.handleAddEndpoints, + UpdateFunc: result.handleUpdateEndpoints, + DeleteFunc: result.handleDeleteEndpoints, + }, + cache.Indexers{}, + ) + result.informer = informer + result.lister = listers.NewEndpointsLister(store) + return result } // RegisterHandler registers a handler which is called on every endpoints change. func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) { - c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { - glog.V(3).Infof("Calling handler.OnEndpointsUpdate()") - handler.OnEndpointsUpdate(instance.([]*api.Endpoints)) - })) + c.handlers = append(c.handlers, handler) } -// Channel returns a channel to which endpoints updates should be delivered. -func (c *EndpointsConfig) Channel(source string) chan EndpointsUpdate { - ch := c.mux.Channel(source) - endpointsCh := make(chan EndpointsUpdate) +// Run starts the underlying informer and goroutine responsible for calling +// registered handlers. +func (c *EndpointsConfig) Run(stopCh <-chan struct{}) { + // The updates channel is used to send interrupts to the Endpoints handler. + // It's buffered because we never want to block for as long as there is a + // pending interrupt, but don't want to drop them if the handler is doing + // work. + c.updates = make(chan struct{}, 1) + go c.informer.Run(stopCh) + if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) { + utilruntime.HandleError(fmt.Errorf("endpoint controller not synced")) + return + } + + // We have synced informers. Now we can start delivering updates + // to the registered handler. go func() { - for update := range endpointsCh { - ch <- update + for range c.updates { + endpoints, err := c.lister.List(labels.Everything()) + if err != nil { + glog.Errorf("Error while listing endpoints from cache: %v", err) + // This will cause a retry (if there isn't any other trigger in-flight). + c.dispatchUpdate() + continue + } + if endpoints == nil { + endpoints = []*api.Endpoints{} + } + for i := range c.handlers { + glog.V(3).Infof("Calling handler.OnEndpointsUpdate()") + c.handlers[i].OnEndpointsUpdate(endpoints) + } } }() - return endpointsCh + // Close updates channel when stopCh is closed. + go func() { + <-stopCh + close(c.updates) + }() } -// Config returns list of all endpoints from underlying store. -func (c *EndpointsConfig) Config() []api.Endpoints { - return c.store.MergedState().([]api.Endpoints) +func (c *EndpointsConfig) handleAddEndpoints(_ interface{}) { + c.dispatchUpdate() } -type endpointsStore struct { - endpointLock sync.RWMutex - endpoints map[string]map[types.NamespacedName]*api.Endpoints - synced bool - updates chan<- struct{} +func (c *EndpointsConfig) handleUpdateEndpoints(_, _ interface{}) { + c.dispatchUpdate() } -func (s *endpointsStore) Merge(source string, change interface{}) error { - s.endpointLock.Lock() - endpoints := s.endpoints[source] - if endpoints == nil { - endpoints = make(map[types.NamespacedName]*api.Endpoints) - } - update := change.(EndpointsUpdate) - switch update.Op { - case ADD, UPDATE: - glog.V(5).Infof("Adding new endpoint from source %s : %s", source, spew.Sdump(update.Endpoints)) - name := types.NamespacedName{Namespace: update.Endpoints.Namespace, Name: update.Endpoints.Name} - endpoints[name] = update.Endpoints - case REMOVE: - glog.V(5).Infof("Removing an endpoint %s", spew.Sdump(update.Endpoints)) - name := types.NamespacedName{Namespace: update.Endpoints.Namespace, Name: update.Endpoints.Name} - delete(endpoints, name) - case SYNCED: - s.synced = true +func (c *EndpointsConfig) handleDeleteEndpoints(_ interface{}) { + c.dispatchUpdate() +} + +func (c *EndpointsConfig) dispatchUpdate() { + select { + case c.updates <- struct{}{}: default: - glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update)) + glog.V(4).Infof("Endpoints handler already has a pending interrupt.") } - s.endpoints[source] = endpoints - synced := s.synced - s.endpointLock.Unlock() - if s.updates != nil && synced { - select { - case s.updates <- struct{}{}: - default: - glog.V(4).Infof("Endpoints handler already has a pending interrupt.") - } - } - return nil -} - -func (s *endpointsStore) MergedState() interface{} { - s.endpointLock.RLock() - defer s.endpointLock.RUnlock() - endpoints := make([]*api.Endpoints, 0) - for _, sourceEndpoints := range s.endpoints { - for _, value := range sourceEndpoints { - endpoints = append(endpoints, value) - } - } - return endpoints } // ServiceConfig tracks a set of service configurations. // It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change. type ServiceConfig struct { - mux *config.Mux - bcaster *config.Broadcaster - store *serviceStore + informer cache.Controller + lister listers.ServiceLister + handlers []ServiceConfigHandler + // updates channel is used to trigger registered handlers + updates chan struct{} } // NewServiceConfig creates a new ServiceConfig. -// It immediately runs the created ServiceConfig. -func NewServiceConfig() *ServiceConfig { - // The updates channel is used to send interrupts to the Services handler. - // It's buffered because we never want to block for as long as there is a - // pending interrupt, but don't want to drop them if the handler is doing - // work. - updates := make(chan struct{}, 1) - store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]*api.Service)} - mux := config.NewMux(store) - bcaster := config.NewBroadcaster() - go watchForUpdates(bcaster, store, updates) - return &ServiceConfig{mux, bcaster, store} +func NewServiceConfig(c cache.Getter, period time.Duration) *ServiceConfig { + servicesLW := cache.NewListWatchFromClient(c, "services", metav1.NamespaceAll, fields.Everything()) + return newServiceConfig(servicesLW, period) +} + +func newServiceConfig(lw cache.ListerWatcher, period time.Duration) *ServiceConfig { + result := &ServiceConfig{} + + store, informer := cache.NewIndexerInformer( + lw, + &api.Service{}, + period, + cache.ResourceEventHandlerFuncs{ + AddFunc: result.handleAddService, + UpdateFunc: result.handleUpdateService, + DeleteFunc: result.handleDeleteService, + }, + cache.Indexers{}, + ) + result.informer = informer + result.lister = listers.NewServiceLister(store) + return result } // RegisterHandler registers a handler which is called on every services change. func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) { - c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { - glog.V(3).Infof("Calling handler.OnServiceUpdate()") - handler.OnServiceUpdate(instance.([]api.Service)) - })) + c.handlers = append(c.handlers, handler) } -// Channel returns a channel to which services updates should be delivered. -func (c *ServiceConfig) Channel(source string) chan ServiceUpdate { - ch := c.mux.Channel(source) - serviceCh := make(chan ServiceUpdate) +// Run starts the underlying informer and goroutine responsible for calling +// registered handlers. +func (c *ServiceConfig) Run(stopCh <-chan struct{}) { + // The updates channel is used to send interrupts to the Services handler. + // It's buffered because we never want to block for as long as there is a + // pending interrupt, but don't want to drop them if the handler is doing + // work. + c.updates = make(chan struct{}, 1) + go c.informer.Run(stopCh) + if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) { + utilruntime.HandleError(fmt.Errorf("service controller not synced")) + return + } + + // We hanve synced informers. Now we can start delivering updates + // to the registered handler. go func() { - for update := range serviceCh { - ch <- update + for range c.updates { + services, err := c.lister.List(labels.Everything()) + if err != nil { + glog.Errorf("Error while listing services from cache: %v", err) + // This will cause a retry (if there isnt' any other trigger in-flight). + c.dispatchUpdate() + continue + } + svcs := make([]api.Service, 0, len(services)) + for i := range services { + svcs = append(svcs, *services[i]) + } + for i := range c.handlers { + glog.V(3).Infof("Calling handler.OnServiceUpdate()") + c.handlers[i].OnServiceUpdate(svcs) + } } }() - return serviceCh + // Close updates channel when stopCh is closed. + go func() { + <-stopCh + close(c.updates) + }() } -// Config returns list of all services from underlying store. -func (c *ServiceConfig) Config() []api.Service { - return c.store.MergedState().([]api.Service) +func (c *ServiceConfig) handleAddService(_ interface{}) { + c.dispatchUpdate() } -type serviceStore struct { - serviceLock sync.RWMutex - services map[string]map[types.NamespacedName]*api.Service - synced bool - updates chan<- struct{} +func (c *ServiceConfig) handleUpdateService(_, _ interface{}) { + c.dispatchUpdate() } -func (s *serviceStore) Merge(source string, change interface{}) error { - s.serviceLock.Lock() - services := s.services[source] - if services == nil { - services = make(map[types.NamespacedName]*api.Service) - } - update := change.(ServiceUpdate) - switch update.Op { - case ADD, UPDATE: - glog.V(5).Infof("Adding new service from source %s : %s", source, spew.Sdump(update.Service)) - name := types.NamespacedName{Namespace: update.Service.Namespace, Name: update.Service.Name} - services[name] = update.Service - case REMOVE: - glog.V(5).Infof("Removing a service %s", spew.Sdump(update.Service)) - name := types.NamespacedName{Namespace: update.Service.Namespace, Name: update.Service.Name} - delete(services, name) - case SYNCED: - s.synced = true +func (c *ServiceConfig) handleDeleteService(_ interface{}) { + c.dispatchUpdate() +} + +func (c *ServiceConfig) dispatchUpdate() { + select { + case c.updates <- struct{}{}: default: - glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update)) + glog.V(4).Infof("Service handler alread has a pending interrupt.") } - s.services[source] = services - synced := s.synced - s.serviceLock.Unlock() - if s.updates != nil && synced { - select { - case s.updates <- struct{}{}: - default: - glog.V(4).Infof("Service handler already has a pending interrupt.") - } - } - return nil -} - -func (s *serviceStore) MergedState() interface{} { - s.serviceLock.RLock() - defer s.serviceLock.RUnlock() - services := make([]api.Service, 0) - for _, sourceServices := range s.services { - for _, value := range sourceServices { - services = append(services, *value) - } - } - return services } // watchForUpdates invokes bcaster.Notify() with the latest version of an object diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index 6be9c378554..8608e88f732 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -24,19 +24,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" "k8s.io/kubernetes/pkg/api" ) -const TomcatPort int = 8080 -const TomcatName = "tomcat" - -var TomcatEndpoints = map[string]string{"c0": "1.1.1.1:18080", "c1": "2.2.2.2:18081"} - -const MysqlPort int = 3306 -const MysqlName = "mysql" - -var MysqlEndpoints = map[string]string{"c0": "1.1.1.1:13306", "c3": "2.2.2.2:13306"} - type sortedServices []api.Service func (s sortedServices) Len() int { @@ -129,197 +120,198 @@ func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints } } -func CreateServiceUpdate(op Operation, service *api.Service) ServiceUpdate { - return ServiceUpdate{Op: op, Service: service} -} - -func CreateEndpointsUpdate(op Operation, endpoints *api.Endpoints) EndpointsUpdate { - return EndpointsUpdate{Op: op, Endpoints: endpoints} -} - func TestNewServiceAddedAndNotified(t *testing.T) { - config := NewServiceConfig() - config.store.synced = true - channel := config.Channel("one") + fakeWatch := watch.NewFake() + lw := fakeLW{ + listResp: &api.ServiceList{Items: []api.Service{}}, + watchResp: fakeWatch, + } + stopCh := make(chan struct{}) + defer close(stopCh) + + config := newServiceConfig(lw, time.Minute) handler := NewServiceHandlerMock() config.RegisterHandler(handler) - serviceUpdate := CreateServiceUpdate(ADD, &api.Service{ + go config.Run(stopCh) + + service := &api.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}}, - }) - channel <- serviceUpdate - handler.ValidateServices(t, []api.Service{*serviceUpdate.Service}) + } + fakeWatch.Add(service) + handler.ValidateServices(t, []api.Service{*service}) } func TestServiceAddedRemovedSetAndNotified(t *testing.T) { - config := NewServiceConfig() - config.store.synced = true - channel := config.Channel("one") - handler := NewServiceHandlerMock() - config.RegisterHandler(handler) - serviceUpdate := CreateServiceUpdate(ADD, &api.Service{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, - Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}}, - }) - channel <- serviceUpdate - handler.ValidateServices(t, []api.Service{*serviceUpdate.Service}) - - serviceUpdate2 := CreateServiceUpdate(ADD, &api.Service{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, - Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}}, - }) - channel <- serviceUpdate2 - services := []api.Service{*serviceUpdate2.Service, *serviceUpdate.Service} - handler.ValidateServices(t, services) - - serviceUpdate3 := CreateServiceUpdate(REMOVE, &api.Service{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, - }) - channel <- serviceUpdate3 - services = []api.Service{*serviceUpdate2.Service} - handler.ValidateServices(t, services) -} - -func TestNewMultipleSourcesServicesAddedAndNotified(t *testing.T) { - config := NewServiceConfig() - config.store.synced = true - channelOne := config.Channel("one") - channelTwo := config.Channel("two") - if channelOne == channelTwo { - t.Error("Same channel handed back for one and two") + fakeWatch := watch.NewFake() + lw := fakeLW{ + listResp: &api.ServiceList{Items: []api.Service{}}, + watchResp: fakeWatch, } + stopCh := make(chan struct{}) + defer close(stopCh) + + config := newServiceConfig(lw, time.Minute) handler := NewServiceHandlerMock() config.RegisterHandler(handler) - serviceUpdate1 := CreateServiceUpdate(ADD, &api.Service{ + go config.Run(stopCh) + + service1 := &api.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}}, - }) - serviceUpdate2 := CreateServiceUpdate(ADD, &api.Service{ + } + fakeWatch.Add(service1) + handler.ValidateServices(t, []api.Service{*service1}) + + service2 := &api.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}}, - }) - channelOne <- serviceUpdate1 - channelTwo <- serviceUpdate2 - services := []api.Service{*serviceUpdate2.Service, *serviceUpdate1.Service} + } + fakeWatch.Add(service2) + services := []api.Service{*service2, *service1} + handler.ValidateServices(t, services) + + fakeWatch.Delete(service1) + services = []api.Service{*service2} handler.ValidateServices(t, services) } -func TestNewMultipleSourcesServicesMultipleHandlersAddedAndNotified(t *testing.T) { - config := NewServiceConfig() - config.store.synced = true - channelOne := config.Channel("one") - channelTwo := config.Channel("two") +func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) { + fakeWatch := watch.NewFake() + lw := fakeLW{ + listResp: &api.ServiceList{Items: []api.Service{}}, + watchResp: fakeWatch, + } + stopCh := make(chan struct{}) + defer close(stopCh) + + config := newServiceConfig(lw, time.Minute) handler := NewServiceHandlerMock() handler2 := NewServiceHandlerMock() config.RegisterHandler(handler) config.RegisterHandler(handler2) - serviceUpdate1 := CreateServiceUpdate(ADD, &api.Service{ + go config.Run(stopCh) + + service1 := &api.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}}, - }) - serviceUpdate2 := CreateServiceUpdate(ADD, &api.Service{ + } + service2 := &api.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}}, - }) - channelOne <- serviceUpdate1 - channelTwo <- serviceUpdate2 - services := []api.Service{*serviceUpdate2.Service, *serviceUpdate1.Service} + } + fakeWatch.Add(service1) + fakeWatch.Add(service2) + + services := []api.Service{*service2, *service1} handler.ValidateServices(t, services) handler2.ValidateServices(t, services) } -func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing.T) { - config := NewEndpointsConfig() - config.store.synced = true - channelOne := config.Channel("one") - channelTwo := config.Channel("two") +func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) { + fakeWatch := watch.NewFake() + lw := fakeLW{ + listResp: &api.EndpointsList{Items: []api.Endpoints{}}, + watchResp: fakeWatch, + } + stopCh := make(chan struct{}) + defer close(stopCh) + + config := newEndpointsConfig(lw, time.Minute) handler := NewEndpointsHandlerMock() handler2 := NewEndpointsHandlerMock() config.RegisterHandler(handler) config.RegisterHandler(handler2) - endpointsUpdate1 := CreateEndpointsUpdate(ADD, &api.Endpoints{ + go config.Run(stopCh) + + endpoints1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}}, Ports: []api.EndpointPort{{Port: 80}}, }}, - }) - endpointsUpdate2 := CreateEndpointsUpdate(ADD, &api.Endpoints{ + } + endpoints2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}}, Ports: []api.EndpointPort{{Port: 80}}, }}, - }) - channelOne <- endpointsUpdate1 - channelTwo <- endpointsUpdate2 + } + fakeWatch.Add(endpoints1) + fakeWatch.Add(endpoints2) - endpoints := []*api.Endpoints{endpointsUpdate2.Endpoints, endpointsUpdate1.Endpoints} + endpoints := []*api.Endpoints{endpoints2, endpoints1} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) } -func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) { - config := NewEndpointsConfig() - config.store.synced = true - channelOne := config.Channel("one") - channelTwo := config.Channel("two") +func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) { + fakeWatch := watch.NewFake() + lw := fakeLW{ + listResp: &api.EndpointsList{Items: []api.Endpoints{}}, + watchResp: fakeWatch, + } + stopCh := make(chan struct{}) + defer close(stopCh) + + config := newEndpointsConfig(lw, time.Minute) handler := NewEndpointsHandlerMock() handler2 := NewEndpointsHandlerMock() config.RegisterHandler(handler) config.RegisterHandler(handler2) - endpointsUpdate1 := CreateEndpointsUpdate(ADD, &api.Endpoints{ + go config.Run(stopCh) + + endpoints1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}}, Ports: []api.EndpointPort{{Port: 80}}, }}, - }) - endpointsUpdate2 := CreateEndpointsUpdate(ADD, &api.Endpoints{ + } + endpoints2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}}, Ports: []api.EndpointPort{{Port: 80}}, }}, - }) - channelOne <- endpointsUpdate1 - channelTwo <- endpointsUpdate2 + } + fakeWatch.Add(endpoints1) + fakeWatch.Add(endpoints2) - endpoints := []*api.Endpoints{endpointsUpdate2.Endpoints, endpointsUpdate1.Endpoints} + endpoints := []*api.Endpoints{endpoints2, endpoints1} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) // Add one more - endpointsUpdate3 := CreateEndpointsUpdate(ADD, &api.Endpoints{ + endpoints3 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foobar"}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "5.5.5.5"}, {IP: "6.6.6.6"}}, Ports: []api.EndpointPort{{Port: 80}}, }}, - }) - channelTwo <- endpointsUpdate3 - endpoints = []*api.Endpoints{endpointsUpdate2.Endpoints, endpointsUpdate1.Endpoints, endpointsUpdate3.Endpoints} + } + fakeWatch.Add(endpoints3) + endpoints = []*api.Endpoints{endpoints2, endpoints1, endpoints3} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) // Update the "foo" service with new endpoints - endpointsUpdate1 = CreateEndpointsUpdate(ADD, &api.Endpoints{ + endpoints1v2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "7.7.7.7"}}, Ports: []api.EndpointPort{{Port: 80}}, }}, - }) - channelOne <- endpointsUpdate1 - endpoints = []*api.Endpoints{endpointsUpdate2.Endpoints, endpointsUpdate1.Endpoints, endpointsUpdate3.Endpoints} + } + fakeWatch.Modify(endpoints1v2) + endpoints = []*api.Endpoints{endpoints2, endpoints1v2, endpoints3} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) - // Remove "bar" service - endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}}) - channelTwo <- endpointsUpdate2 - - endpoints = []*api.Endpoints{endpointsUpdate1.Endpoints, endpointsUpdate3.Endpoints} + // Remove "bar" endpoints + fakeWatch.Delete(endpoints2) + endpoints = []*api.Endpoints{endpoints1v2, endpoints3} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) }