Merge pull request #1103 from smarterclayton/get_then_watch

Services and Endpoints aren't syncing properly
This commit is contained in:
Daniel Smith 2014-09-03 13:16:41 -07:00
commit 0db7989809
14 changed files with 326 additions and 30 deletions

View File

@ -35,6 +35,7 @@ func init() {
ServerOp{}, ServerOp{},
ContainerManifestList{}, ContainerManifestList{},
Endpoints{}, Endpoints{},
EndpointsList{},
Binding{}, Binding{},
) )
} }

View File

@ -353,6 +353,12 @@ type Endpoints struct {
Endpoints []string `json:"endpoints,omitempty" yaml:"endpoints,omitempty"` Endpoints []string `json:"endpoints,omitempty" yaml:"endpoints,omitempty"`
} }
// EndpointsList is a list of endpoints.
type EndpointsList struct {
JSONBase `json:",inline" yaml:",inline"`
Items []Endpoints `json:"items,omitempty" yaml:"items,omitempty"`
}
// Minion is a worker node in Kubernetenes. // Minion is a worker node in Kubernetenes.
// The name of the minion according to etcd is in JSONBase.ID. // The name of the minion according to etcd is in JSONBase.ID.
type Minion struct { type Minion struct {

View File

@ -35,6 +35,7 @@ func init() {
ServerOp{}, ServerOp{},
ContainerManifestList{}, ContainerManifestList{},
Endpoints{}, Endpoints{},
EndpointsList{},
Binding{}, Binding{},
) )
} }

View File

@ -364,6 +364,12 @@ type Endpoints struct {
Endpoints []string `json:"endpoints,omitempty" yaml:"endpoints,omitempty"` Endpoints []string `json:"endpoints,omitempty" yaml:"endpoints,omitempty"`
} }
// EndpointsList is a list of endpoints.
type EndpointsList struct {
JSONBase `json:",inline" yaml:",inline"`
Items []Endpoints `json:"items,omitempty" yaml:"items,omitempty"`
}
// Minion is a worker node in Kubernetenes. // Minion is a worker node in Kubernetenes.
// The name of the minion according to etcd is in JSONBase.ID. // The name of the minion according to etcd is in JSONBase.ID.
type Minion struct { type Minion struct {

View File

@ -65,12 +65,17 @@ type ReplicationControllerInterface interface {
// ServiceInterface has methods to work with Service resources. // ServiceInterface has methods to work with Service resources.
type ServiceInterface interface { type ServiceInterface interface {
ListServices(selector labels.Selector) (api.ServiceList, error)
GetService(id string) (api.Service, error) GetService(id string) (api.Service, error)
CreateService(api.Service) (api.Service, error) CreateService(api.Service) (api.Service, error)
UpdateService(api.Service) (api.Service, error) UpdateService(api.Service) (api.Service, error)
DeleteService(string) error DeleteService(string) error
WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
}
// EndpointsInterface has methods to work with Endpoints resources
type EndpointsInterface interface {
ListEndpoints(selector labels.Selector) (api.EndpointsList, error)
WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
} }
@ -318,8 +323,9 @@ func (c *Client) WatchReplicationControllers(label, field labels.Selector, resou
Watch() Watch()
} }
func (c *Client) ListServices(selector labels.Selector) (list api.ServiceList, err error) { // ListServices takes a selector, and returns the list of services that match that selector
err = c.Get().Path("services").SelectorParam("labels", selector).Do().Into(&list) func (c *Client) ListServices(selector labels.Selector) (result api.ServiceList, err error) {
err = c.Get().Path("services").SelectorParam("labels", selector).Do().Into(&result)
return return
} }
@ -361,6 +367,12 @@ func (c *Client) WatchServices(label, field labels.Selector, resourceVersion uin
Watch() Watch()
} }
// ListEndpoints takes a selector, and returns the list of endpoints that match that selector
func (c *Client) ListEndpoints(selector labels.Selector) (result api.EndpointsList, err error) {
err = c.Get().Path("endpoints").SelectorParam("labels", selector).Do().Into(&result)
return
}
// WatchEndpoints returns a watch.Interface that watches the requested endpoints for a service. // WatchEndpoints returns a watch.Interface that watches the requested endpoints for a service.
func (c *Client) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { func (c *Client) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return c.Get(). return c.Get().

View File

@ -32,10 +32,13 @@ type FakeAction struct {
// implementation. This makes faking out just the method you want to test easier. // implementation. This makes faking out just the method you want to test easier.
type Fake struct { type Fake struct {
// Fake by default keeps a simple list of the methods that have been called. // Fake by default keeps a simple list of the methods that have been called.
Actions []FakeAction Actions []FakeAction
Pods api.PodList Pods api.PodList
Ctrl api.ReplicationController Ctrl api.ReplicationController
Watch watch.Interface ServiceList api.ServiceList
EndpointsList api.EndpointsList
Err error
Watch watch.Interface
} }
func (c *Fake) ListPods(selector labels.Selector) (api.PodList, error) { func (c *Fake) ListPods(selector labels.Selector) (api.PodList, error) {
@ -93,6 +96,11 @@ func (c *Fake) WatchReplicationControllers(label, field labels.Selector, resourc
return c.Watch, nil return c.Watch, nil
} }
func (c *Fake) ListServices(selector labels.Selector) (api.ServiceList, error) {
c.Actions = append(c.Actions, FakeAction{Action: "list-services"})
return c.ServiceList, c.Err
}
func (c *Fake) GetService(name string) (api.Service, error) { func (c *Fake) GetService(name string) (api.Service, error) {
c.Actions = append(c.Actions, FakeAction{Action: "get-service", Value: name}) c.Actions = append(c.Actions, FakeAction{Action: "get-service", Value: name})
return api.Service{}, nil return api.Service{}, nil
@ -115,12 +123,17 @@ func (c *Fake) DeleteService(service string) error {
func (c *Fake) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { func (c *Fake) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
c.Actions = append(c.Actions, FakeAction{Action: "watch-services", Value: resourceVersion}) c.Actions = append(c.Actions, FakeAction{Action: "watch-services", Value: resourceVersion})
return c.Watch, nil return c.Watch, c.Err
}
func (c *Fake) ListEndpoints(selector labels.Selector) (api.EndpointsList, error) {
c.Actions = append(c.Actions, FakeAction{Action: "list-endpoints"})
return c.EndpointsList, c.Err
} }
func (c *Fake) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { func (c *Fake) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
c.Actions = append(c.Actions, FakeAction{Action: "watch-endpoints", Value: resourceVersion}) c.Actions = append(c.Actions, FakeAction{Action: "watch-endpoints", Value: resourceVersion})
return c.Watch, nil return c.Watch, c.Err
} }
func (c *Fake) ServerVersion() (*version.Info, error) { func (c *Fake) ServerVersion() (*version.Info, error) {

View File

@ -29,6 +29,8 @@ import (
// Watcher is the interface needed to receive changes to services and endpoints. // Watcher is the interface needed to receive changes to services and endpoints.
type Watcher interface { type Watcher interface {
ListServices(label labels.Selector) (api.ServiceList, error)
ListEndpoints(label labels.Selector) (api.EndpointsList, error)
WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
} }
@ -70,6 +72,17 @@ func NewSourceAPI(client Watcher, period time.Duration, services chan<- ServiceU
// runServices loops forever looking for changes to services. // runServices loops forever looking for changes to services.
func (s *SourceAPI) runServices(resourceVersion *uint64) { func (s *SourceAPI) runServices(resourceVersion *uint64) {
if *resourceVersion == 0 {
services, err := s.client.ListServices(labels.Everything())
if err != nil {
glog.Errorf("Unable to load services: %v", err)
time.Sleep(wait.Jitter(s.waitDuration, 0.0))
return
}
*resourceVersion = services.ResourceVersion
s.services <- ServiceUpdate{Op: SET, Services: services.Items}
}
watcher, err := s.client.WatchServices(labels.Everything(), labels.Everything(), *resourceVersion) watcher, err := s.client.WatchServices(labels.Everything(), labels.Everything(), *resourceVersion)
if err != nil { if err != nil {
glog.Errorf("Unable to watch for services changes: %v", err) glog.Errorf("Unable to watch for services changes: %v", err)
@ -97,10 +110,10 @@ func handleServicesWatch(resourceVersion *uint64, ch <-chan watch.Event, updates
switch event.Type { switch event.Type {
case watch.Added, watch.Modified: case watch.Added, watch.Modified:
updates <- ServiceUpdate{Op: SET, Services: []api.Service{*service}} updates <- ServiceUpdate{Op: ADD, Services: []api.Service{*service}}
case watch.Deleted: case watch.Deleted:
updates <- ServiceUpdate{Op: SET} updates <- ServiceUpdate{Op: REMOVE, Services: []api.Service{*service}}
} }
} }
} }
@ -108,6 +121,17 @@ func handleServicesWatch(resourceVersion *uint64, ch <-chan watch.Event, updates
// runEndpoints loops forever looking for changes to endpoints. // runEndpoints loops forever looking for changes to endpoints.
func (s *SourceAPI) runEndpoints(resourceVersion *uint64) { func (s *SourceAPI) runEndpoints(resourceVersion *uint64) {
if *resourceVersion == 0 {
endpoints, err := s.client.ListEndpoints(labels.Everything())
if err != nil {
glog.Errorf("Unable to load endpoints: %v", err)
time.Sleep(wait.Jitter(s.waitDuration, 0.0))
return
}
*resourceVersion = endpoints.ResourceVersion
s.endpoints <- EndpointsUpdate{Op: SET, Endpoints: endpoints.Items}
}
watcher, err := s.client.WatchEndpoints(labels.Everything(), labels.Everything(), *resourceVersion) watcher, err := s.client.WatchEndpoints(labels.Everything(), labels.Everything(), *resourceVersion)
if err != nil { if err != nil {
glog.Errorf("Unable to watch for endpoints changes: %v", err) glog.Errorf("Unable to watch for endpoints changes: %v", err)
@ -135,10 +159,10 @@ func handleEndpointsWatch(resourceVersion *uint64, ch <-chan watch.Event, update
switch event.Type { switch event.Type {
case watch.Added, watch.Modified: case watch.Added, watch.Modified:
updates <- EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints}} updates <- EndpointsUpdate{Op: ADD, Endpoints: []api.Endpoints{*endpoints}}
case watch.Deleted: case watch.Deleted:
updates <- EndpointsUpdate{Op: SET} updates <- EndpointsUpdate{Op: REMOVE, Endpoints: []api.Endpoints{*endpoints}}
} }
} }
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package config package config
import ( import (
"errors"
"reflect" "reflect"
"testing" "testing"
@ -32,7 +33,7 @@ func TestServices(t *testing.T) {
fakeClient := &client.Fake{Watch: fakeWatch} fakeClient := &client.Fake{Watch: fakeWatch}
services := make(chan ServiceUpdate) services := make(chan ServiceUpdate)
source := SourceAPI{client: fakeClient, services: services} source := SourceAPI{client: fakeClient, services: services}
resourceVersion := uint64(0) resourceVersion := uint64(1)
go func() { go func() {
// called twice // called twice
source.runServices(&resourceVersion) source.runServices(&resourceVersion)
@ -41,12 +42,12 @@ func TestServices(t *testing.T) {
// test adding a service to the watch // test adding a service to the watch
fakeWatch.Add(&service) fakeWatch.Add(&service)
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(0)}}) { if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(1)}}) {
t.Errorf("expected call to watch-services, got %#v", fakeClient) t.Errorf("expected call to watch-services, got %#v", fakeClient)
} }
actual := <-services actual := <-services
expected := ServiceUpdate{Op: SET, Services: []api.Service{service}} expected := ServiceUpdate{Op: ADD, Services: []api.Service{service}}
if !reflect.DeepEqual(expected, actual) { if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected %#v, got %#v", expected, actual) t.Errorf("expected %#v, got %#v", expected, actual)
} }
@ -54,7 +55,7 @@ func TestServices(t *testing.T) {
// verify that a delete results in a config change // verify that a delete results in a config change
fakeWatch.Delete(&service) fakeWatch.Delete(&service)
actual = <-services actual = <-services
expected = ServiceUpdate{Op: SET} expected = ServiceUpdate{Op: REMOVE, Services: []api.Service{service}}
if !reflect.DeepEqual(expected, actual) { if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected %#v, got %#v", expected, actual) t.Errorf("expected %#v, got %#v", expected, actual)
} }
@ -65,11 +66,91 @@ func TestServices(t *testing.T) {
fakeWatch.Stop() fakeWatch.Stop()
newFakeWatch.Add(&service) newFakeWatch.Add(&service)
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(0)}, {"watch-services", uint64(3)}}) { if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(1)}, {"watch-services", uint64(3)}}) {
t.Errorf("expected call to watch-endpoints, got %#v", fakeClient) t.Errorf("expected call to watch-endpoints, got %#v", fakeClient)
} }
} }
func TestServicesFromZero(t *testing.T) {
service := api.Service{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: uint64(2)}}
fakeWatch := watch.NewFake()
fakeWatch.Stop()
fakeClient := &client.Fake{Watch: fakeWatch}
fakeClient.ServiceList = api.ServiceList{
JSONBase: api.JSONBase{ResourceVersion: 2},
Items: []api.Service{
service,
},
}
services := make(chan ServiceUpdate)
source := SourceAPI{client: fakeClient, services: services}
resourceVersion := uint64(0)
ch := make(chan struct{})
go func() {
source.runServices(&resourceVersion)
close(ch)
}()
// should get services SET
actual := <-services
expected := ServiceUpdate{Op: SET, Services: []api.Service{service}}
if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected %#v, got %#v", expected, actual)
}
// should have listed, then watched
<-ch
if resourceVersion != 2 {
t.Errorf("unexpected resource version, got %#v", resourceVersion)
}
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"list-services", nil}, {"watch-services", uint64(2)}}) {
t.Errorf("unexpected actions, got %#v", fakeClient)
}
}
func TestServicesError(t *testing.T) {
fakeClient := &client.Fake{Err: errors.New("test")}
services := make(chan ServiceUpdate)
source := SourceAPI{client: fakeClient, services: services}
resourceVersion := uint64(1)
ch := make(chan struct{})
go func() {
source.runServices(&resourceVersion)
close(ch)
}()
// should have listed only
<-ch
if resourceVersion != 1 {
t.Errorf("unexpected resource version, got %#v", resourceVersion)
}
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(1)}}) {
t.Errorf("unexpected actions, got %#v", fakeClient)
}
}
func TestServicesFromZeroError(t *testing.T) {
fakeClient := &client.Fake{Err: errors.New("test")}
services := make(chan ServiceUpdate)
source := SourceAPI{client: fakeClient, services: services}
resourceVersion := uint64(0)
ch := make(chan struct{})
go func() {
source.runServices(&resourceVersion)
close(ch)
}()
// should have listed only
<-ch
if resourceVersion != 0 {
t.Errorf("unexpected resource version, got %#v", resourceVersion)
}
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"list-services", nil}}) {
t.Errorf("unexpected actions, got %#v", fakeClient)
}
}
func TestEndpoints(t *testing.T) { func TestEndpoints(t *testing.T) {
endpoint := api.Endpoints{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: uint64(2)}, Endpoints: []string{"127.0.0.1:9000"}} endpoint := api.Endpoints{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: uint64(2)}, Endpoints: []string{"127.0.0.1:9000"}}
@ -77,7 +158,7 @@ func TestEndpoints(t *testing.T) {
fakeClient := &client.Fake{Watch: fakeWatch} fakeClient := &client.Fake{Watch: fakeWatch}
endpoints := make(chan EndpointsUpdate) endpoints := make(chan EndpointsUpdate)
source := SourceAPI{client: fakeClient, endpoints: endpoints} source := SourceAPI{client: fakeClient, endpoints: endpoints}
resourceVersion := uint64(0) resourceVersion := uint64(1)
go func() { go func() {
// called twice // called twice
source.runEndpoints(&resourceVersion) source.runEndpoints(&resourceVersion)
@ -86,12 +167,12 @@ func TestEndpoints(t *testing.T) {
// test adding an endpoint to the watch // test adding an endpoint to the watch
fakeWatch.Add(&endpoint) fakeWatch.Add(&endpoint)
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(0)}}) { if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(1)}}) {
t.Errorf("expected call to watch-endpoints, got %#v", fakeClient) t.Errorf("expected call to watch-endpoints, got %#v", fakeClient)
} }
actual := <-endpoints actual := <-endpoints
expected := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{endpoint}} expected := EndpointsUpdate{Op: ADD, Endpoints: []api.Endpoints{endpoint}}
if !reflect.DeepEqual(expected, actual) { if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected %#v, got %#v", expected, actual) t.Errorf("expected %#v, got %#v", expected, actual)
} }
@ -99,7 +180,7 @@ func TestEndpoints(t *testing.T) {
// verify that a delete results in a config change // verify that a delete results in a config change
fakeWatch.Delete(&endpoint) fakeWatch.Delete(&endpoint)
actual = <-endpoints actual = <-endpoints
expected = EndpointsUpdate{Op: SET} expected = EndpointsUpdate{Op: REMOVE, Endpoints: []api.Endpoints{endpoint}}
if !reflect.DeepEqual(expected, actual) { if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected %#v, got %#v", expected, actual) t.Errorf("expected %#v, got %#v", expected, actual)
} }
@ -110,7 +191,87 @@ func TestEndpoints(t *testing.T) {
fakeWatch.Stop() fakeWatch.Stop()
newFakeWatch.Add(&endpoint) newFakeWatch.Add(&endpoint)
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(0)}, {"watch-endpoints", uint64(3)}}) { if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(1)}, {"watch-endpoints", uint64(3)}}) {
t.Errorf("expected call to watch-endpoints, got %#v", fakeClient) t.Errorf("expected call to watch-endpoints, got %#v", fakeClient)
} }
} }
func TestEndpointsFromZero(t *testing.T) {
endpoint := api.Endpoints{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: uint64(2)}, Endpoints: []string{"127.0.0.1:9000"}}
fakeWatch := watch.NewFake()
fakeWatch.Stop()
fakeClient := &client.Fake{Watch: fakeWatch}
fakeClient.EndpointsList = api.EndpointsList{
JSONBase: api.JSONBase{ResourceVersion: 2},
Items: []api.Endpoints{
endpoint,
},
}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{client: fakeClient, endpoints: endpoints}
resourceVersion := uint64(0)
ch := make(chan struct{})
go func() {
source.runEndpoints(&resourceVersion)
close(ch)
}()
// should get endpoints SET
actual := <-endpoints
expected := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{endpoint}}
if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected %#v, got %#v", expected, actual)
}
// should have listed, then watched
<-ch
if resourceVersion != 2 {
t.Errorf("unexpected resource version, got %#v", resourceVersion)
}
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"list-endpoints", nil}, {"watch-endpoints", uint64(2)}}) {
t.Errorf("unexpected actions, got %#v", fakeClient)
}
}
func TestEndpointsError(t *testing.T) {
fakeClient := &client.Fake{Err: errors.New("test")}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{client: fakeClient, endpoints: endpoints}
resourceVersion := uint64(1)
ch := make(chan struct{})
go func() {
source.runEndpoints(&resourceVersion)
close(ch)
}()
// should have listed only
<-ch
if resourceVersion != 1 {
t.Errorf("unexpected resource version, got %#v", resourceVersion)
}
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(1)}}) {
t.Errorf("unexpected actions, got %#v", fakeClient)
}
}
func TestEndpointsFromZeroError(t *testing.T) {
fakeClient := &client.Fake{Err: errors.New("test")}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{client: fakeClient, endpoints: endpoints}
resourceVersion := uint64(0)
ch := make(chan struct{})
go func() {
source.runEndpoints(&resourceVersion)
close(ch)
}()
// should have listed only
<-ch
if resourceVersion != 0 {
t.Errorf("unexpected resource version, got %#v", resourceVersion)
}
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"list-endpoints", nil}}) {
t.Errorf("unexpected actions, got %#v", fakeClient)
}
}

View File

@ -24,6 +24,7 @@ import (
// Registry is an interface for things that know how to store endpoints. // Registry is an interface for things that know how to store endpoints.
type Registry interface { type Registry interface {
ListEndpoints() (*api.EndpointsList, error)
GetEndpoints(name string) (*api.Endpoints, error) GetEndpoints(name string) (*api.Endpoints, error)
WatchEndpoints(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error) WatchEndpoints(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error)
UpdateEndpoints(e api.Endpoints) error UpdateEndpoints(e api.Endpoints) error

View File

@ -37,14 +37,17 @@ func NewStorage(registry Registry) apiserver.RESTStorage {
} }
} }
// Get satisfies the RESTStorage interface but is unimplemented. // Get satisfies the RESTStorage interface.
func (rs *Storage) Get(id string) (interface{}, error) { func (rs *Storage) Get(id string) (interface{}, error) {
return rs.registry.GetEndpoints(id) return rs.registry.GetEndpoints(id)
} }
// List satisfies the RESTStorage interface but is unimplemented. // List satisfies the RESTStorage interface.
func (rs *Storage) List(selector labels.Selector) (interface{}, error) { func (rs *Storage) List(selector labels.Selector) (interface{}, error) {
return nil, errors.New("unimplemented") if !selector.Empty() {
return nil, errors.New("label selectors are not supported on endpoints")
}
return rs.registry.ListEndpoints()
} }
// Watch returns Endpoint events via a watch.Interface. // Watch returns Endpoint events via a watch.Interface.

View File

@ -22,6 +22,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
) )
@ -67,3 +68,29 @@ func TestGetEndpointsMissingService(t *testing.T) {
t.Errorf("unexpected endpoints: %#v", obj) t.Errorf("unexpected endpoints: %#v", obj)
} }
} }
func TestEndpointsRegistryList(t *testing.T) {
registry := registrytest.NewServiceRegistry()
storage := NewStorage(registry)
registry.EndpointsList = api.EndpointsList{
JSONBase: api.JSONBase{ResourceVersion: 1},
Items: []api.Endpoints{
{JSONBase: api.JSONBase{ID: "foo"}},
{JSONBase: api.JSONBase{ID: "bar"}},
},
}
s, _ := storage.List(labels.Everything())
sl := s.(*api.EndpointsList)
if len(sl.Items) != 2 {
t.Fatalf("Expected 2 endpoints, but got %v", len(sl.Items))
}
if e, a := "foo", sl.Items[0].ID; e != a {
t.Errorf("Expected %v, but got %v", e, a)
}
if e, a := "bar", sl.Items[1].ID; e != a {
t.Errorf("Expected %v, but got %v", e, a)
}
if sl.ResourceVersion != 1 {
t.Errorf("Unexpected resource version: %#v", sl)
}
}

View File

@ -370,6 +370,13 @@ func (r *Registry) WatchServices(label, field labels.Selector, resourceVersion u
return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported") return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported")
} }
// ListEndpoints obtains a list of Services.
func (r *Registry) ListEndpoints() (*api.EndpointsList, error) {
list := &api.EndpointsList{}
err := r.ExtractList("/registry/services/endpoints", &list.Items, &list.ResourceVersion)
return list, err
}
// UpdateEndpoints update Endpoints of a Service. // UpdateEndpoints update Endpoints of a Service.
func (r *Registry) UpdateEndpoints(e api.Endpoints) error { func (r *Registry) UpdateEndpoints(e api.Endpoints) error {
// TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop. // TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop.

View File

@ -679,7 +679,7 @@ func TestEtcdListServices(t *testing.T) {
} }
if len(services.Items) != 2 || services.Items[0].ID != "foo" || services.Items[1].ID != "bar" { if len(services.Items) != 2 || services.Items[0].ID != "foo" || services.Items[1].ID != "bar" {
t.Errorf("Unexpected pod list: %#v", services) t.Errorf("Unexpected service list: %#v", services)
} }
} }
@ -804,6 +804,35 @@ func TestEtcdUpdateService(t *testing.T) {
} }
} }
func TestEtcdListEndpoints(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
key := "/registry/services/endpoints"
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: runtime.EncodeOrDie(api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{"127.0.0.1:8345"}}),
},
{
Value: runtime.EncodeOrDie(api.Endpoints{JSONBase: api.JSONBase{ID: "bar"}}),
},
},
},
},
E: nil,
}
registry := NewTestEtcdRegistry(fakeClient)
services, err := registry.ListEndpoints()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(services.Items) != 2 || services.Items[0].ID != "foo" || services.Items[1].ID != "bar" {
t.Errorf("Unexpected endpoints list: %#v", services)
}
}
func TestEtcdGetEndpoints(t *testing.T) { func TestEtcdGetEndpoints(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)

View File

@ -27,10 +27,11 @@ func NewServiceRegistry() *ServiceRegistry {
} }
type ServiceRegistry struct { type ServiceRegistry struct {
List api.ServiceList List api.ServiceList
Service *api.Service Service *api.Service
Err error Err error
Endpoints api.Endpoints Endpoints api.Endpoints
EndpointsList api.EndpointsList
DeletedID string DeletedID string
GottenID string GottenID string
@ -66,6 +67,10 @@ func (r *ServiceRegistry) WatchServices(label, field labels.Selector, resourceVe
return nil, r.Err return nil, r.Err
} }
func (r *ServiceRegistry) ListEndpoints() (*api.EndpointsList, error) {
return &r.EndpointsList, r.Err
}
func (r *ServiceRegistry) GetEndpoints(id string) (*api.Endpoints, error) { func (r *ServiceRegistry) GetEndpoints(id string) (*api.Endpoints, error) {
r.GottenID = id r.GottenID = id
return &r.Endpoints, r.Err return &r.Endpoints, r.Err