diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index add6594c2a9..6e173095703 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -25,7 +25,6 @@ import ( "strconv" "time" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd" clientcmdapi "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd/api" @@ -130,8 +129,7 @@ func (s *ProxyServer) Run(_ []string) error { } config.NewSourceAPI( - client.Services(api.NamespaceAll), - client.Endpoints(api.NamespaceAll), + client, 30*time.Second, serviceConfig.Channel("api"), endpointsConfig.Channel("api"), diff --git a/pkg/proxy/config/api.go b/pkg/proxy/config/api.go index 727b970b063..19c94376366 100644 --- a/pkg/proxy/config/api.go +++ b/pkg/proxy/config/api.go @@ -21,223 +21,41 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" - "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" - "github.com/golang/glog" ) -// TODO: to use Reflector, need to change the ServicesWatcher to a generic ListerWatcher. -// ServicesWatcher is capable of listing and watching for changes to services across ALL namespaces -type ServicesWatcher interface { - List(label labels.Selector) (*api.ServiceList, error) - Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) +// NewSourceAPIserver creates config source that watches for changes to the services and endpoints. +func NewSourceAPI(c *client.Client, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) { + servicesLW := cache.NewListWatchFromClient(c, "services", api.NamespaceAll, fields.Everything()) + endpointsLW := cache.NewListWatchFromClient(c, "endpoints", api.NamespaceAll, fields.Everything()) + + newServicesSourceApiFromLW(servicesLW, period, servicesChan) + newEndpointsSourceApiFromLW(endpointsLW, period, endpointsChan) } -// EndpointsWatcher is capable of listing and watching for changes to endpoints across ALL namespaces -type EndpointsWatcher interface { - List(label labels.Selector) (*api.EndpointsList, error) - Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) -} - -// SourceAPI implements a configuration source for services and endpoints that -// uses the client watch API to efficiently detect changes. -type SourceAPI struct { - s servicesReflector - e endpointsReflector -} - -type servicesReflector struct { - watcher ServicesWatcher - services chan<- ServiceUpdate - resourceVersion string - waitDuration time.Duration - reconnectDuration time.Duration -} - -type endpointsReflector struct { - watcher EndpointsWatcher - endpoints chan<- EndpointsUpdate - resourceVersion string - waitDuration time.Duration - reconnectDuration time.Duration -} - -// NewSourceAPI creates a config source that watches for changes to the services and endpoints. -func NewSourceAPI(servicesWatcher ServicesWatcher, endpointsWatcher EndpointsWatcher, period time.Duration, services chan<- ServiceUpdate, endpoints chan<- EndpointsUpdate) *SourceAPI { - config := &SourceAPI{ - s: servicesReflector{ - watcher: servicesWatcher, - services: services, - resourceVersion: "", - waitDuration: period, - // prevent hot loops if the server starts to misbehave - reconnectDuration: time.Second * 1, - }, - e: endpointsReflector{ - watcher: endpointsWatcher, - endpoints: endpoints, - resourceVersion: "", - waitDuration: period, - // prevent hot loops if the server starts to misbehave - reconnectDuration: time.Second * 1, - }, - } - go util.Forever(func() { config.s.listAndWatch() }, period) - go util.Forever(func() { config.e.listAndWatch() }, period) - return config -} - -func (r *servicesReflector) listAndWatch() { - r.run(&r.resourceVersion) - time.Sleep(wait.Jitter(r.reconnectDuration, 0.0)) -} - -func (r *endpointsReflector) listAndWatch() { - r.run(&r.resourceVersion) - time.Sleep(wait.Jitter(r.reconnectDuration, 0.0)) -} - -// run loops forever looking for changes to services. -func (s *servicesReflector) run(resourceVersion *string) { - if len(*resourceVersion) == 0 { - services, err := s.watcher.List(labels.Everything()) - if err != nil { - glog.Errorf("Unable to load services: %v", err) - // TODO: reconcile with pkg/client/cache which doesn't use reflector. - time.Sleep(wait.Jitter(s.waitDuration, 0.0)) - return +func newServicesSourceApiFromLW(servicesLW cache.ListerWatcher, period time.Duration, servicesChan chan<- ServiceUpdate) { + servicesPush := func(objs []interface{}) { + var services []api.Service + for _, o := range objs { + services = append(services, *(o.(*api.Service))) } - *resourceVersion = services.ResourceVersion - // TODO: replace with code to update the - s.services <- ServiceUpdate{Op: SET, Services: services.Items} + servicesChan <- ServiceUpdate{Op: SET, Services: services} } - watcher, err := s.watcher.Watch(labels.Everything(), fields.Everything(), *resourceVersion) - if err != nil { - glog.Errorf("Unable to watch for services changes: %v", err) - if !client.IsTimeout(err) { - // Reset so that we do a fresh get request - *resourceVersion = "" - } - time.Sleep(wait.Jitter(s.waitDuration, 0.0)) - return - } - defer watcher.Stop() - - ch := watcher.ResultChan() - s.watchHandler(resourceVersion, ch, s.services) + serviceQueue := cache.NewUndeltaStore(servicesPush, cache.MetaNamespaceKeyFunc) + cache.NewReflector(servicesLW, &api.Service{}, serviceQueue, period).Run() } -// watchHandler loops over an event channel and delivers config changes to an update channel. -func (s *servicesReflector) watchHandler(resourceVersion *string, ch <-chan watch.Event, updates chan<- ServiceUpdate) { - for { - select { - case event, ok := <-ch: - if !ok { - glog.V(4).Infof("WatchServices channel closed") - return - } - - if event.Object == nil { - glog.Errorf("Got nil over WatchServices channel") - return - } - var service *api.Service - switch obj := event.Object.(type) { - case *api.Service: - service = obj - case *api.Status: - glog.Warningf("Got error status on WatchServices channel: %+v", obj) - *resourceVersion = "" - return - default: - glog.Errorf("Got unexpected object over WatchServices channel: %+v", obj) - *resourceVersion = "" - return - } - - *resourceVersion = service.ResourceVersion - - switch event.Type { - case watch.Added, watch.Modified: - updates <- ServiceUpdate{Op: ADD, Services: []api.Service{*service}} - - case watch.Deleted: - updates <- ServiceUpdate{Op: REMOVE, Services: []api.Service{*service}} - } - } - } -} - -// run loops forever looking for changes to endpoints. -func (s *endpointsReflector) run(resourceVersion *string) { - if len(*resourceVersion) == 0 { - endpoints, err := s.watcher.List(labels.Everything()) - if err != nil { - glog.Errorf("Unable to load endpoints: %v", err) - time.Sleep(wait.Jitter(s.waitDuration, 0.0)) - return - } - *resourceVersion = endpoints.ResourceVersion - s.endpoints <- EndpointsUpdate{Op: SET, Endpoints: endpoints.Items} - } - - watcher, err := s.watcher.Watch(labels.Everything(), fields.Everything(), *resourceVersion) - if err != nil { - glog.Errorf("Unable to watch for endpoints changes: %v", err) - if !client.IsTimeout(err) { - // Reset so that we do a fresh get request - *resourceVersion = "" - } - - time.Sleep(wait.Jitter(s.waitDuration, 0.0)) - return - } - defer watcher.Stop() - - ch := watcher.ResultChan() - s.watchHandler(resourceVersion, ch, s.endpoints) -} - -// watchHandler loops over an event channel and delivers config changes to an update channel. -func (s *endpointsReflector) watchHandler(resourceVersion *string, ch <-chan watch.Event, updates chan<- EndpointsUpdate) { - for { - select { - case event, ok := <-ch: - if !ok { - glog.V(4).Infof("WatchEndpoints channel closed") - return - } - - if event.Object == nil { - glog.Errorf("Got nil over WatchEndpoints channel") - return - } - var endpoints *api.Endpoints - switch obj := event.Object.(type) { - case *api.Endpoints: - endpoints = obj - case *api.Status: - glog.Warningf("Got error status on WatchEndpoints channel: %+v", obj) - *resourceVersion = "" - return - default: - glog.Errorf("Got unexpected object over WatchEndpoints channel: %+v", obj) - *resourceVersion = "" - return - } - *resourceVersion = endpoints.ResourceVersion - - switch event.Type { - case watch.Added, watch.Modified: - updates <- EndpointsUpdate{Op: ADD, Endpoints: []api.Endpoints{*endpoints}} - - case watch.Deleted: - updates <- EndpointsUpdate{Op: REMOVE, Endpoints: []api.Endpoints{*endpoints}} - } +func newEndpointsSourceApiFromLW(endpointsLW cache.ListerWatcher, period time.Duration, endpointsChan chan<- EndpointsUpdate) { + endpointsPush := func(objs []interface{}) { + var endpoints []api.Endpoints + for _, o := range objs { + endpoints = append(endpoints, *(o.(*api.Endpoints))) } + endpointsChan <- EndpointsUpdate{Op: SET, Endpoints: endpoints} } + + endpointQueue := cache.NewUndeltaStore(endpointsPush, cache.MetaNamespaceKeyFunc) + cache.NewReflector(endpointsLW, &api.Endpoints{}, endpointQueue, period).Run() } diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index 1a0c376c8f4..dbaccd90fab 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -17,335 +17,229 @@ limitations under the License. package config import ( - "errors" - "reflect" "testing" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) -func TestServices(t *testing.T) { - service := api.Service{ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"}} +type fakeLW struct { + listResp runtime.Object + watchResp watch.Interface +} +func (lw fakeLW) List() (runtime.Object, error) { + return lw.listResp, nil +} + +func (lw fakeLW) Watch(resourceVersion string) (watch.Interface, error) { + return lw.watchResp, nil +} + +var _ cache.ListerWatcher = fakeLW{} + +func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) { + service1v1 := &api.Service{ + ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "s1"}, + Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}}} + service1v2 := &api.Service{ + ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "s1"}, + Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}}} + service2 := &api.Service{ + ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "s2"}, + Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 30}}}} + + // Setup fake api client. fakeWatch := watch.NewFake() - fakeClient := &testclient.Fake{Watch: fakeWatch} - services := make(chan ServiceUpdate) - source := SourceAPI{ - s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services}, - e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}} - resourceVersion := "1" - go func() { - // called twice - source.s.run(&resourceVersion) - source.s.run(&resourceVersion) - }() - - // test adding a service to the watch - fakeWatch.Add(&service) - if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"watch-services", "1"}}) { - t.Errorf("expected call to watch-services, got %#v", fakeClient) + lw := fakeLW{ + listResp: &api.ServiceList{Items: []api.Service{}}, + watchResp: fakeWatch, } - actual := <-services - expected := ServiceUpdate{Op: ADD, Services: []api.Service{service}} - if !reflect.DeepEqual(expected, actual) { - t.Errorf("expected %#v, got %#v", expected, actual) + ch := make(chan ServiceUpdate) + + newServicesSourceApiFromLW(lw, 30*time.Second, ch) + + got, ok := <-ch + if !ok { + t.Errorf("Unable to read from channel when expected") + } + expected := ServiceUpdate{Op: SET, Services: []api.Service{}} + if !api.Semantic.DeepEqual(expected, got) { + t.Errorf("Expected %#v; Got %#v", expected, got) } - // verify that a delete results in a config change - fakeWatch.Delete(&service) - actual = <-services - expected = ServiceUpdate{Op: REMOVE, Services: []api.Service{service}} - if !reflect.DeepEqual(expected, actual) { - t.Errorf("expected %#v, got %#v", expected, actual) + // Add the first service + fakeWatch.Add(service1v1) + got, ok = <-ch + if !ok { + t.Errorf("Unable to read from channel when expected") + } + expected = ServiceUpdate{Op: SET, Services: []api.Service{*service1v1}} + if !api.Semantic.DeepEqual(expected, got) { + t.Errorf("Expected %#v; Got %#v", expected, got) } - // verify that closing the channel results in a new call to WatchServices with a higher resource version - newFakeWatch := watch.NewFake() - fakeClient.Watch = newFakeWatch - fakeWatch.Stop() + // Add another service + fakeWatch.Add(service2) + got, ok = <-ch + if !ok { + t.Errorf("Unable to read from channel when expected") + } + // Could be sorted either of these two ways: + expectedA := ServiceUpdate{Op: SET, Services: []api.Service{*service1v1, *service2}} + expectedB := ServiceUpdate{Op: SET, Services: []api.Service{*service2, *service1v1}} - newFakeWatch.Add(&service) - if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"watch-services", "1"}, {"watch-services", "2"}}) { - t.Errorf("expected call to watch-endpoints, got %#v", fakeClient) + if !api.Semantic.DeepEqual(expectedA, got) && !api.Semantic.DeepEqual(expectedB, got) { + t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, got) + } + + // Modify service1 + fakeWatch.Modify(service1v2) + got, ok = <-ch + if !ok { + t.Errorf("Unable to read from channel when expected") + } + expectedA = ServiceUpdate{Op: SET, Services: []api.Service{*service1v2, *service2}} + expectedB = ServiceUpdate{Op: SET, Services: []api.Service{*service2, *service1v2}} + + if !api.Semantic.DeepEqual(expectedA, got) && !api.Semantic.DeepEqual(expectedB, got) { + t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, got) + } + + // Delete service1 + fakeWatch.Delete(service1v2) + got, ok = <-ch + if !ok { + t.Errorf("Unable to read from channel when expected") + } + expected = ServiceUpdate{Op: SET, Services: []api.Service{*service2}} + if !api.Semantic.DeepEqual(expected, got) { + t.Errorf("Expected %#v, Got %#v", expected, got) + } + + // Delete service2 + fakeWatch.Delete(service2) + got, ok = <-ch + if !ok { + t.Errorf("Unable to read from channel when expected") + } + expected = ServiceUpdate{Op: SET, Services: []api.Service{}} + if !api.Semantic.DeepEqual(expected, got) { + t.Errorf("Expected %#v, Got %#v", expected, got) } } -func TestServicesFromZero(t *testing.T) { - service := api.Service{ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"}} - - fakeWatch := watch.NewFake() - fakeWatch.Stop() - fakeClient := testclient.NewSimpleFake(&api.ServiceList{ - ListMeta: api.ListMeta{ResourceVersion: "2"}, - Items: []api.Service{ - service, - }, - }) - fakeClient.Watch = fakeWatch - services := make(chan ServiceUpdate) - source := SourceAPI{ - s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services}, - e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}} - resourceVersion := "" - ch := make(chan struct{}) - go func() { - source.s.run(&resourceVersion) - close(ch) - }() - - // should get services SET - actual := <-services - expected := ServiceUpdate{Op: SET, Services: []api.Service{service}} - if !reflect.DeepEqual(expected, actual) { - t.Errorf("expected %#v, got %#v", expected, actual) - } - - // should have listed, then watched - <-ch - if resourceVersion != "2" { - t.Errorf("unexpected resource version, got %#v", resourceVersion) - } - if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"list-services", nil}, {"watch-services", "2"}}) { - t.Errorf("unexpected actions, got %#v", fakeClient) - } -} - -func TestServicesError(t *testing.T) { - fakeClient := &testclient.Fake{Err: errors.New("test")} - services := make(chan ServiceUpdate) - source := SourceAPI{ - s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services}, - e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}} - resourceVersion := "1" - ch := make(chan struct{}) - go func() { - source.s.run(&resourceVersion) - close(ch) - }() - - // should have listed only - <-ch - if resourceVersion != "" { - t.Errorf("unexpected resource version, got %#v", resourceVersion) - } - if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"watch-services", "1"}}) { - t.Errorf("unexpected actions, got %#v", fakeClient) - } -} - -func TestServicesErrorTimeout(t *testing.T) { - fakeClient := &testclient.Fake{Err: errors.New("use of closed network connection")} - services := make(chan ServiceUpdate) - source := SourceAPI{ - s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services}, - e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}} - resourceVersion := "1" - ch := make(chan struct{}) - go func() { - source.s.run(&resourceVersion) - close(ch) - }() - - // should have listed only - <-ch - if resourceVersion != "1" { - t.Errorf("unexpected resource version, got %#v", resourceVersion) - } - if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"watch-services", "1"}}) { - t.Errorf("unexpected actions, got %#v", fakeClient) - } -} - -func TestServicesFromZeroError(t *testing.T) { - fakeClient := &testclient.Fake{Err: errors.New("test")} - services := make(chan ServiceUpdate) - source := SourceAPI{ - s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services}, - e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}} - resourceVersion := "" - ch := make(chan struct{}) - go func() { - source.s.run(&resourceVersion) - close(ch) - }() - - // should have listed only - <-ch - if resourceVersion != "" { - t.Errorf("unexpected resource version, got %#v", resourceVersion) - } - if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"list-services", nil}}) { - t.Errorf("unexpected actions, got %#v", fakeClient) - } -} - -func TestEndpoints(t *testing.T) { - endpoint := api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"}, +func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { + endpoints1v1 := &api.Endpoints{ + ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "e1"}, Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Port: 9000}}, + Addresses: []api.EndpointAddress{ + {IP: "1.2.3.4"}, + }, + Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + } + endpoints1v2 := &api.Endpoints{ + ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "e1"}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{ + {IP: "1.2.3.4"}, + {IP: "4.3.2.1"}, + }, + Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + } + endpoints2 := &api.Endpoints{ + ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "e2"}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{ + {IP: "5.6.7.8"}, + }, + Ports: []api.EndpointPort{{Port: 80, Protocol: "TCP"}}, }}, } + // Setup fake api client. fakeWatch := watch.NewFake() - fakeClient := &testclient.Fake{Watch: fakeWatch} - endpoints := make(chan EndpointsUpdate) - source := SourceAPI{ - s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)}, - e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}} - resourceVersion := "1" - go func() { - // called twice - source.e.run(&resourceVersion) - source.e.run(&resourceVersion) - }() - - // test adding an endpoint to the watch - fakeWatch.Add(&endpoint) - if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"watch-endpoints", "1"}}) { - t.Errorf("expected call to watch-endpoints, got %#v", fakeClient) + lw := fakeLW{ + listResp: &api.EndpointsList{Items: []api.Endpoints{}}, + watchResp: fakeWatch, } - actual := <-endpoints - expected := EndpointsUpdate{Op: ADD, Endpoints: []api.Endpoints{endpoint}} - if !reflect.DeepEqual(expected, actual) { - t.Errorf("expected %#v, got %#v", expected, actual) + ch := make(chan EndpointsUpdate) + + newEndpointsSourceApiFromLW(lw, 30*time.Second, ch) + + got, ok := <-ch + if !ok { + t.Errorf("Unable to read from channel when expected") + } + expected := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{}} + if !api.Semantic.DeepEqual(expected, got) { + t.Errorf("Expected %#v; Got %#v", expected, got) } - // verify that a delete results in a config change - fakeWatch.Delete(&endpoint) - actual = <-endpoints - expected = EndpointsUpdate{Op: REMOVE, Endpoints: []api.Endpoints{endpoint}} - if !reflect.DeepEqual(expected, actual) { - t.Errorf("expected %#v, got %#v", expected, actual) + // Add the first endpoints + fakeWatch.Add(endpoints1v1) + got, ok = <-ch + if !ok { + t.Errorf("Unable to read from channel when expected") + } + expected = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints1v1}} + if !api.Semantic.DeepEqual(expected, got) { + t.Errorf("Expected %#v; Got %#v", expected, got) } - // verify that closing the channel results in a new call to WatchEndpoints with a higher resource version - newFakeWatch := watch.NewFake() - fakeClient.Watch = newFakeWatch - fakeWatch.Stop() + // Add another endpoints + fakeWatch.Add(endpoints2) + got, ok = <-ch + if !ok { + t.Errorf("Unable to read from channel when expected") + } + // Could be sorted either of these two ways: + expectedA := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints1v1, *endpoints2}} + expectedB := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints2, *endpoints1v1}} - newFakeWatch.Add(&endpoint) - if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"watch-endpoints", "1"}, {"watch-endpoints", "2"}}) { - t.Errorf("expected call to watch-endpoints, got %#v", fakeClient) - } -} - -func TestEndpointsFromZero(t *testing.T) { - endpoint := api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Port: 9000}}, - }}, - } - - fakeWatch := watch.NewFake() - fakeWatch.Stop() - fakeClient := testclient.NewSimpleFake(&api.EndpointsList{ - ListMeta: api.ListMeta{ResourceVersion: "2"}, - Items: []api.Endpoints{ - endpoint, - }, - }) - fakeClient.Watch = fakeWatch - endpoints := make(chan EndpointsUpdate) - source := SourceAPI{ - s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)}, - e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}} - resourceVersion := "" - ch := make(chan struct{}) - go func() { - source.e.run(&resourceVersion) - close(ch) - }() - - // should get endpoints SET - actual := <-endpoints - expected := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{endpoint}} - if !reflect.DeepEqual(expected, actual) { - t.Errorf("expected %#v, got %#v", expected, actual) - } - - // should have listed, then watched - <-ch - if resourceVersion != "2" { - t.Errorf("unexpected resource version, got %#v", resourceVersion) - } - if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"list-endpoints", nil}, {"watch-endpoints", "2"}}) { - t.Errorf("unexpected actions, got %#v", fakeClient) - } -} - -func TestEndpointsError(t *testing.T) { - fakeClient := &testclient.Fake{Err: errors.New("test")} - endpoints := make(chan EndpointsUpdate) - source := SourceAPI{ - s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)}, - e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}} - resourceVersion := "1" - ch := make(chan struct{}) - go func() { - source.e.run(&resourceVersion) - close(ch) - }() - - // should have listed only - <-ch - if resourceVersion != "" { - t.Errorf("unexpected resource version, got %#v", resourceVersion) - } - if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"watch-endpoints", "1"}}) { - t.Errorf("unexpected actions, got %#v", fakeClient) - } -} - -func TestEndpointsErrorTimeout(t *testing.T) { - fakeClient := &testclient.Fake{Err: errors.New("use of closed network connection")} - endpoints := make(chan EndpointsUpdate) - source := SourceAPI{ - s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)}, - e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}} - resourceVersion := "1" - ch := make(chan struct{}) - go func() { - source.e.run(&resourceVersion) - close(ch) - }() - - // should have listed only - <-ch - if resourceVersion != "1" { - t.Errorf("unexpected resource version, got %#v", resourceVersion) - } - if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"watch-endpoints", "1"}}) { - t.Errorf("unexpected actions, got %#v", fakeClient) - } -} - -func TestEndpointsFromZeroError(t *testing.T) { - fakeClient := &testclient.Fake{Err: errors.New("test")} - endpoints := make(chan EndpointsUpdate) - source := SourceAPI{ - s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)}, - e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}} - resourceVersion := "" - ch := make(chan struct{}) - go func() { - source.e.run(&resourceVersion) - close(ch) - }() - - // should have listed only - <-ch - if resourceVersion != "" { - t.Errorf("unexpected resource version, got %#v", resourceVersion) - } - if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"list-endpoints", nil}}) { - t.Errorf("unexpected actions, got %#v", fakeClient) + if !api.Semantic.DeepEqual(expectedA, got) && !api.Semantic.DeepEqual(expectedB, got) { + t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, got) + } + + // Modify endpoints1 + fakeWatch.Modify(endpoints1v2) + got, ok = <-ch + if !ok { + t.Errorf("Unable to read from channel when expected") + } + expectedA = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints1v2, *endpoints2}} + expectedB = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints2, *endpoints1v2}} + + if !api.Semantic.DeepEqual(expectedA, got) && !api.Semantic.DeepEqual(expectedB, got) { + t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, got) + } + + // Delete endpoints1 + fakeWatch.Delete(endpoints1v2) + got, ok = <-ch + if !ok { + t.Errorf("Unable to read from channel when expected") + } + expected = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints2}} + if !api.Semantic.DeepEqual(expected, got) { + t.Errorf("Expected %#v, Got %#v", expected, got) + } + + // Delete endpoints2 + fakeWatch.Delete(endpoints2) + got, ok = <-ch + if !ok { + t.Errorf("Unable to read from channel when expected") + } + expected = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{}} + if !api.Semantic.DeepEqual(expected, got) { + t.Errorf("Expected %#v, Got %#v", expected, got) } }