diff --git a/pkg/proxy/config/api.go b/pkg/proxy/config/api.go index 55951e5a712..51f8abf4a23 100644 --- a/pkg/proxy/config/api.go +++ b/pkg/proxy/config/api.go @@ -21,41 +21,54 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" - client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/fields" ) // NewSourceAPI creates config source that watches for changes to the services and endpoints. -func NewSourceAPI(c *client.Client, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) { +func NewSourceAPI(c cache.Getter, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) { servicesLW := cache.NewListWatchFromClient(c, "services", api.NamespaceAll, fields.Everything()) - endpointsLW := cache.NewListWatchFromClient(c, "endpoints", api.NamespaceAll, fields.Everything()) + cache.NewReflector(servicesLW, &api.Service{}, NewServiceStore(nil, servicesChan), period).Run() - newServicesSourceApiFromLW(servicesLW, period, servicesChan) - newEndpointsSourceApiFromLW(endpointsLW, period, endpointsChan) + endpointsLW := cache.NewListWatchFromClient(c, "endpoints", api.NamespaceAll, fields.Everything()) + cache.NewReflector(endpointsLW, &api.Endpoints{}, NewEndpointsStore(nil, endpointsChan), period).Run() } -func newServicesSourceApiFromLW(servicesLW cache.ListerWatcher, period time.Duration, servicesChan chan<- ServiceUpdate) { - servicesPush := func(objs []interface{}) { +// NewServiceStore creates an undelta store that expands updates to the store into +// ServiceUpdate events on the channel. If no store is passed, a default store will +// be initialized. Allows reuse of a cache store across multiple components. +func NewServiceStore(store cache.Store, ch chan<- ServiceUpdate) cache.Store { + fn := func(objs []interface{}) { var services []api.Service for _, o := range objs { services = append(services, *(o.(*api.Service))) } - servicesChan <- ServiceUpdate{Op: SET, Services: services} + ch <- ServiceUpdate{Op: SET, Services: services} + } + if store == nil { + store = cache.NewStore(cache.MetaNamespaceKeyFunc) + } + return &cache.UndeltaStore{ + Store: store, + PushFunc: fn, } - - serviceQueue := cache.NewUndeltaStore(servicesPush, cache.MetaNamespaceKeyFunc) - cache.NewReflector(servicesLW, &api.Service{}, serviceQueue, period).Run() } -func newEndpointsSourceApiFromLW(endpointsLW cache.ListerWatcher, period time.Duration, endpointsChan chan<- EndpointsUpdate) { - endpointsPush := func(objs []interface{}) { +// NewEndpointsStore creates an undelta store that expands updates to the store into +// EndpointsUpdate events on the channel. If no store is passed, a default store will +// be initialized. Allows reuse of a cache store across multiple components. +func NewEndpointsStore(store cache.Store, ch chan<- EndpointsUpdate) cache.Store { + fn := func(objs []interface{}) { var endpoints []api.Endpoints for _, o := range objs { endpoints = append(endpoints, *(o.(*api.Endpoints))) } - endpointsChan <- EndpointsUpdate{Op: SET, Endpoints: endpoints} + ch <- EndpointsUpdate{Op: SET, Endpoints: endpoints} + } + if store == nil { + store = cache.NewStore(cache.MetaNamespaceKeyFunc) + } + return &cache.UndeltaStore{ + Store: store, + PushFunc: fn, } - - endpointQueue := cache.NewUndeltaStore(endpointsPush, cache.MetaNamespaceKeyFunc) - cache.NewReflector(endpointsLW, &api.Endpoints{}, endpointQueue, period).Run() } diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index 6e5b0e11d48..66d62ae9ef8 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -61,7 +61,7 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) { ch := make(chan ServiceUpdate) - newServicesSourceApiFromLW(lw, 30*time.Second, ch) + cache.NewReflector(lw, &api.Service{}, NewServiceStore(nil, ch), 30*time.Second).Run() got, ok := <-ch if !ok { @@ -172,7 +172,7 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { ch := make(chan EndpointsUpdate) - newEndpointsSourceApiFromLW(lw, 30*time.Second, ch) + cache.NewReflector(lw, &api.Endpoints{}, NewEndpointsStore(nil, ch), 30*time.Second).Run() got, ok := <-ch if !ok {