Services and Endpoints weren't properly sync'ing

They need incremental changes and a resync on start.
This commit is contained in:
Clayton Coleman
2014-08-28 22:31:41 -04:00
parent 6c5568261e
commit 01e668187c
14 changed files with 326 additions and 30 deletions

View File

@@ -24,6 +24,7 @@ import (
// Registry is an interface for things that know how to store endpoints.
type Registry interface {
ListEndpoints() (*api.EndpointsList, error)
GetEndpoints(name string) (*api.Endpoints, error)
WatchEndpoints(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error)
UpdateEndpoints(e api.Endpoints) error

View File

@@ -37,14 +37,17 @@ func NewStorage(registry Registry) apiserver.RESTStorage {
}
}
// Get satisfies the RESTStorage interface but is unimplemented.
// Get satisfies the RESTStorage interface.
func (rs *Storage) Get(id string) (interface{}, error) {
return rs.registry.GetEndpoints(id)
}
// List satisfies the RESTStorage interface but is unimplemented.
// List satisfies the RESTStorage interface.
func (rs *Storage) List(selector labels.Selector) (interface{}, error) {
return nil, errors.New("unimplemented")
if !selector.Empty() {
return nil, errors.New("label selectors are not supported on endpoints")
}
return rs.registry.ListEndpoints()
}
// Watch returns Endpoint events via a watch.Interface.

View File

@@ -22,6 +22,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
)
@@ -67,3 +68,29 @@ func TestGetEndpointsMissingService(t *testing.T) {
t.Errorf("unexpected endpoints: %#v", obj)
}
}
func TestEndpointsRegistryList(t *testing.T) {
registry := registrytest.NewServiceRegistry()
storage := NewStorage(registry)
registry.EndpointsList = api.EndpointsList{
JSONBase: api.JSONBase{ResourceVersion: 1},
Items: []api.Endpoints{
{JSONBase: api.JSONBase{ID: "foo"}},
{JSONBase: api.JSONBase{ID: "bar"}},
},
}
s, _ := storage.List(labels.Everything())
sl := s.(*api.EndpointsList)
if len(sl.Items) != 2 {
t.Fatalf("Expected 2 endpoints, but got %v", len(sl.Items))
}
if e, a := "foo", sl.Items[0].ID; e != a {
t.Errorf("Expected %v, but got %v", e, a)
}
if e, a := "bar", sl.Items[1].ID; e != a {
t.Errorf("Expected %v, but got %v", e, a)
}
if sl.ResourceVersion != 1 {
t.Errorf("Unexpected resource version: %#v", sl)
}
}

View File

@@ -370,6 +370,13 @@ func (r *Registry) WatchServices(label, field labels.Selector, resourceVersion u
return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported")
}
// ListEndpoints obtains a list of Services.
func (r *Registry) ListEndpoints() (*api.EndpointsList, error) {
list := &api.EndpointsList{}
err := r.ExtractList("/registry/services/endpoints", &list.Items, &list.ResourceVersion)
return list, err
}
// UpdateEndpoints update Endpoints of a Service.
func (r *Registry) UpdateEndpoints(e api.Endpoints) error {
// TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop.

View File

@@ -679,7 +679,7 @@ func TestEtcdListServices(t *testing.T) {
}
if len(services.Items) != 2 || services.Items[0].ID != "foo" || services.Items[1].ID != "bar" {
t.Errorf("Unexpected pod list: %#v", services)
t.Errorf("Unexpected service list: %#v", services)
}
}
@@ -804,6 +804,35 @@ func TestEtcdUpdateService(t *testing.T) {
}
}
func TestEtcdListEndpoints(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
key := "/registry/services/endpoints"
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: runtime.EncodeOrDie(api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{"127.0.0.1:8345"}}),
},
{
Value: runtime.EncodeOrDie(api.Endpoints{JSONBase: api.JSONBase{ID: "bar"}}),
},
},
},
},
E: nil,
}
registry := NewTestEtcdRegistry(fakeClient)
services, err := registry.ListEndpoints()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(services.Items) != 2 || services.Items[0].ID != "foo" || services.Items[1].ID != "bar" {
t.Errorf("Unexpected endpoints list: %#v", services)
}
}
func TestEtcdGetEndpoints(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)

View File

@@ -27,10 +27,11 @@ func NewServiceRegistry() *ServiceRegistry {
}
type ServiceRegistry struct {
List api.ServiceList
Service *api.Service
Err error
Endpoints api.Endpoints
List api.ServiceList
Service *api.Service
Err error
Endpoints api.Endpoints
EndpointsList api.EndpointsList
DeletedID string
GottenID string
@@ -66,6 +67,10 @@ func (r *ServiceRegistry) WatchServices(label, field labels.Selector, resourceVe
return nil, r.Err
}
func (r *ServiceRegistry) ListEndpoints() (*api.EndpointsList, error) {
return &r.EndpointsList, r.Err
}
func (r *ServiceRegistry) GetEndpoints(id string) (*api.Endpoints, error) {
r.GottenID = id
return &r.Endpoints, r.Err