From 01e668187cc728e73b09286839ee3435c41e43bc Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Thu, 28 Aug 2014 22:31:41 -0400 Subject: [PATCH] Services and Endpoints weren't properly sync'ing They need incremental changes and a resync on start. --- pkg/api/register.go | 1 + pkg/api/types.go | 6 + pkg/api/v1beta1/register.go | 1 + pkg/api/v1beta1/types.go | 6 + pkg/client/client.go | 16 ++- pkg/client/fake.go | 25 +++- pkg/proxy/config/api.go | 32 ++++- pkg/proxy/config/api_test.go | 181 ++++++++++++++++++++++++-- pkg/registry/endpoint/registry.go | 1 + pkg/registry/endpoint/storage.go | 9 +- pkg/registry/endpoint/storage_test.go | 27 ++++ pkg/registry/etcd/etcd.go | 7 + pkg/registry/etcd/etcd_test.go | 31 ++++- pkg/registry/registrytest/service.go | 13 +- 14 files changed, 326 insertions(+), 30 deletions(-) diff --git a/pkg/api/register.go b/pkg/api/register.go index 2b62442e32c..54bd719bddb 100644 --- a/pkg/api/register.go +++ b/pkg/api/register.go @@ -35,6 +35,7 @@ func init() { ServerOp{}, ContainerManifestList{}, Endpoints{}, + EndpointsList{}, Binding{}, ) } diff --git a/pkg/api/types.go b/pkg/api/types.go index 8073713c88a..2dd6c06b66d 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -353,6 +353,12 @@ type Endpoints struct { Endpoints []string `json:"endpoints,omitempty" yaml:"endpoints,omitempty"` } +// EndpointsList is a list of endpoints. +type EndpointsList struct { + JSONBase `json:",inline" yaml:",inline"` + Items []Endpoints `json:"items,omitempty" yaml:"items,omitempty"` +} + // Minion is a worker node in Kubernetenes. // The name of the minion according to etcd is in JSONBase.ID. type Minion struct { diff --git a/pkg/api/v1beta1/register.go b/pkg/api/v1beta1/register.go index ae3f2be40cf..c5768bac8aa 100644 --- a/pkg/api/v1beta1/register.go +++ b/pkg/api/v1beta1/register.go @@ -35,6 +35,7 @@ func init() { ServerOp{}, ContainerManifestList{}, Endpoints{}, + EndpointsList{}, Binding{}, ) } diff --git a/pkg/api/v1beta1/types.go b/pkg/api/v1beta1/types.go index 66d31670bf5..41278370acb 100644 --- a/pkg/api/v1beta1/types.go +++ b/pkg/api/v1beta1/types.go @@ -364,6 +364,12 @@ type Endpoints struct { Endpoints []string `json:"endpoints,omitempty" yaml:"endpoints,omitempty"` } +// EndpointsList is a list of endpoints. +type EndpointsList struct { + JSONBase `json:",inline" yaml:",inline"` + Items []Endpoints `json:"items,omitempty" yaml:"items,omitempty"` +} + // Minion is a worker node in Kubernetenes. // The name of the minion according to etcd is in JSONBase.ID. type Minion struct { diff --git a/pkg/client/client.go b/pkg/client/client.go index cfc0dfe997b..e9b377636cb 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -65,12 +65,17 @@ type ReplicationControllerInterface interface { // ServiceInterface has methods to work with Service resources. type ServiceInterface interface { + ListServices(selector labels.Selector) (api.ServiceList, error) GetService(id string) (api.Service, error) CreateService(api.Service) (api.Service, error) UpdateService(api.Service) (api.Service, error) DeleteService(string) error WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) +} +// EndpointsInterface has methods to work with Endpoints resources +type EndpointsInterface interface { + ListEndpoints(selector labels.Selector) (api.EndpointsList, error) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) } @@ -318,8 +323,9 @@ func (c *Client) WatchReplicationControllers(label, field labels.Selector, resou Watch() } -func (c *Client) ListServices(selector labels.Selector) (list api.ServiceList, err error) { - err = c.Get().Path("services").SelectorParam("labels", selector).Do().Into(&list) +// ListServices takes a selector, and returns the list of services that match that selector +func (c *Client) ListServices(selector labels.Selector) (result api.ServiceList, err error) { + err = c.Get().Path("services").SelectorParam("labels", selector).Do().Into(&result) return } @@ -361,6 +367,12 @@ func (c *Client) WatchServices(label, field labels.Selector, resourceVersion uin Watch() } +// ListEndpoints takes a selector, and returns the list of endpoints that match that selector +func (c *Client) ListEndpoints(selector labels.Selector) (result api.EndpointsList, err error) { + err = c.Get().Path("endpoints").SelectorParam("labels", selector).Do().Into(&result) + return +} + // WatchEndpoints returns a watch.Interface that watches the requested endpoints for a service. func (c *Client) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { return c.Get(). diff --git a/pkg/client/fake.go b/pkg/client/fake.go index 7ee2df2a6f1..136cb20acaf 100644 --- a/pkg/client/fake.go +++ b/pkg/client/fake.go @@ -32,10 +32,13 @@ type FakeAction struct { // implementation. This makes faking out just the method you want to test easier. type Fake struct { // Fake by default keeps a simple list of the methods that have been called. - Actions []FakeAction - Pods api.PodList - Ctrl api.ReplicationController - Watch watch.Interface + Actions []FakeAction + Pods api.PodList + Ctrl api.ReplicationController + ServiceList api.ServiceList + EndpointsList api.EndpointsList + Err error + Watch watch.Interface } func (c *Fake) ListPods(selector labels.Selector) (api.PodList, error) { @@ -93,6 +96,11 @@ func (c *Fake) WatchReplicationControllers(label, field labels.Selector, resourc return c.Watch, nil } +func (c *Fake) ListServices(selector labels.Selector) (api.ServiceList, error) { + c.Actions = append(c.Actions, FakeAction{Action: "list-services"}) + return c.ServiceList, c.Err +} + func (c *Fake) GetService(name string) (api.Service, error) { c.Actions = append(c.Actions, FakeAction{Action: "get-service", Value: name}) return api.Service{}, nil @@ -115,12 +123,17 @@ func (c *Fake) DeleteService(service string) error { func (c *Fake) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { c.Actions = append(c.Actions, FakeAction{Action: "watch-services", Value: resourceVersion}) - return c.Watch, nil + return c.Watch, c.Err +} + +func (c *Fake) ListEndpoints(selector labels.Selector) (api.EndpointsList, error) { + c.Actions = append(c.Actions, FakeAction{Action: "list-endpoints"}) + return c.EndpointsList, c.Err } func (c *Fake) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { c.Actions = append(c.Actions, FakeAction{Action: "watch-endpoints", Value: resourceVersion}) - return c.Watch, nil + return c.Watch, c.Err } func (c *Fake) ServerVersion() (*version.Info, error) { diff --git a/pkg/proxy/config/api.go b/pkg/proxy/config/api.go index 44873f3f0cc..7c411ad45d1 100644 --- a/pkg/proxy/config/api.go +++ b/pkg/proxy/config/api.go @@ -29,6 +29,8 @@ import ( // Watcher is the interface needed to receive changes to services and endpoints. type Watcher interface { + ListServices(label labels.Selector) (api.ServiceList, error) + ListEndpoints(label labels.Selector) (api.EndpointsList, error) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) } @@ -70,6 +72,17 @@ func NewSourceAPI(client Watcher, period time.Duration, services chan<- ServiceU // runServices loops forever looking for changes to services. func (s *SourceAPI) runServices(resourceVersion *uint64) { + if *resourceVersion == 0 { + services, err := s.client.ListServices(labels.Everything()) + if err != nil { + glog.Errorf("Unable to load services: %v", err) + time.Sleep(wait.Jitter(s.waitDuration, 0.0)) + return + } + *resourceVersion = services.ResourceVersion + s.services <- ServiceUpdate{Op: SET, Services: services.Items} + } + watcher, err := s.client.WatchServices(labels.Everything(), labels.Everything(), *resourceVersion) if err != nil { glog.Errorf("Unable to watch for services changes: %v", err) @@ -97,10 +110,10 @@ func handleServicesWatch(resourceVersion *uint64, ch <-chan watch.Event, updates switch event.Type { case watch.Added, watch.Modified: - updates <- ServiceUpdate{Op: SET, Services: []api.Service{*service}} + updates <- ServiceUpdate{Op: ADD, Services: []api.Service{*service}} case watch.Deleted: - updates <- ServiceUpdate{Op: SET} + updates <- ServiceUpdate{Op: REMOVE, Services: []api.Service{*service}} } } } @@ -108,6 +121,17 @@ func handleServicesWatch(resourceVersion *uint64, ch <-chan watch.Event, updates // runEndpoints loops forever looking for changes to endpoints. func (s *SourceAPI) runEndpoints(resourceVersion *uint64) { + if *resourceVersion == 0 { + endpoints, err := s.client.ListEndpoints(labels.Everything()) + if err != nil { + glog.Errorf("Unable to load endpoints: %v", err) + time.Sleep(wait.Jitter(s.waitDuration, 0.0)) + return + } + *resourceVersion = endpoints.ResourceVersion + s.endpoints <- EndpointsUpdate{Op: SET, Endpoints: endpoints.Items} + } + watcher, err := s.client.WatchEndpoints(labels.Everything(), labels.Everything(), *resourceVersion) if err != nil { glog.Errorf("Unable to watch for endpoints changes: %v", err) @@ -135,10 +159,10 @@ func handleEndpointsWatch(resourceVersion *uint64, ch <-chan watch.Event, update switch event.Type { case watch.Added, watch.Modified: - updates <- EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints}} + updates <- EndpointsUpdate{Op: ADD, Endpoints: []api.Endpoints{*endpoints}} case watch.Deleted: - updates <- EndpointsUpdate{Op: SET} + updates <- EndpointsUpdate{Op: REMOVE, Endpoints: []api.Endpoints{*endpoints}} } } } diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index f08c6f0d2c3..1fcaa4ed7bc 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -17,6 +17,7 @@ limitations under the License. package config import ( + "errors" "reflect" "testing" @@ -32,7 +33,7 @@ func TestServices(t *testing.T) { fakeClient := &client.Fake{Watch: fakeWatch} services := make(chan ServiceUpdate) source := SourceAPI{client: fakeClient, services: services} - resourceVersion := uint64(0) + resourceVersion := uint64(1) go func() { // called twice source.runServices(&resourceVersion) @@ -41,12 +42,12 @@ func TestServices(t *testing.T) { // test adding a service to the watch fakeWatch.Add(&service) - if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(0)}}) { + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(1)}}) { t.Errorf("expected call to watch-services, got %#v", fakeClient) } actual := <-services - expected := ServiceUpdate{Op: SET, Services: []api.Service{service}} + expected := ServiceUpdate{Op: ADD, Services: []api.Service{service}} if !reflect.DeepEqual(expected, actual) { t.Errorf("expected %#v, got %#v", expected, actual) } @@ -54,7 +55,7 @@ func TestServices(t *testing.T) { // verify that a delete results in a config change fakeWatch.Delete(&service) actual = <-services - expected = ServiceUpdate{Op: SET} + expected = ServiceUpdate{Op: REMOVE, Services: []api.Service{service}} if !reflect.DeepEqual(expected, actual) { t.Errorf("expected %#v, got %#v", expected, actual) } @@ -65,11 +66,91 @@ func TestServices(t *testing.T) { fakeWatch.Stop() newFakeWatch.Add(&service) - if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(0)}, {"watch-services", uint64(3)}}) { + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(1)}, {"watch-services", uint64(3)}}) { t.Errorf("expected call to watch-endpoints, got %#v", fakeClient) } } +func TestServicesFromZero(t *testing.T) { + service := api.Service{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: uint64(2)}} + + fakeWatch := watch.NewFake() + fakeWatch.Stop() + fakeClient := &client.Fake{Watch: fakeWatch} + fakeClient.ServiceList = api.ServiceList{ + JSONBase: api.JSONBase{ResourceVersion: 2}, + Items: []api.Service{ + service, + }, + } + services := make(chan ServiceUpdate) + source := SourceAPI{client: fakeClient, services: services} + resourceVersion := uint64(0) + ch := make(chan struct{}) + go func() { + source.runServices(&resourceVersion) + close(ch) + }() + + // should get services SET + actual := <-services + expected := ServiceUpdate{Op: SET, Services: []api.Service{service}} + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %#v, got %#v", expected, actual) + } + + // should have listed, then watched + <-ch + if resourceVersion != 2 { + t.Errorf("unexpected resource version, got %#v", resourceVersion) + } + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"list-services", nil}, {"watch-services", uint64(2)}}) { + t.Errorf("unexpected actions, got %#v", fakeClient) + } +} + +func TestServicesError(t *testing.T) { + fakeClient := &client.Fake{Err: errors.New("test")} + services := make(chan ServiceUpdate) + source := SourceAPI{client: fakeClient, services: services} + resourceVersion := uint64(1) + ch := make(chan struct{}) + go func() { + source.runServices(&resourceVersion) + close(ch) + }() + + // should have listed only + <-ch + if resourceVersion != 1 { + t.Errorf("unexpected resource version, got %#v", resourceVersion) + } + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(1)}}) { + t.Errorf("unexpected actions, got %#v", fakeClient) + } +} + +func TestServicesFromZeroError(t *testing.T) { + fakeClient := &client.Fake{Err: errors.New("test")} + services := make(chan ServiceUpdate) + source := SourceAPI{client: fakeClient, services: services} + resourceVersion := uint64(0) + ch := make(chan struct{}) + go func() { + source.runServices(&resourceVersion) + close(ch) + }() + + // should have listed only + <-ch + if resourceVersion != 0 { + t.Errorf("unexpected resource version, got %#v", resourceVersion) + } + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"list-services", nil}}) { + t.Errorf("unexpected actions, got %#v", fakeClient) + } +} + func TestEndpoints(t *testing.T) { endpoint := api.Endpoints{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: uint64(2)}, Endpoints: []string{"127.0.0.1:9000"}} @@ -77,7 +158,7 @@ func TestEndpoints(t *testing.T) { fakeClient := &client.Fake{Watch: fakeWatch} endpoints := make(chan EndpointsUpdate) source := SourceAPI{client: fakeClient, endpoints: endpoints} - resourceVersion := uint64(0) + resourceVersion := uint64(1) go func() { // called twice source.runEndpoints(&resourceVersion) @@ -86,12 +167,12 @@ func TestEndpoints(t *testing.T) { // test adding an endpoint to the watch fakeWatch.Add(&endpoint) - if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(0)}}) { + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(1)}}) { t.Errorf("expected call to watch-endpoints, got %#v", fakeClient) } actual := <-endpoints - expected := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{endpoint}} + expected := EndpointsUpdate{Op: ADD, Endpoints: []api.Endpoints{endpoint}} if !reflect.DeepEqual(expected, actual) { t.Errorf("expected %#v, got %#v", expected, actual) } @@ -99,7 +180,7 @@ func TestEndpoints(t *testing.T) { // verify that a delete results in a config change fakeWatch.Delete(&endpoint) actual = <-endpoints - expected = EndpointsUpdate{Op: SET} + expected = EndpointsUpdate{Op: REMOVE, Endpoints: []api.Endpoints{endpoint}} if !reflect.DeepEqual(expected, actual) { t.Errorf("expected %#v, got %#v", expected, actual) } @@ -110,7 +191,87 @@ func TestEndpoints(t *testing.T) { fakeWatch.Stop() newFakeWatch.Add(&endpoint) - if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(0)}, {"watch-endpoints", uint64(3)}}) { + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(1)}, {"watch-endpoints", uint64(3)}}) { t.Errorf("expected call to watch-endpoints, got %#v", fakeClient) } } + +func TestEndpointsFromZero(t *testing.T) { + endpoint := api.Endpoints{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: uint64(2)}, Endpoints: []string{"127.0.0.1:9000"}} + + fakeWatch := watch.NewFake() + fakeWatch.Stop() + fakeClient := &client.Fake{Watch: fakeWatch} + fakeClient.EndpointsList = api.EndpointsList{ + JSONBase: api.JSONBase{ResourceVersion: 2}, + Items: []api.Endpoints{ + endpoint, + }, + } + endpoints := make(chan EndpointsUpdate) + source := SourceAPI{client: fakeClient, endpoints: endpoints} + resourceVersion := uint64(0) + ch := make(chan struct{}) + go func() { + source.runEndpoints(&resourceVersion) + close(ch) + }() + + // should get endpoints SET + actual := <-endpoints + expected := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{endpoint}} + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %#v, got %#v", expected, actual) + } + + // should have listed, then watched + <-ch + if resourceVersion != 2 { + t.Errorf("unexpected resource version, got %#v", resourceVersion) + } + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"list-endpoints", nil}, {"watch-endpoints", uint64(2)}}) { + t.Errorf("unexpected actions, got %#v", fakeClient) + } +} + +func TestEndpointsError(t *testing.T) { + fakeClient := &client.Fake{Err: errors.New("test")} + endpoints := make(chan EndpointsUpdate) + source := SourceAPI{client: fakeClient, endpoints: endpoints} + resourceVersion := uint64(1) + ch := make(chan struct{}) + go func() { + source.runEndpoints(&resourceVersion) + close(ch) + }() + + // should have listed only + <-ch + if resourceVersion != 1 { + t.Errorf("unexpected resource version, got %#v", resourceVersion) + } + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(1)}}) { + t.Errorf("unexpected actions, got %#v", fakeClient) + } +} + +func TestEndpointsFromZeroError(t *testing.T) { + fakeClient := &client.Fake{Err: errors.New("test")} + endpoints := make(chan EndpointsUpdate) + source := SourceAPI{client: fakeClient, endpoints: endpoints} + resourceVersion := uint64(0) + ch := make(chan struct{}) + go func() { + source.runEndpoints(&resourceVersion) + close(ch) + }() + + // should have listed only + <-ch + if resourceVersion != 0 { + t.Errorf("unexpected resource version, got %#v", resourceVersion) + } + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"list-endpoints", nil}}) { + t.Errorf("unexpected actions, got %#v", fakeClient) + } +} diff --git a/pkg/registry/endpoint/registry.go b/pkg/registry/endpoint/registry.go index 5384563a48f..1cd5f2d5488 100644 --- a/pkg/registry/endpoint/registry.go +++ b/pkg/registry/endpoint/registry.go @@ -24,6 +24,7 @@ import ( // Registry is an interface for things that know how to store endpoints. type Registry interface { + ListEndpoints() (*api.EndpointsList, error) GetEndpoints(name string) (*api.Endpoints, error) WatchEndpoints(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error) UpdateEndpoints(e api.Endpoints) error diff --git a/pkg/registry/endpoint/storage.go b/pkg/registry/endpoint/storage.go index 2f710ef997f..b0e4104b9df 100644 --- a/pkg/registry/endpoint/storage.go +++ b/pkg/registry/endpoint/storage.go @@ -37,14 +37,17 @@ func NewStorage(registry Registry) apiserver.RESTStorage { } } -// Get satisfies the RESTStorage interface but is unimplemented. +// Get satisfies the RESTStorage interface. func (rs *Storage) Get(id string) (interface{}, error) { return rs.registry.GetEndpoints(id) } -// List satisfies the RESTStorage interface but is unimplemented. +// List satisfies the RESTStorage interface. func (rs *Storage) List(selector labels.Selector) (interface{}, error) { - return nil, errors.New("unimplemented") + if !selector.Empty() { + return nil, errors.New("label selectors are not supported on endpoints") + } + return rs.registry.ListEndpoints() } // Watch returns Endpoint events via a watch.Interface. diff --git a/pkg/registry/endpoint/storage_test.go b/pkg/registry/endpoint/storage_test.go index 105c26f3008..e57c8e25958 100644 --- a/pkg/registry/endpoint/storage_test.go +++ b/pkg/registry/endpoint/storage_test.go @@ -22,6 +22,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" ) @@ -67,3 +68,29 @@ func TestGetEndpointsMissingService(t *testing.T) { t.Errorf("unexpected endpoints: %#v", obj) } } + +func TestEndpointsRegistryList(t *testing.T) { + registry := registrytest.NewServiceRegistry() + storage := NewStorage(registry) + registry.EndpointsList = api.EndpointsList{ + JSONBase: api.JSONBase{ResourceVersion: 1}, + Items: []api.Endpoints{ + {JSONBase: api.JSONBase{ID: "foo"}}, + {JSONBase: api.JSONBase{ID: "bar"}}, + }, + } + s, _ := storage.List(labels.Everything()) + sl := s.(*api.EndpointsList) + if len(sl.Items) != 2 { + t.Fatalf("Expected 2 endpoints, but got %v", len(sl.Items)) + } + if e, a := "foo", sl.Items[0].ID; e != a { + t.Errorf("Expected %v, but got %v", e, a) + } + if e, a := "bar", sl.Items[1].ID; e != a { + t.Errorf("Expected %v, but got %v", e, a) + } + if sl.ResourceVersion != 1 { + t.Errorf("Unexpected resource version: %#v", sl) + } +} diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 1be8233b9cd..b458e73f6cd 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -370,6 +370,13 @@ func (r *Registry) WatchServices(label, field labels.Selector, resourceVersion u return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported") } +// ListEndpoints obtains a list of Services. +func (r *Registry) ListEndpoints() (*api.EndpointsList, error) { + list := &api.EndpointsList{} + err := r.ExtractList("/registry/services/endpoints", &list.Items, &list.ResourceVersion) + return list, err +} + // UpdateEndpoints update Endpoints of a Service. func (r *Registry) UpdateEndpoints(e api.Endpoints) error { // TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop. diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index b6951ed969b..2ad142c5081 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -679,7 +679,7 @@ func TestEtcdListServices(t *testing.T) { } if len(services.Items) != 2 || services.Items[0].ID != "foo" || services.Items[1].ID != "bar" { - t.Errorf("Unexpected pod list: %#v", services) + t.Errorf("Unexpected service list: %#v", services) } } @@ -804,6 +804,35 @@ func TestEtcdUpdateService(t *testing.T) { } } +func TestEtcdListEndpoints(t *testing.T) { + fakeClient := tools.NewFakeEtcdClient(t) + key := "/registry/services/endpoints" + fakeClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Nodes: []*etcd.Node{ + { + Value: runtime.EncodeOrDie(api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{"127.0.0.1:8345"}}), + }, + { + Value: runtime.EncodeOrDie(api.Endpoints{JSONBase: api.JSONBase{ID: "bar"}}), + }, + }, + }, + }, + E: nil, + } + registry := NewTestEtcdRegistry(fakeClient) + services, err := registry.ListEndpoints() + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if len(services.Items) != 2 || services.Items[0].ID != "foo" || services.Items[1].ID != "bar" { + t.Errorf("Unexpected endpoints list: %#v", services) + } +} + func TestEtcdGetEndpoints(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) registry := NewTestEtcdRegistry(fakeClient) diff --git a/pkg/registry/registrytest/service.go b/pkg/registry/registrytest/service.go index e82d1033f16..d53fc9b62f7 100644 --- a/pkg/registry/registrytest/service.go +++ b/pkg/registry/registrytest/service.go @@ -27,10 +27,11 @@ func NewServiceRegistry() *ServiceRegistry { } type ServiceRegistry struct { - List api.ServiceList - Service *api.Service - Err error - Endpoints api.Endpoints + List api.ServiceList + Service *api.Service + Err error + Endpoints api.Endpoints + EndpointsList api.EndpointsList DeletedID string GottenID string @@ -66,6 +67,10 @@ func (r *ServiceRegistry) WatchServices(label, field labels.Selector, resourceVe return nil, r.Err } +func (r *ServiceRegistry) ListEndpoints() (*api.EndpointsList, error) { + return &r.EndpointsList, r.Err +} + func (r *ServiceRegistry) GetEndpoints(id string) (*api.Endpoints, error) { r.GottenID = id return &r.Endpoints, r.Err