From 295800201e6aa58912b59b0a34ad1f0eab00135b Mon Sep 17 00:00:00 2001 From: Eric Tune Date: Sat, 10 Jan 2015 21:13:32 -0800 Subject: [PATCH] Make pkg/proxy/config more like pkg/kubelet/config Split SourceAPI into two subobjects. Parallel structure for endpoints, services will allow changing to use generic code in pkg/client/cache/reflector.go. Rename some funcs to be more like pkg/client/cache. --- pkg/proxy/config/api.go | 95 ++++++++++++++++++++++-------------- pkg/proxy/config/api_test.go | 64 +++++++++++++++--------- 2 files changed, 101 insertions(+), 58 deletions(-) diff --git a/pkg/proxy/config/api.go b/pkg/proxy/config/api.go index 87ce0902472..1507fe5f692 100644 --- a/pkg/proxy/config/api.go +++ b/pkg/proxy/config/api.go @@ -28,6 +28,7 @@ import ( "github.com/golang/glog" ) +// TODO: to use Reflector, need to change the ServicesWatcher to a generic ListerWatcher. // ServicesWatcher is capable of listing and watching for changes to services across ALL namespaces type ServicesWatcher interface { List(label labels.Selector) (*api.ServiceList, error) @@ -43,12 +44,22 @@ type EndpointsWatcher interface { // SourceAPI implements a configuration source for services and endpoints that // uses the client watch API to efficiently detect changes. type SourceAPI struct { - servicesWatcher ServicesWatcher - endpointsWatcher EndpointsWatcher + s servicesReflector + e endpointsReflector +} - services chan<- ServiceUpdate - endpoints chan<- EndpointsUpdate +type servicesReflector struct { + watcher ServicesWatcher + services chan<- ServiceUpdate + resourceVersion string + waitDuration time.Duration + reconnectDuration time.Duration +} +type endpointsReflector struct { + watcher EndpointsWatcher + endpoints chan<- EndpointsUpdate + resourceVersion string waitDuration time.Duration reconnectDuration time.Duration } @@ -56,42 +67,54 @@ type SourceAPI struct { // NewSourceAPI creates a config source that watches for changes to the services and endpoints. func NewSourceAPI(servicesWatcher ServicesWatcher, endpointsWatcher EndpointsWatcher, period time.Duration, services chan<- ServiceUpdate, endpoints chan<- EndpointsUpdate) *SourceAPI { config := &SourceAPI{ - servicesWatcher: servicesWatcher, - endpointsWatcher: endpointsWatcher, - services: services, - endpoints: endpoints, - - waitDuration: period, - // prevent hot loops if the server starts to misbehave - reconnectDuration: time.Second * 1, + s: servicesReflector{ + watcher: servicesWatcher, + services: services, + resourceVersion: "", + waitDuration: period, + // prevent hot loops if the server starts to misbehave + reconnectDuration: time.Second * 1, + }, + e: endpointsReflector{ + watcher: endpointsWatcher, + endpoints: endpoints, + resourceVersion: "", + waitDuration: period, + // prevent hot loops if the server starts to misbehave + reconnectDuration: time.Second * 1, + }, } - serviceVersion := "" - go util.Forever(func() { - config.runServices(&serviceVersion) - time.Sleep(wait.Jitter(config.reconnectDuration, 0.0)) - }, period) - endpointVersion := "" - go util.Forever(func() { - config.runEndpoints(&endpointVersion) - time.Sleep(wait.Jitter(config.reconnectDuration, 0.0)) - }, period) + go util.Forever(func() { config.s.listAndWatch() }, period) + go util.Forever(func() { config.e.listAndWatch() }, period) return config } -// runServices loops forever looking for changes to services. -func (s *SourceAPI) runServices(resourceVersion *string) { +func (r *servicesReflector) listAndWatch() { + r.run(&r.resourceVersion) + time.Sleep(wait.Jitter(r.reconnectDuration, 0.0)) +} + +func (r *endpointsReflector) listAndWatch() { + r.run(&r.resourceVersion) + time.Sleep(wait.Jitter(r.reconnectDuration, 0.0)) +} + +// run loops forever looking for changes to services. +func (s *servicesReflector) run(resourceVersion *string) { if len(*resourceVersion) == 0 { - services, err := s.servicesWatcher.List(labels.Everything()) + services, err := s.watcher.List(labels.Everything()) if err != nil { glog.Errorf("Unable to load services: %v", err) + // TODO: reconcile with pkg/client/cache which doesn't use reflector. time.Sleep(wait.Jitter(s.waitDuration, 0.0)) return } *resourceVersion = services.ResourceVersion + // TODO: replace with code to update the s.services <- ServiceUpdate{Op: SET, Services: services.Items} } - watcher, err := s.servicesWatcher.Watch(labels.Everything(), labels.Everything(), *resourceVersion) + watcher, err := s.watcher.Watch(labels.Everything(), labels.Everything(), *resourceVersion) if err != nil { glog.Errorf("Unable to watch for services changes: %v", err) if !client.IsTimeout(err) { @@ -104,11 +127,11 @@ func (s *SourceAPI) runServices(resourceVersion *string) { defer watcher.Stop() ch := watcher.ResultChan() - handleServicesWatch(resourceVersion, ch, s.services) + s.watchHandler(resourceVersion, ch, s.services) } -// handleServicesWatch loops over an event channel and delivers config changes to an update channel. -func handleServicesWatch(resourceVersion *string, ch <-chan watch.Event, updates chan<- ServiceUpdate) { +// watchHandler loops over an event channel and delivers config changes to an update channel. +func (s *servicesReflector) watchHandler(resourceVersion *string, ch <-chan watch.Event, updates chan<- ServiceUpdate) { for { select { case event, ok := <-ch: @@ -146,10 +169,10 @@ func handleServicesWatch(resourceVersion *string, ch <-chan watch.Event, updates } } -// runEndpoints loops forever looking for changes to endpoints. -func (s *SourceAPI) runEndpoints(resourceVersion *string) { +// run loops forever looking for changes to endpoints. +func (s *endpointsReflector) run(resourceVersion *string) { if len(*resourceVersion) == 0 { - endpoints, err := s.endpointsWatcher.List(labels.Everything()) + endpoints, err := s.watcher.List(labels.Everything()) if err != nil { glog.Errorf("Unable to load endpoints: %v", err) time.Sleep(wait.Jitter(s.waitDuration, 0.0)) @@ -159,7 +182,7 @@ func (s *SourceAPI) runEndpoints(resourceVersion *string) { s.endpoints <- EndpointsUpdate{Op: SET, Endpoints: endpoints.Items} } - watcher, err := s.endpointsWatcher.Watch(labels.Everything(), labels.Everything(), *resourceVersion) + watcher, err := s.watcher.Watch(labels.Everything(), labels.Everything(), *resourceVersion) if err != nil { glog.Errorf("Unable to watch for endpoints changes: %v", err) if !client.IsTimeout(err) { @@ -173,11 +196,11 @@ func (s *SourceAPI) runEndpoints(resourceVersion *string) { defer watcher.Stop() ch := watcher.ResultChan() - handleEndpointsWatch(resourceVersion, ch, s.endpoints) + s.watchHandler(resourceVersion, ch, s.endpoints) } -// handleEndpointsWatch loops over an event channel and delivers config changes to an update channel. -func handleEndpointsWatch(resourceVersion *string, ch <-chan watch.Event, updates chan<- EndpointsUpdate) { +// watchHandler loops over an event channel and delivers config changes to an update channel. +func (s *endpointsReflector) watchHandler(resourceVersion *string, ch <-chan watch.Event, updates chan<- EndpointsUpdate) { for { select { case event, ok := <-ch: diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index ebb0b18d3ce..11d34e47e40 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -32,12 +32,14 @@ func TestServices(t *testing.T) { fakeWatch := watch.NewFake() fakeClient := &client.Fake{Watch: fakeWatch} services := make(chan ServiceUpdate) - source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), services: services} + source := SourceAPI{ + s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services}, + e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}} resourceVersion := "1" go func() { // called twice - source.runServices(&resourceVersion) - source.runServices(&resourceVersion) + source.s.run(&resourceVersion) + source.s.run(&resourceVersion) }() // test adding a service to the watch @@ -84,11 +86,13 @@ func TestServicesFromZero(t *testing.T) { }, } services := make(chan ServiceUpdate) - source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), services: services} + source := SourceAPI{ + s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services}, + e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}} resourceVersion := "" ch := make(chan struct{}) go func() { - source.runServices(&resourceVersion) + source.s.run(&resourceVersion) close(ch) }() @@ -112,11 +116,13 @@ func TestServicesFromZero(t *testing.T) { func TestServicesError(t *testing.T) { fakeClient := &client.Fake{Err: errors.New("test")} services := make(chan ServiceUpdate) - source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), services: services} + source := SourceAPI{ + s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services}, + e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}} resourceVersion := "1" ch := make(chan struct{}) go func() { - source.runServices(&resourceVersion) + source.s.run(&resourceVersion) close(ch) }() @@ -133,11 +139,13 @@ func TestServicesError(t *testing.T) { func TestServicesErrorTimeout(t *testing.T) { fakeClient := &client.Fake{Err: errors.New("use of closed network connection")} services := make(chan ServiceUpdate) - source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), services: services} + source := SourceAPI{ + s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services}, + e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}} resourceVersion := "1" ch := make(chan struct{}) go func() { - source.runServices(&resourceVersion) + source.s.run(&resourceVersion) close(ch) }() @@ -154,11 +162,13 @@ func TestServicesErrorTimeout(t *testing.T) { func TestServicesFromZeroError(t *testing.T) { fakeClient := &client.Fake{Err: errors.New("test")} services := make(chan ServiceUpdate) - source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), services: services} + source := SourceAPI{ + s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services}, + e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}} resourceVersion := "" ch := make(chan struct{}) go func() { - source.runServices(&resourceVersion) + source.s.run(&resourceVersion) close(ch) }() @@ -178,12 +188,14 @@ func TestEndpoints(t *testing.T) { fakeWatch := watch.NewFake() fakeClient := &client.Fake{Watch: fakeWatch} endpoints := make(chan EndpointsUpdate) - source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints} + source := SourceAPI{ + s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)}, + e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}} resourceVersion := "1" go func() { // called twice - source.runEndpoints(&resourceVersion) - source.runEndpoints(&resourceVersion) + source.e.run(&resourceVersion) + source.e.run(&resourceVersion) }() // test adding an endpoint to the watch @@ -230,11 +242,13 @@ func TestEndpointsFromZero(t *testing.T) { }, } endpoints := make(chan EndpointsUpdate) - source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints} + source := SourceAPI{ + s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)}, + e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}} resourceVersion := "" ch := make(chan struct{}) go func() { - source.runEndpoints(&resourceVersion) + source.e.run(&resourceVersion) close(ch) }() @@ -258,11 +272,13 @@ func TestEndpointsFromZero(t *testing.T) { func TestEndpointsError(t *testing.T) { fakeClient := &client.Fake{Err: errors.New("test")} endpoints := make(chan EndpointsUpdate) - source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints} + source := SourceAPI{ + s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)}, + e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}} resourceVersion := "1" ch := make(chan struct{}) go func() { - source.runEndpoints(&resourceVersion) + source.e.run(&resourceVersion) close(ch) }() @@ -279,11 +295,13 @@ func TestEndpointsError(t *testing.T) { func TestEndpointsErrorTimeout(t *testing.T) { fakeClient := &client.Fake{Err: errors.New("use of closed network connection")} endpoints := make(chan EndpointsUpdate) - source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints} + source := SourceAPI{ + s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)}, + e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}} resourceVersion := "1" ch := make(chan struct{}) go func() { - source.runEndpoints(&resourceVersion) + source.e.run(&resourceVersion) close(ch) }() @@ -300,11 +318,13 @@ func TestEndpointsErrorTimeout(t *testing.T) { func TestEndpointsFromZeroError(t *testing.T) { fakeClient := &client.Fake{Err: errors.New("test")} endpoints := make(chan EndpointsUpdate) - source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints} + source := SourceAPI{ + s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)}, + e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}} resourceVersion := "" ch := make(chan struct{}) go func() { - source.runEndpoints(&resourceVersion) + source.e.run(&resourceVersion) close(ch) }()