port endpoints to generic etcd

This commit is contained in:
Masahiro Sano 2015-03-15 15:03:46 +09:00
parent 012ba0e3b0
commit 410e11c305
15 changed files with 573 additions and 487 deletions

View File

@ -151,6 +151,13 @@ func ValidateSecretName(name string, prefix bool) (bool, string) {
return nameIsDNSSubdomain(name, prefix)
}
// ValidateEndpointsName can be used to check whether the given endpoints name is valid.
// Prefix indicates this name will be used as part of generation, in which case
// trailing dashes are allowed.
func ValidateEndpointsName(name string, prefix bool) (bool, string) {
return nameIsDNSSubdomain(name, prefix)
}
// nameIsDNSSubdomain is a ValidateNameFunc for names that must be a DNS subdomain.
func nameIsDNSSubdomain(name string, prefix bool) (bool, string) {
if prefix {
@ -1047,7 +1054,7 @@ func validateFinalizerName(stringValue string) errs.ValidationErrorList {
return errs.ValidationErrorList{}
}
// ValidateNamespaceUpdate tests to make sure a mamespace update can be applied. Modifies oldNamespace.
// ValidateNamespaceUpdate tests to make sure a namespace update can be applied. Modifies oldNamespace.
func ValidateNamespaceUpdate(oldNamespace *api.Namespace, namespace *api.Namespace) errs.ValidationErrorList {
allErrs := errs.ValidationErrorList{}
allErrs = append(allErrs, ValidateObjectMetaUpdate(&oldNamespace.ObjectMeta, &namespace.ObjectMeta).Prefix("metadata")...)
@ -1087,3 +1094,17 @@ func ValidateNamespaceFinalizeUpdate(newNamespace, oldNamespace *api.Namespace)
fmt.Printf("NEW NAMESPACE FINALIZERS : %v\n", newNamespace.Spec.Finalizers)
return allErrs
}
// ValidateEndpoints tests if required fields are set.
func ValidateEndpoints(endpoints *api.Endpoints) errs.ValidationErrorList {
allErrs := errs.ValidationErrorList{}
allErrs = append(allErrs, ValidateObjectMeta(&endpoints.ObjectMeta, true, ValidateEndpointsName).Prefix("metadata")...)
return allErrs
}
// ValidateEndpointsUpdate tests to make sure an endpoints update can be applied.
func ValidateEndpointsUpdate(oldEndpoints *api.Endpoints, endpoints *api.Endpoints) errs.ValidationErrorList {
allErrs := errs.ValidationErrorList{}
allErrs = append(allErrs, ValidateObjectMetaUpdate(&oldEndpoints.ObjectMeta, &endpoints.ObjectMeta).Prefix("metadata")...)
return allErrs
}

View File

@ -44,6 +44,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
controlleretcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint"
endpointsetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/event"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/limitrange"
@ -361,11 +362,13 @@ func (m *Master) init(c *Config) {
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewStorage(c.EtcdHelper)
m.namespaceRegistry = namespace.NewRegistry(namespaceStorage)
endpointsStorage := endpointsetcd.NewStorage(c.EtcdHelper)
m.endpointRegistry = endpoint.NewRegistry(endpointsStorage)
// TODO: split me up into distinct storage registries
registry := etcd.NewRegistry(c.EtcdHelper, podRegistry)
registry := etcd.NewRegistry(c.EtcdHelper, podRegistry, m.endpointRegistry)
m.serviceRegistry = registry
m.endpointRegistry = registry
m.nodeRegistry = registry
nodeStorage := minion.NewStorage(m.nodeRegistry, c.KubeletClient)
@ -394,8 +397,8 @@ func (m *Master) init(c *Config) {
"bindings": bindingStorage,
"replicationControllers": controllerStorage,
"services": service.NewStorage(m.serviceRegistry, c.Cloud, m.nodeRegistry, m.portalNet, c.ClusterName),
"endpoints": endpoint.NewStorage(m.endpointRegistry),
"services": service.NewStorage(m.serviceRegistry, c.Cloud, m.nodeRegistry, m.endpointRegistry, m.portalNet, c.ClusterName),
"endpoints": endpointsStorage,
"minions": nodeStorage,
"nodes": nodeStorage,
"events": event.NewStorage(eventRegistry),

View File

@ -0,0 +1,62 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic"
etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
)
// rest implements a RESTStorage for endpoints against etcd
type REST struct {
*etcdgeneric.Etcd
}
// NewStorage returns a RESTStorage object that will work against endpoints.
func NewStorage(h tools.EtcdHelper) *REST {
prefix := "/registry/services/endpoints"
return &REST{
&etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &api.Endpoints{} },
NewListFunc: func() runtime.Object { return &api.EndpointsList{} },
KeyRootFunc: func(ctx api.Context) string {
return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix)
},
KeyFunc: func(ctx api.Context, name string) (string, error) {
return etcdgeneric.NamespaceKeyFunc(ctx, prefix, name)
},
ObjectNameFunc: func(obj runtime.Object) (string, error) {
return obj.(*api.Endpoints).Name, nil
},
PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher {
return endpoint.MatchEndpoints(label, field)
},
EndpointName: "endpoints",
CreateStrategy: endpoint.Strategy,
UpdateStrategy: endpoint.Strategy,
Helper: h,
},
}
}

View File

@ -0,0 +1,299 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest/resttest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
)
func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) {
fakeEtcdClient := tools.NewFakeEtcdClient(t)
fakeEtcdClient.TestIndex = true
helper := tools.NewEtcdHelper(fakeEtcdClient, latest.Codec)
return fakeEtcdClient, helper
}
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
fakeEtcdClient, h := newHelper(t)
storage := NewStorage(h)
return storage, fakeEtcdClient
}
func validNewEndpoints() *api.Endpoints {
return &api.Endpoints{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: api.NamespaceDefault,
},
Protocol: "TCP",
Endpoints: []api.Endpoint{{IP: "baz"}},
}
}
func validChangedEndpoints() *api.Endpoints {
endpoints := validNewEndpoints()
endpoints.ResourceVersion = "1"
endpoints.Endpoints = []api.Endpoint{{IP: "baz"}, {IP: "bar"}}
return endpoints
}
func TestCreate(t *testing.T) {
storage, fakeEtcdClient := newStorage(t)
test := resttest.New(t, storage, fakeEtcdClient.SetError)
endpoints := validNewEndpoints()
endpoints.ObjectMeta = api.ObjectMeta{}
test.TestCreate(
// valid
endpoints,
// invalid
&api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "_-a123-a_"},
},
)
}
func TestDelete(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeEtcdClient := newStorage(t)
test := resttest.New(t, storage, fakeEtcdClient.SetError)
endpoints := validChangedEndpoints()
key, _ := storage.KeyFunc(ctx, endpoints.Name)
createFn := func() runtime.Object {
fakeEtcdClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(latest.Codec, endpoints),
ModifiedIndex: 1,
},
},
}
return endpoints
}
gracefulSetFn := func() bool {
if fakeEtcdClient.Data[key].R.Node == nil {
return false
}
return fakeEtcdClient.Data[key].R.Node.TTL == 30
}
test.TestDeleteNoGraceful(createFn, gracefulSetFn)
}
func TestEtcdListEndpoints(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
key := storage.KeyRootFunc(ctx)
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Protocol: "TCP", Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 8345}}}),
},
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}, Protocol: "TCP"}),
},
},
},
},
E: nil,
}
endpointsObj, err := storage.List(ctx, labels.Everything(), fields.Everything())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
endpoints := endpointsObj.(*api.EndpointsList)
if len(endpoints.Items) != 2 || endpoints.Items[0].Name != "foo" || endpoints.Items[1].Name != "bar" {
t.Errorf("Unexpected endpoints list: %#v", endpoints)
}
}
func TestEtcdGetEndpoints(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
endpoints := validNewEndpoints()
name := endpoints.Name
key, _ := storage.KeyFunc(ctx, name)
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, endpoints), 0)
response, err := fakeClient.Get(key, false, false)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var endpointsOut api.Endpoints
err = latest.Codec.DecodeInto([]byte(response.Node.Value), &endpointsOut)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
obj, err := storage.Get(ctx, name)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
got := obj.(*api.Endpoints)
endpoints.ObjectMeta.ResourceVersion = got.ObjectMeta.ResourceVersion
if e, a := endpoints, got; !api.Semantic.DeepEqual(*e, *a) {
t.Errorf("Unexpected endpoints: %#v, expected %#v", e, a)
}
}
func TestListEmptyEndpointsList(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
fakeClient.ChangeIndex = 1
key := storage.KeyRootFunc(ctx)
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{},
E: fakeClient.NewError(tools.EtcdErrorCodeNotFound),
}
endpoints, err := storage.List(ctx, labels.Everything(), fields.Everything())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(endpoints.(*api.EndpointsList).Items) != 0 {
t.Errorf("Unexpected non-zero pod list: %#v", endpoints)
}
if endpoints.(*api.EndpointsList).ResourceVersion != "1" {
t.Errorf("Unexpected resource version: %#v", endpoints)
}
}
func TestListEndpointsList(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
fakeClient.ChangeIndex = 1
key := storage.KeyRootFunc(ctx)
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
}),
},
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"},
}),
},
},
},
},
}
endpointsObj, err := storage.List(ctx, labels.Everything(), fields.Everything())
endpoints := endpointsObj.(*api.EndpointsList)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(endpoints.Items) != 2 {
t.Errorf("Unexpected endpoints list: %#v", endpoints)
}
if endpoints.Items[0].Name != "foo" {
t.Errorf("Unexpected endpoints: %#v", endpoints.Items[0])
}
if endpoints.Items[1].Name != "bar" {
t.Errorf("Unexpected endpoints: %#v", endpoints.Items[1])
}
}
func TestEndpointsDecode(t *testing.T) {
storage, _ := newStorage(t)
expected := validNewEndpoints()
body, err := latest.Codec.Encode(expected)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
actual := storage.New()
if err := latest.Codec.DecodeInto(body, actual); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !api.Semantic.DeepEqual(expected, actual) {
t.Errorf("mismatch: %s", util.ObjectDiff(expected, actual))
}
}
func TestEtcdUpdateEndpoints(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
endpoints := validChangedEndpoints()
key, _ := storage.KeyFunc(ctx, "foo")
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, validNewEndpoints()), 0)
_, _, err := storage.Update(ctx, endpoints)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
response, err := fakeClient.Get(key, false, false)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var endpointsOut api.Endpoints
err = latest.Codec.DecodeInto([]byte(response.Node.Value), &endpointsOut)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
endpoints.ObjectMeta.ResourceVersion = endpointsOut.ObjectMeta.ResourceVersion
if !api.Semantic.DeepEqual(endpoints, &endpointsOut) {
t.Errorf("Unexpected endpoints: %#v, expected %#v", &endpointsOut, endpoints)
}
}
func TestDeleteEndpoints(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
endpoints := validNewEndpoints()
name := endpoints.Name
key, _ := storage.KeyFunc(ctx, name)
fakeClient.ChangeIndex = 1
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(latest.Codec, endpoints),
ModifiedIndex: 1,
CreatedIndex: 1,
},
},
}
_, err := storage.Delete(ctx, name, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}

View File

@ -18,6 +18,7 @@ package endpoint
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
@ -29,4 +30,46 @@ type Registry interface {
GetEndpoints(ctx api.Context, name string) (*api.Endpoints, error)
WatchEndpoints(ctx api.Context, labels labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error)
UpdateEndpoints(ctx api.Context, e *api.Endpoints) error
DeleteEndpoints(ctx api.Context, name string) error
}
// storage puts strong typing around storage calls
type storage struct {
rest.StandardStorage
}
// NewRegistry returns a new Registry interface for the given Storage. Any mismatched
// types will panic.
func NewRegistry(s rest.StandardStorage) Registry {
return &storage{s}
}
func (s *storage) ListEndpoints(ctx api.Context) (*api.EndpointsList, error) {
obj, err := s.List(ctx, labels.Everything(), fields.Everything())
if err != nil {
return nil, err
}
return obj.(*api.EndpointsList), nil
}
func (s *storage) WatchEndpoints(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
return s.Watch(ctx, label, field, resourceVersion)
}
func (s *storage) GetEndpoints(ctx api.Context, name string) (*api.Endpoints, error) {
obj, err := s.Get(ctx, name)
if err != nil {
return nil, err
}
return obj.(*api.Endpoints), nil
}
func (s *storage) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) error {
_, _, err := s.Update(ctx, endpoints)
return err
}
func (s *storage) DeleteEndpoints(ctx api.Context, name string) error {
_, err := s.Delete(ctx, name, nil)
return err
}

View File

@ -20,84 +20,65 @@ import (
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/fielderrors"
)
// REST adapts endpoints into apiserver's RESTStorage model.
type REST struct {
registry Registry
// endpointsStrategy implements behavior for Endpoints
type endpointsStrategy struct {
runtime.ObjectTyper
api.NameGenerator
}
// NewStorage returns a new rest.Storage implementation for endpoints
func NewStorage(registry Registry) *REST {
return &REST{
registry: registry,
}
// Strategy is the default logic that applies when creating and updating Endpoint
// objects via the REST API.
var Strategy = endpointsStrategy{api.Scheme, api.SimpleNameGenerator}
// NamespaceScoped is true for endpoints.
func (endpointsStrategy) NamespaceScoped() bool {
return true
}
// Get satisfies the RESTStorage interface.
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
return rs.registry.GetEndpoints(ctx, id)
// ResetBeforeCreate clears fields that are not allowed to be set by end users on creation.
func (endpointsStrategy) ResetBeforeCreate(obj runtime.Object) {
_ = obj.(*api.Endpoints)
}
// List satisfies the RESTStorage interface.
func (rs *REST) List(ctx api.Context, label labels.Selector, field fields.Selector) (runtime.Object, error) {
if !label.Empty() || !field.Empty() {
return nil, errors.NewBadRequest("label/field selectors are not supported on endpoints")
}
return rs.registry.ListEndpoints(ctx)
// Validate validates a new endpoints.
func (endpointsStrategy) Validate(obj runtime.Object) fielderrors.ValidationErrorList {
return validation.ValidateEndpoints(obj.(*api.Endpoints))
}
// Watch returns Endpoint events via a watch.Interface.
// It implements rest.Watcher.
func (rs *REST) Watch(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
return rs.registry.WatchEndpoints(ctx, label, field, resourceVersion)
// AllowCreateOnUpdate is true for endpoints.
func (endpointsStrategy) AllowCreateOnUpdate() bool {
return true
}
// Create satisfies the RESTStorage interface.
func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) {
// ValidateUpdate is the default update validation for an end user.
func (endpointsStrategy) ValidateUpdate(obj, old runtime.Object) fielderrors.ValidationErrorList {
return validation.ValidateEndpointsUpdate(old.(*api.Endpoints), obj.(*api.Endpoints))
}
// MatchEndpoints returns a generic matcher for a given label and field selector.
func MatchEndpoints(label labels.Selector, field fields.Selector) generic.Matcher {
return generic.MatcherFunc(func(obj runtime.Object) (bool, error) {
endpoints, ok := obj.(*api.Endpoints)
if !ok {
return nil, fmt.Errorf("not an endpoints: %#v", obj)
return false, fmt.Errorf("not a endpoints")
}
if len(endpoints.Name) == 0 {
return nil, fmt.Errorf("id is required: %#v", obj)
}
if !api.ValidNamespace(ctx, &endpoints.ObjectMeta) {
return nil, errors.NewConflict("endpoints", endpoints.Namespace, fmt.Errorf("Endpoints.Namespace does not match the provided context"))
}
api.FillObjectMetaSystemFields(ctx, &endpoints.ObjectMeta)
err := rs.registry.UpdateEndpoints(ctx, endpoints)
if err != nil {
return nil, err
}
return rs.registry.GetEndpoints(ctx, endpoints.Name)
fields := EndpointsToSelectableFields(endpoints)
return label.Matches(labels.Set(endpoints.Labels)) && field.Matches(fields), nil
})
}
// Update satisfies the RESTStorage interface.
func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) {
endpoints, ok := obj.(*api.Endpoints)
if !ok {
return nil, false, fmt.Errorf("not an endpoints: %#v", obj)
// EndpointsToSelectableFields returns a label set that represents the object
// TODO: fields are not labels, and the validation rules for them do not apply.
func EndpointsToSelectableFields(endpoints *api.Endpoints) labels.Set {
return labels.Set{
"name": endpoints.Name,
}
err := rs.registry.UpdateEndpoints(ctx, endpoints)
if err != nil {
return nil, false, err
}
out, err := rs.registry.GetEndpoints(ctx, endpoints.Name)
return out, false, err
}
// New implements the RESTStorage interface.
func (rs REST) New() runtime.Object {
return &api.Endpoints{}
}
func (*REST) NewList() runtime.Object {
return &api.EndpointsList{}
}

View File

@ -1,99 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package endpoint
import (
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
)
func TestGetEndpoints(t *testing.T) {
registry := &registrytest.ServiceRegistry{
Endpoints: api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}},
},
}
storage := NewStorage(registry)
ctx := api.NewContext()
obj, err := storage.Get(ctx, "foo")
if err != nil {
t.Fatalf("unexpected error: %#v", err)
}
if !reflect.DeepEqual([]api.Endpoint{{IP: "127.0.0.1", Port: 9000}}, obj.(*api.Endpoints).Endpoints) {
t.Errorf("unexpected endpoints: %#v", obj)
}
}
func TestGetEndpointsMissingService(t *testing.T) {
registry := &registrytest.ServiceRegistry{
Err: errors.NewNotFound("service", "foo"),
}
storage := NewStorage(registry)
ctx := api.NewContext()
// returns service not found
_, err := storage.Get(ctx, "foo")
if !errors.IsNotFound(err) || !reflect.DeepEqual(err, errors.NewNotFound("service", "foo")) {
t.Errorf("expected NotFound error, got %#v", err)
}
// returns empty endpoints
registry.Err = nil
registry.Service = &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"},
}
obj, err := storage.Get(ctx, "foo")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if obj.(*api.Endpoints).Endpoints != nil {
t.Errorf("unexpected endpoints: %#v", obj)
}
}
func TestEndpointsRegistryList(t *testing.T) {
registry := registrytest.NewServiceRegistry()
storage := NewStorage(registry)
registry.EndpointsList = api.EndpointsList{
ListMeta: api.ListMeta{ResourceVersion: "1"},
Items: []api.Endpoints{
{ObjectMeta: api.ObjectMeta{Name: "foo"}},
{ObjectMeta: api.ObjectMeta{Name: "bar"}},
},
}
ctx := api.NewContext()
s, _ := storage.List(ctx, labels.Everything(), fields.Everything())
sl := s.(*api.EndpointsList)
if len(sl.Items) != 2 {
t.Fatalf("Expected 2 endpoints, but got %v", len(sl.Items))
}
if e, a := "foo", sl.Items[0].Name; e != a {
t.Errorf("Expected %v, but got %v", e, a)
}
if e, a := "bar", sl.Items[1].Name; e != a {
t.Errorf("Expected %v, but got %v", e, a)
}
if sl.ResourceVersion != "1" {
t.Errorf("Unexpected resource version: %#v", sl)
}
}

View File

@ -20,9 +20,11 @@ import (
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
@ -36,8 +38,6 @@ const (
ControllerPath string = "/registry/controllers"
// ServicePath is the path to service resources in etcd
ServicePath string = "/registry/services/specs"
// ServiceEndpointPath is the path to service endpoints resources in etcd
ServiceEndpointPath string = "/registry/services/endpoints"
// NodePath is the path to node resources in etcd
NodePath string = "/registry/minions"
)
@ -50,13 +50,15 @@ const (
type Registry struct {
tools.EtcdHelper
pods pod.Registry
endpoints endpoint.Registry
}
// NewRegistry creates an etcd registry.
func NewRegistry(helper tools.EtcdHelper, pods pod.Registry) *Registry {
func NewRegistry(helper tools.EtcdHelper, pods pod.Registry, endpoints endpoint.Registry) *Registry {
registry := &Registry{
EtcdHelper: helper,
pods: pods,
endpoints: endpoints,
}
return registry
}
@ -226,30 +228,6 @@ func (r *Registry) GetService(ctx api.Context, name string) (*api.Service, error
return &svc, nil
}
// GetEndpoints obtains the endpoints for the service identified by 'name'.
func (r *Registry) GetEndpoints(ctx api.Context, name string) (*api.Endpoints, error) {
var endpoints api.Endpoints
key, err := makeServiceEndpointsKey(ctx, name)
if err != nil {
return nil, err
}
err = r.ExtractObj(key, &endpoints, false)
if err != nil {
return nil, etcderr.InterpretGetError(err, "endpoints", name)
}
return &endpoints, nil
}
// makeServiceEndpointsListKey constructs etcd paths to service endpoint directories enforcing namespace rules.
func makeServiceEndpointsListKey(ctx api.Context) string {
return MakeEtcdListKey(ctx, ServiceEndpointPath)
}
// makeServiceEndpointsListKey constructs etcd paths to service endpoint items enforcing namespace rules.
func makeServiceEndpointsKey(ctx api.Context, name string) (string, error) {
return MakeEtcdItemKey(ctx, ServiceEndpointPath, name)
}
// DeleteService deletes a Service specified by its name.
func (r *Registry) DeleteService(ctx api.Context, name string) error {
key, err := makeServiceKey(ctx, name)
@ -263,13 +241,10 @@ func (r *Registry) DeleteService(ctx api.Context, name string) error {
// TODO: can leave dangling endpoints, and potentially return incorrect
// endpoints if a new service is created with the same name
key, err = makeServiceEndpointsKey(ctx, name)
if err != nil {
err = r.endpoints.DeleteEndpoints(ctx, name)
if err != nil && !errors.IsNotFound(err) {
return err
}
if err := r.Delete(key, true); err != nil && !tools.IsEtcdNotFound(err) {
return etcderr.InterpretDeleteError(err, "endpoints", name)
}
return nil
}
@ -306,51 +281,6 @@ func (r *Registry) WatchServices(ctx api.Context, label labels.Selector, field f
return nil, fmt.Errorf("only the 'name' and default (everything) field selectors are supported")
}
// ListEndpoints obtains a list of Services.
func (r *Registry) ListEndpoints(ctx api.Context) (*api.EndpointsList, error) {
list := &api.EndpointsList{}
key := makeServiceEndpointsListKey(ctx)
err := r.ExtractToList(key, list)
return list, err
}
// UpdateEndpoints update Endpoints of a Service.
func (r *Registry) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) error {
key, err := makeServiceEndpointsKey(ctx, endpoints.Name)
if err != nil {
return err
}
// TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop.
err = r.AtomicUpdate(key, &api.Endpoints{}, true,
func(input runtime.Object) (runtime.Object, uint64, error) {
// TODO: racy - label query is returning different results for two simultaneous updaters
return endpoints, 0, nil
})
return etcderr.InterpretUpdateError(err, "endpoints", endpoints.Name)
}
// WatchEndpoints begins watching for new, changed, or deleted endpoint configurations.
func (r *Registry) WatchEndpoints(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
version, err := tools.ParseWatchResourceVersion(resourceVersion, "endpoints")
if err != nil {
return nil, err
}
if !label.Empty() {
return nil, fmt.Errorf("label selectors are not supported on endpoints")
}
if value, found := field.RequiresExactMatch("name"); found {
key, err := makeServiceEndpointsKey(ctx, value)
if err != nil {
return nil, err
}
return r.Watch(key, version), nil
}
if field.Empty() {
return r.WatchList(makeServiceEndpointsListKey(ctx), version, tools.Everything)
}
return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported")
}
func makeNodeKey(nodeID string) string {
return NodePath + "/" + nodeID
}

View File

@ -26,6 +26,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint"
endpointetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint/etcd"
etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
podetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod/etcd"
@ -36,14 +38,15 @@ import (
)
func NewTestEtcdRegistry(client tools.EtcdClient) *Registry {
registry := NewRegistry(tools.NewEtcdHelper(client, latest.Codec), nil)
registry := NewRegistry(tools.NewEtcdHelper(client, latest.Codec), nil, nil)
return registry
}
func NewTestEtcdRegistryWithPods(client tools.EtcdClient) *Registry {
helper := tools.NewEtcdHelper(client, latest.Codec)
podStorage, _, _ := podetcd.NewStorage(helper)
registry := NewRegistry(helper, pod.NewRegistry(podStorage))
endpointStorage := endpointetcd.NewStorage(helper)
registry := NewRegistry(helper, pod.NewRegistry(podStorage), endpoint.NewRegistry(endpointStorage))
return registry
}
@ -532,10 +535,10 @@ func TestEtcdGetServiceNotFound(t *testing.T) {
func TestEtcdDeleteService(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
registry := NewTestEtcdRegistryWithPods(fakeClient)
key, _ := makeServiceKey(ctx, "foo")
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0)
endpointsKey, _ := makeServiceEndpointsKey(ctx, "foo")
endpointsKey, _ := etcdgeneric.NamespaceKeyFunc(ctx, "/registry/services/endpoints", "foo")
fakeClient.Set(endpointsKey, runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Protocol: "TCP"}), 0)
err := registry.DeleteService(ctx, "foo")
@ -594,89 +597,6 @@ func TestEtcdUpdateService(t *testing.T) {
}
}
func TestEtcdListEndpoints(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
key := makeServiceEndpointsListKey(ctx)
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Protocol: "TCP", Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 8345}}}),
},
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}, Protocol: "TCP"}),
},
},
},
},
E: nil,
}
registry := NewTestEtcdRegistry(fakeClient)
services, err := registry.ListEndpoints(ctx)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(services.Items) != 2 || services.Items[0].Name != "foo" || services.Items[1].Name != "bar" {
t.Errorf("Unexpected endpoints list: %#v", services)
}
}
func TestEtcdGetEndpoints(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
endpoints := &api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Protocol: "TCP",
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 34855}},
}
key, _ := makeServiceEndpointsKey(ctx, "foo")
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, endpoints), 0)
got, err := registry.GetEndpoints(ctx, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if e, a := endpoints, got; !api.Semantic.DeepEqual(e, a) {
t.Errorf("Unexpected endpoints: %#v, expected %#v", e, a)
}
}
func TestEtcdUpdateEndpoints(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
registry := NewTestEtcdRegistry(fakeClient)
endpoints := api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Protocol: "TCP",
Endpoints: []api.Endpoint{{IP: "baz"}, {IP: "bar"}},
}
key, _ := makeServiceEndpointsKey(ctx, "foo")
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Endpoints{}), 0)
err := registry.UpdateEndpoints(ctx, &endpoints)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
response, err := fakeClient.Get(key, false, false)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var endpointsOut api.Endpoints
err = latest.Codec.DecodeInto([]byte(response.Node.Value), &endpointsOut)
if !api.Semantic.DeepEqual(endpoints, endpointsOut) {
t.Errorf("Unexpected endpoints: %#v, expected %#v", endpointsOut, endpoints)
}
}
func TestEtcdWatchServices(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
@ -733,8 +653,8 @@ func TestEtcdWatchServicesBadSelector(t *testing.T) {
func TestEtcdWatchEndpoints(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchEndpoints(
registry := NewTestEtcdRegistryWithPods(fakeClient)
watching, err := registry.endpoints.WatchEndpoints(
ctx,
labels.Everything(),
fields.SelectorFromSet(fields.Set{"name": "foo"}),
@ -762,8 +682,8 @@ func TestEtcdWatchEndpoints(t *testing.T) {
func TestEtcdWatchEndpointsAcrossNamespaces(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchEndpoints(
registry := NewTestEtcdRegistryWithPods(fakeClient)
watching, err := registry.endpoints.WatchEndpoints(
ctx,
labels.Everything(),
fields.Everything(),
@ -788,31 +708,6 @@ func TestEtcdWatchEndpointsAcrossNamespaces(t *testing.T) {
watching.Stop()
}
func TestEtcdWatchEndpointsBadSelector(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
_, err := registry.WatchEndpoints(
ctx,
labels.Everything(),
fields.SelectorFromSet(fields.Set{"Field.Selector": "foo"}),
"",
)
if err == nil {
t.Errorf("unexpected non-error: %v", err)
}
_, err = registry.WatchEndpoints(
ctx,
labels.SelectorFromSet(labels.Set{"Label.Selector": "foo"}),
fields.Everything(),
"",
)
if err == nil {
t.Errorf("unexpected non-error: %v", err)
}
}
func TestEtcdListMinions(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)

View File

@ -91,3 +91,7 @@ func (e *EndpointRegistry) UpdateEndpoints(ctx api.Context, endpoints *api.Endpo
e.Endpoints.Items = append(e.Endpoints.Items, *endpoints)
return nil
}
func (e *EndpointRegistry) DeleteEndpoints(ctx api.Context, name string) error {
return fmt.Errorf("unimplemented!")
}

View File

@ -34,8 +34,6 @@ type ServiceRegistry struct {
List api.ServiceList
Service *api.Service
Err error
Endpoints api.Endpoints
EndpointsList api.EndpointsList
DeletedID string
GottenID string
@ -114,33 +112,3 @@ func (r *ServiceRegistry) WatchServices(ctx api.Context, label labels.Selector,
return nil, r.Err
}
func (r *ServiceRegistry) ListEndpoints(ctx api.Context) (*api.EndpointsList, error) {
r.mu.Lock()
defer r.mu.Unlock()
return &r.EndpointsList, r.Err
}
func (r *ServiceRegistry) GetEndpoints(ctx api.Context, id string) (*api.Endpoints, error) {
r.mu.Lock()
defer r.mu.Unlock()
r.GottenID = id
return &r.Endpoints, r.Err
}
func (r *ServiceRegistry) UpdateEndpoints(ctx api.Context, e *api.Endpoints) error {
r.mu.Lock()
defer r.mu.Unlock()
r.Endpoints = *e
return r.Err
}
func (r *ServiceRegistry) WatchEndpoints(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
r.mu.Lock()
defer r.mu.Unlock()
return nil, r.Err
}

View File

@ -20,7 +20,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
@ -32,8 +31,4 @@ type Registry interface {
DeleteService(ctx api.Context, name string) error
UpdateService(ctx api.Context, svc *api.Service) (*api.Service, error)
WatchServices(ctx api.Context, labels labels.Selector, fields fields.Selector, resourceVersion string) (watch.Interface, error)
// TODO: endpoints and their implementation should be separated, setting endpoints should be
// supported via the API, and the endpoints-controller should use the API to update endpoints.
endpoint.Registry
}

View File

@ -31,6 +31,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/fielderrors"
@ -43,12 +44,13 @@ type REST struct {
registry Registry
cloud cloudprovider.Interface
machines minion.Registry
endpoints endpoint.Registry
portalMgr *ipAllocator
clusterName string
}
// NewStorage returns a new REST.
func NewStorage(registry Registry, cloud cloudprovider.Interface, machines minion.Registry, portalNet *net.IPNet,
func NewStorage(registry Registry, cloud cloudprovider.Interface, machines minion.Registry, endpoints endpoint.Registry, portalNet *net.IPNet,
clusterName string) *REST {
// TODO: Before we can replicate masters, this has to be synced (e.g. lives in etcd)
ipa := newIPAllocator(portalNet)
@ -61,6 +63,7 @@ func NewStorage(registry Registry, cloud cloudprovider.Interface, machines minio
registry: registry,
cloud: cloud,
machines: machines,
endpoints: endpoints,
portalMgr: ipa,
clusterName: clusterName,
}
@ -230,7 +233,7 @@ var _ = rest.Redirector(&REST{})
// ResourceLocation returns a URL to which one can send traffic for the specified service.
func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.RoundTripper, error) {
eps, err := rs.registry.GetEndpoints(ctx, id)
eps, err := rs.endpoints.GetEndpoints(ctx, id)
if err != nil {
return nil, nil, err
}

View File

@ -32,6 +32,18 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
)
func NewTestREST(t *testing.T, endpoints *api.EndpointsList) (*REST, *registrytest.ServiceRegistry, *cloud.FakeCloud) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
endpointRegistry := &registrytest.EndpointRegistry{
Endpoints: endpoints,
}
nodeRegistry := registrytest.NewMinionRegistry(machines, api.NodeResources{})
storage := NewStorage(registry, fakeCloud, nodeRegistry, endpointRegistry, makeIPNet(t), "kubernetes")
return storage, registry, fakeCloud
}
func makeIPNet(t *testing.T) *net.IPNet {
_, net, err := net.ParseCIDR("1.2.3.0/24")
if err != nil {
@ -41,10 +53,7 @@ func makeIPNet(t *testing.T) *net.IPNet {
}
func TestServiceRegistryCreate(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes")
storage, registry, fakeCloud := NewTestREST(t, nil)
storage.portalMgr.randomAttempts = 0
svc := &api.Service{
@ -84,8 +93,7 @@ func TestServiceRegistryCreate(t *testing.T) {
}
func TestServiceStorageValidatesCreate(t *testing.T) {
registry := registrytest.NewServiceRegistry()
storage := NewStorage(registry, nil, nil, makeIPNet(t), "kubernetes")
storage, _, _ := NewTestREST(t, nil)
failureCases := map[string]api.Service{
"empty ID": {
ObjectMeta: api.ObjectMeta{Name: ""},
@ -120,7 +128,7 @@ func TestServiceStorageValidatesCreate(t *testing.T) {
func TestServiceRegistryUpdate(t *testing.T) {
ctx := api.NewDefaultContext()
registry := registrytest.NewServiceRegistry()
storage, registry, _ := NewTestREST(t, nil)
registry.CreateService(ctx, &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{
@ -128,7 +136,6 @@ func TestServiceRegistryUpdate(t *testing.T) {
Selector: map[string]string{"bar": "baz1"},
},
})
storage := NewStorage(registry, nil, nil, makeIPNet(t), "kubernetes")
updated_svc, created, err := storage.Update(ctx, &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
@ -158,7 +165,7 @@ func TestServiceRegistryUpdate(t *testing.T) {
func TestServiceStorageValidatesUpdate(t *testing.T) {
ctx := api.NewDefaultContext()
registry := registrytest.NewServiceRegistry()
storage, registry, _ := NewTestREST(t, nil)
registry.CreateService(ctx, &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
@ -166,7 +173,6 @@ func TestServiceStorageValidatesUpdate(t *testing.T) {
Selector: map[string]string{"bar": "baz"},
},
})
storage := NewStorage(registry, nil, nil, makeIPNet(t), "kubernetes")
failureCases := map[string]api.Service{
"empty ID": {
ObjectMeta: api.ObjectMeta{Name: ""},
@ -200,10 +206,7 @@ func TestServiceStorageValidatesUpdate(t *testing.T) {
func TestServiceRegistryExternalService(t *testing.T) {
ctx := api.NewDefaultContext()
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes")
storage, registry, fakeCloud := NewTestREST(t, nil)
svc := &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
@ -231,12 +234,8 @@ func TestServiceRegistryExternalService(t *testing.T) {
}
func TestServiceRegistryExternalServiceError(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{
Err: fmt.Errorf("test error"),
}
machines := []string{"foo", "bar", "baz"}
storage := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes")
storage, registry, fakeCloud := NewTestREST(t, nil)
fakeCloud.Err = fmt.Errorf("test error")
svc := &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
@ -259,10 +258,7 @@ func TestServiceRegistryExternalServiceError(t *testing.T) {
func TestServiceRegistryDelete(t *testing.T) {
ctx := api.NewDefaultContext()
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes")
storage, registry, fakeCloud := NewTestREST(t, nil)
svc := &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
@ -283,10 +279,7 @@ func TestServiceRegistryDelete(t *testing.T) {
func TestServiceRegistryDeleteExternal(t *testing.T) {
ctx := api.NewDefaultContext()
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes")
storage, registry, fakeCloud := NewTestREST(t, nil)
svc := &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
@ -308,10 +301,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) {
func TestServiceRegistryUpdateExternalService(t *testing.T) {
ctx := api.NewDefaultContext()
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes")
storage, _, fakeCloud := NewTestREST(t, nil)
// Create non-external load balancer.
svc1 := &api.Service{
@ -352,10 +342,7 @@ func TestServiceRegistryUpdateExternalService(t *testing.T) {
func TestServiceRegistryGet(t *testing.T) {
ctx := api.NewDefaultContext()
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes")
storage, registry, fakeCloud := NewTestREST(t, nil)
registry.CreateService(ctx, &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
@ -373,11 +360,24 @@ func TestServiceRegistryGet(t *testing.T) {
func TestServiceRegistryResourceLocation(t *testing.T) {
ctx := api.NewDefaultContext()
registry := registrytest.NewServiceRegistry()
registry.Endpoints = api.Endpoints{Endpoints: []api.Endpoint{{IP: "foo", Port: 80}}}
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes")
endpoints := &api.EndpointsList{
Items: []api.Endpoints{
{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: api.NamespaceDefault,
},
Endpoints: []api.Endpoint{
{
IP: "100.100.100.100",
Port: 80,
},
},
Protocol: api.ProtocolTCP,
},
},
}
storage, registry, _ := NewTestREST(t, endpoints)
registry.CreateService(ctx, &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
@ -392,26 +392,19 @@ func TestServiceRegistryResourceLocation(t *testing.T) {
if location == nil {
t.Errorf("Unexpected nil: %v", location)
}
if e, a := "//foo:80", location.String(); e != a {
t.Errorf("Expected %v, but got %v", e, a)
}
if e, a := "foo", registry.GottenID; e != a {
if e, a := "//100.100.100.100:80", location.String(); e != a {
t.Errorf("Expected %v, but got %v", e, a)
}
// Test error path
registry.Err = fmt.Errorf("fake error")
if _, _, err = redirector.ResourceLocation(ctx, "foo"); err == nil {
if _, _, err = redirector.ResourceLocation(ctx, "bar"); err == nil {
t.Errorf("unexpected nil error")
}
}
func TestServiceRegistryList(t *testing.T) {
ctx := api.NewDefaultContext()
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes")
storage, registry, fakeCloud := NewTestREST(t, nil)
registry.CreateService(ctx, &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{
@ -445,10 +438,7 @@ func TestServiceRegistryList(t *testing.T) {
}
func TestServiceRegistryIPAllocation(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
rest := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes")
rest, _, _ := NewTestREST(t, nil)
rest.portalMgr.randomAttempts = 0
svc1 := &api.Service{
@ -507,10 +497,7 @@ func TestServiceRegistryIPAllocation(t *testing.T) {
}
func TestServiceRegistryIPReallocation(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
rest := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes")
rest, _, _ := NewTestREST(t, nil)
rest.portalMgr.randomAttempts = 0
svc1 := &api.Service{
@ -555,10 +542,7 @@ func TestServiceRegistryIPReallocation(t *testing.T) {
}
func TestServiceRegistryIPUpdate(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
rest := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes")
rest, _, _ := NewTestREST(t, nil)
rest.portalMgr.randomAttempts = 0
svc := &api.Service{
@ -601,10 +585,7 @@ func TestServiceRegistryIPUpdate(t *testing.T) {
}
func TestServiceRegistryIPExternalLoadBalancer(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
rest := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes")
rest, _, fakeCloud := NewTestREST(t, nil)
rest.portalMgr.randomAttempts = 0
svc := &api.Service{
@ -643,7 +624,9 @@ func TestServiceRegistryIPReloadFromStorage(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
rest1 := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes")
nodeRegistry := registrytest.NewMinionRegistry(machines, api.NodeResources{})
endpoints := &registrytest.EndpointRegistry{}
rest1 := NewStorage(registry, fakeCloud, nodeRegistry, endpoints, makeIPNet(t), "kubernetes")
rest1.portalMgr.randomAttempts = 0
svc := &api.Service{
@ -669,7 +652,8 @@ func TestServiceRegistryIPReloadFromStorage(t *testing.T) {
rest1.Create(ctx, svc)
// This will reload from storage, finding the previous 2
rest2 := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes")
nodeRegistry = registrytest.NewMinionRegistry(machines, api.NodeResources{})
rest2 := NewStorage(registry, fakeCloud, nodeRegistry, endpoints, makeIPNet(t), "kubernetes")
rest2.portalMgr.randomAttempts = 0
svc = &api.Service{
@ -726,10 +710,7 @@ func TestUpdateServiceWithConflictingNamespace(t *testing.T) {
}
func TestCreate(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
rest := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes")
rest, registry, _ := NewTestREST(t, nil)
rest.portalMgr.randomAttempts = 0
test := resttest.New(t, rest, registry.SetError)

View File

@ -247,7 +247,7 @@ func getTestRequests() []struct {
{"PUT", "/api/v1beta1/endpoints/a" + timeoutFlag, aEndpoints, code200},
{"GET", "/api/v1beta1/endpoints", "", code200},
{"GET", "/api/v1beta1/endpoints/a", "", code200},
{"DELETE", "/api/v1beta1/endpoints/a" + timeoutFlag, "", code405},
{"DELETE", "/api/v1beta1/endpoints/a" + timeoutFlag, "", code200},
// Normal methods on minions
{"GET", "/api/v1beta1/minions", "", code200},