Make pkg/proxy/config more like pkg/kubelet/config

Split SourceAPI into two subobjects.

Parallel structure for endpoints, services will allow
changing to use generic code in pkg/client/cache/reflector.go.

Rename some funcs to be more like pkg/client/cache.
This commit is contained in:
Eric Tune
2015-01-10 21:13:32 -08:00
parent fa152ab3f1
commit 295800201e
2 changed files with 101 additions and 58 deletions

View File

@@ -32,12 +32,14 @@ func TestServices(t *testing.T) {
fakeWatch := watch.NewFake()
fakeClient := &client.Fake{Watch: fakeWatch}
services := make(chan ServiceUpdate)
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), services: services}
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}}
resourceVersion := "1"
go func() {
// called twice
source.runServices(&resourceVersion)
source.runServices(&resourceVersion)
source.s.run(&resourceVersion)
source.s.run(&resourceVersion)
}()
// test adding a service to the watch
@@ -84,11 +86,13 @@ func TestServicesFromZero(t *testing.T) {
},
}
services := make(chan ServiceUpdate)
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), services: services}
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}}
resourceVersion := ""
ch := make(chan struct{})
go func() {
source.runServices(&resourceVersion)
source.s.run(&resourceVersion)
close(ch)
}()
@@ -112,11 +116,13 @@ func TestServicesFromZero(t *testing.T) {
func TestServicesError(t *testing.T) {
fakeClient := &client.Fake{Err: errors.New("test")}
services := make(chan ServiceUpdate)
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), services: services}
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}}
resourceVersion := "1"
ch := make(chan struct{})
go func() {
source.runServices(&resourceVersion)
source.s.run(&resourceVersion)
close(ch)
}()
@@ -133,11 +139,13 @@ func TestServicesError(t *testing.T) {
func TestServicesErrorTimeout(t *testing.T) {
fakeClient := &client.Fake{Err: errors.New("use of closed network connection")}
services := make(chan ServiceUpdate)
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), services: services}
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}}
resourceVersion := "1"
ch := make(chan struct{})
go func() {
source.runServices(&resourceVersion)
source.s.run(&resourceVersion)
close(ch)
}()
@@ -154,11 +162,13 @@ func TestServicesErrorTimeout(t *testing.T) {
func TestServicesFromZeroError(t *testing.T) {
fakeClient := &client.Fake{Err: errors.New("test")}
services := make(chan ServiceUpdate)
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), services: services}
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}}
resourceVersion := ""
ch := make(chan struct{})
go func() {
source.runServices(&resourceVersion)
source.s.run(&resourceVersion)
close(ch)
}()
@@ -178,12 +188,14 @@ func TestEndpoints(t *testing.T) {
fakeWatch := watch.NewFake()
fakeClient := &client.Fake{Watch: fakeWatch}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}}
resourceVersion := "1"
go func() {
// called twice
source.runEndpoints(&resourceVersion)
source.runEndpoints(&resourceVersion)
source.e.run(&resourceVersion)
source.e.run(&resourceVersion)
}()
// test adding an endpoint to the watch
@@ -230,11 +242,13 @@ func TestEndpointsFromZero(t *testing.T) {
},
}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}}
resourceVersion := ""
ch := make(chan struct{})
go func() {
source.runEndpoints(&resourceVersion)
source.e.run(&resourceVersion)
close(ch)
}()
@@ -258,11 +272,13 @@ func TestEndpointsFromZero(t *testing.T) {
func TestEndpointsError(t *testing.T) {
fakeClient := &client.Fake{Err: errors.New("test")}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}}
resourceVersion := "1"
ch := make(chan struct{})
go func() {
source.runEndpoints(&resourceVersion)
source.e.run(&resourceVersion)
close(ch)
}()
@@ -279,11 +295,13 @@ func TestEndpointsError(t *testing.T) {
func TestEndpointsErrorTimeout(t *testing.T) {
fakeClient := &client.Fake{Err: errors.New("use of closed network connection")}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}}
resourceVersion := "1"
ch := make(chan struct{})
go func() {
source.runEndpoints(&resourceVersion)
source.e.run(&resourceVersion)
close(ch)
}()
@@ -300,11 +318,13 @@ func TestEndpointsErrorTimeout(t *testing.T) {
func TestEndpointsFromZeroError(t *testing.T) {
fakeClient := &client.Fake{Err: errors.New("test")}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{servicesWatcher: fakeClient.Services(api.NamespaceAll), endpointsWatcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}}
resourceVersion := ""
ch := make(chan struct{})
go func() {
source.runEndpoints(&resourceVersion)
source.e.run(&resourceVersion)
close(ch)
}()