From 410e11c30590c63f1d745871c3468aa06c3286cb Mon Sep 17 00:00:00 2001 From: Masahiro Sano Date: Sun, 15 Mar 2015 15:03:46 +0900 Subject: [PATCH] port endpoints to generic etcd --- pkg/api/validation/validation.go | 23 +- pkg/master/master.go | 11 +- pkg/registry/endpoint/etcd/etcd.go | 62 +++++ pkg/registry/endpoint/etcd/etcd_test.go | 299 ++++++++++++++++++++++++ pkg/registry/endpoint/registry.go | 43 ++++ pkg/registry/endpoint/rest.go | 125 +++++----- pkg/registry/endpoint/rest_test.go | 99 -------- pkg/registry/etcd/etcd.go | 86 +------ pkg/registry/etcd/etcd_test.go | 127 +--------- pkg/registry/registrytest/endpoint.go | 4 + pkg/registry/registrytest/service.go | 40 +--- pkg/registry/service/registry.go | 5 - pkg/registry/service/rest.go | 7 +- pkg/registry/service/rest_test.go | 127 +++++----- test/integration/auth_test.go | 2 +- 15 files changed, 573 insertions(+), 487 deletions(-) create mode 100644 pkg/registry/endpoint/etcd/etcd.go create mode 100644 pkg/registry/endpoint/etcd/etcd_test.go delete mode 100644 pkg/registry/endpoint/rest_test.go diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index 5a6459b7c85..52031cb035b 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -151,6 +151,13 @@ func ValidateSecretName(name string, prefix bool) (bool, string) { return nameIsDNSSubdomain(name, prefix) } +// ValidateEndpointsName can be used to check whether the given endpoints name is valid. +// Prefix indicates this name will be used as part of generation, in which case +// trailing dashes are allowed. +func ValidateEndpointsName(name string, prefix bool) (bool, string) { + return nameIsDNSSubdomain(name, prefix) +} + // nameIsDNSSubdomain is a ValidateNameFunc for names that must be a DNS subdomain. func nameIsDNSSubdomain(name string, prefix bool) (bool, string) { if prefix { @@ -1047,7 +1054,7 @@ func validateFinalizerName(stringValue string) errs.ValidationErrorList { return errs.ValidationErrorList{} } -// ValidateNamespaceUpdate tests to make sure a mamespace update can be applied. Modifies oldNamespace. +// ValidateNamespaceUpdate tests to make sure a namespace update can be applied. Modifies oldNamespace. func ValidateNamespaceUpdate(oldNamespace *api.Namespace, namespace *api.Namespace) errs.ValidationErrorList { allErrs := errs.ValidationErrorList{} allErrs = append(allErrs, ValidateObjectMetaUpdate(&oldNamespace.ObjectMeta, &namespace.ObjectMeta).Prefix("metadata")...) @@ -1087,3 +1094,17 @@ func ValidateNamespaceFinalizeUpdate(newNamespace, oldNamespace *api.Namespace) fmt.Printf("NEW NAMESPACE FINALIZERS : %v\n", newNamespace.Spec.Finalizers) return allErrs } + +// ValidateEndpoints tests if required fields are set. +func ValidateEndpoints(endpoints *api.Endpoints) errs.ValidationErrorList { + allErrs := errs.ValidationErrorList{} + allErrs = append(allErrs, ValidateObjectMeta(&endpoints.ObjectMeta, true, ValidateEndpointsName).Prefix("metadata")...) + return allErrs +} + +// ValidateEndpointsUpdate tests to make sure an endpoints update can be applied. +func ValidateEndpointsUpdate(oldEndpoints *api.Endpoints, endpoints *api.Endpoints) errs.ValidationErrorList { + allErrs := errs.ValidationErrorList{} + allErrs = append(allErrs, ValidateObjectMetaUpdate(&oldEndpoints.ObjectMeta, &endpoints.ObjectMeta).Prefix("metadata")...) + return allErrs +} diff --git a/pkg/master/master.go b/pkg/master/master.go index fcd1898f196..8ec6f7436cf 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -44,6 +44,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" controlleretcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" + endpointsetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/event" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/limitrange" @@ -361,11 +362,13 @@ func (m *Master) init(c *Config) { namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewStorage(c.EtcdHelper) m.namespaceRegistry = namespace.NewRegistry(namespaceStorage) + endpointsStorage := endpointsetcd.NewStorage(c.EtcdHelper) + m.endpointRegistry = endpoint.NewRegistry(endpointsStorage) + // TODO: split me up into distinct storage registries - registry := etcd.NewRegistry(c.EtcdHelper, podRegistry) + registry := etcd.NewRegistry(c.EtcdHelper, podRegistry, m.endpointRegistry) m.serviceRegistry = registry - m.endpointRegistry = registry m.nodeRegistry = registry nodeStorage := minion.NewStorage(m.nodeRegistry, c.KubeletClient) @@ -394,8 +397,8 @@ func (m *Master) init(c *Config) { "bindings": bindingStorage, "replicationControllers": controllerStorage, - "services": service.NewStorage(m.serviceRegistry, c.Cloud, m.nodeRegistry, m.portalNet, c.ClusterName), - "endpoints": endpoint.NewStorage(m.endpointRegistry), + "services": service.NewStorage(m.serviceRegistry, c.Cloud, m.nodeRegistry, m.endpointRegistry, m.portalNet, c.ClusterName), + "endpoints": endpointsStorage, "minions": nodeStorage, "nodes": nodeStorage, "events": event.NewStorage(eventRegistry), diff --git a/pkg/registry/endpoint/etcd/etcd.go b/pkg/registry/endpoint/etcd/etcd.go new file mode 100644 index 00000000000..8094767165f --- /dev/null +++ b/pkg/registry/endpoint/etcd/etcd.go @@ -0,0 +1,62 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" + etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" +) + +// rest implements a RESTStorage for endpoints against etcd +type REST struct { + *etcdgeneric.Etcd +} + +// NewStorage returns a RESTStorage object that will work against endpoints. +func NewStorage(h tools.EtcdHelper) *REST { + prefix := "/registry/services/endpoints" + return &REST{ + &etcdgeneric.Etcd{ + NewFunc: func() runtime.Object { return &api.Endpoints{} }, + NewListFunc: func() runtime.Object { return &api.EndpointsList{} }, + KeyRootFunc: func(ctx api.Context) string { + return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix) + }, + KeyFunc: func(ctx api.Context, name string) (string, error) { + return etcdgeneric.NamespaceKeyFunc(ctx, prefix, name) + }, + ObjectNameFunc: func(obj runtime.Object) (string, error) { + return obj.(*api.Endpoints).Name, nil + }, + PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { + return endpoint.MatchEndpoints(label, field) + }, + EndpointName: "endpoints", + + CreateStrategy: endpoint.Strategy, + UpdateStrategy: endpoint.Strategy, + + Helper: h, + }, + } +} diff --git a/pkg/registry/endpoint/etcd/etcd_test.go b/pkg/registry/endpoint/etcd/etcd_test.go new file mode 100644 index 00000000000..407ad31d504 --- /dev/null +++ b/pkg/registry/endpoint/etcd/etcd_test.go @@ -0,0 +1,299 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd + +import ( + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest/resttest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + + "github.com/coreos/go-etcd/etcd" +) + +func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) { + fakeEtcdClient := tools.NewFakeEtcdClient(t) + fakeEtcdClient.TestIndex = true + helper := tools.NewEtcdHelper(fakeEtcdClient, latest.Codec) + return fakeEtcdClient, helper +} + +func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) { + fakeEtcdClient, h := newHelper(t) + storage := NewStorage(h) + return storage, fakeEtcdClient +} + +func validNewEndpoints() *api.Endpoints { + return &api.Endpoints{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + }, + Protocol: "TCP", + Endpoints: []api.Endpoint{{IP: "baz"}}, + } +} + +func validChangedEndpoints() *api.Endpoints { + endpoints := validNewEndpoints() + endpoints.ResourceVersion = "1" + endpoints.Endpoints = []api.Endpoint{{IP: "baz"}, {IP: "bar"}} + return endpoints +} + +func TestCreate(t *testing.T) { + storage, fakeEtcdClient := newStorage(t) + test := resttest.New(t, storage, fakeEtcdClient.SetError) + endpoints := validNewEndpoints() + endpoints.ObjectMeta = api.ObjectMeta{} + test.TestCreate( + // valid + endpoints, + // invalid + &api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: "_-a123-a_"}, + }, + ) +} + +func TestDelete(t *testing.T) { + ctx := api.NewDefaultContext() + storage, fakeEtcdClient := newStorage(t) + test := resttest.New(t, storage, fakeEtcdClient.SetError) + + endpoints := validChangedEndpoints() + key, _ := storage.KeyFunc(ctx, endpoints.Name) + createFn := func() runtime.Object { + fakeEtcdClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(latest.Codec, endpoints), + ModifiedIndex: 1, + }, + }, + } + return endpoints + } + gracefulSetFn := func() bool { + if fakeEtcdClient.Data[key].R.Node == nil { + return false + } + return fakeEtcdClient.Data[key].R.Node.TTL == 30 + } + test.TestDeleteNoGraceful(createFn, gracefulSetFn) +} + +func TestEtcdListEndpoints(t *testing.T) { + ctx := api.NewDefaultContext() + storage, fakeClient := newStorage(t) + key := storage.KeyRootFunc(ctx) + fakeClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Nodes: []*etcd.Node{ + { + Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Protocol: "TCP", Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 8345}}}), + }, + { + Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}, Protocol: "TCP"}), + }, + }, + }, + }, + E: nil, + } + + endpointsObj, err := storage.List(ctx, labels.Everything(), fields.Everything()) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + endpoints := endpointsObj.(*api.EndpointsList) + + if len(endpoints.Items) != 2 || endpoints.Items[0].Name != "foo" || endpoints.Items[1].Name != "bar" { + t.Errorf("Unexpected endpoints list: %#v", endpoints) + } +} + +func TestEtcdGetEndpoints(t *testing.T) { + ctx := api.NewDefaultContext() + storage, fakeClient := newStorage(t) + endpoints := validNewEndpoints() + name := endpoints.Name + key, _ := storage.KeyFunc(ctx, name) + fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, endpoints), 0) + + response, err := fakeClient.Get(key, false, false) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + var endpointsOut api.Endpoints + err = latest.Codec.DecodeInto([]byte(response.Node.Value), &endpointsOut) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + obj, err := storage.Get(ctx, name) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + got := obj.(*api.Endpoints) + + endpoints.ObjectMeta.ResourceVersion = got.ObjectMeta.ResourceVersion + if e, a := endpoints, got; !api.Semantic.DeepEqual(*e, *a) { + t.Errorf("Unexpected endpoints: %#v, expected %#v", e, a) + } +} + +func TestListEmptyEndpointsList(t *testing.T) { + ctx := api.NewDefaultContext() + storage, fakeClient := newStorage(t) + fakeClient.ChangeIndex = 1 + key := storage.KeyRootFunc(ctx) + fakeClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: fakeClient.NewError(tools.EtcdErrorCodeNotFound), + } + + endpoints, err := storage.List(ctx, labels.Everything(), fields.Everything()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(endpoints.(*api.EndpointsList).Items) != 0 { + t.Errorf("Unexpected non-zero pod list: %#v", endpoints) + } + if endpoints.(*api.EndpointsList).ResourceVersion != "1" { + t.Errorf("Unexpected resource version: %#v", endpoints) + } +} + +func TestListEndpointsList(t *testing.T) { + ctx := api.NewDefaultContext() + storage, fakeClient := newStorage(t) + fakeClient.ChangeIndex = 1 + key := storage.KeyRootFunc(ctx) + fakeClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Nodes: []*etcd.Node{ + { + Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + }), + }, + { + Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: "bar"}, + }), + }, + }, + }, + }, + } + + endpointsObj, err := storage.List(ctx, labels.Everything(), fields.Everything()) + endpoints := endpointsObj.(*api.EndpointsList) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(endpoints.Items) != 2 { + t.Errorf("Unexpected endpoints list: %#v", endpoints) + } + if endpoints.Items[0].Name != "foo" { + t.Errorf("Unexpected endpoints: %#v", endpoints.Items[0]) + } + if endpoints.Items[1].Name != "bar" { + t.Errorf("Unexpected endpoints: %#v", endpoints.Items[1]) + } +} + +func TestEndpointsDecode(t *testing.T) { + storage, _ := newStorage(t) + expected := validNewEndpoints() + body, err := latest.Codec.Encode(expected) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + actual := storage.New() + if err := latest.Codec.DecodeInto(body, actual); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if !api.Semantic.DeepEqual(expected, actual) { + t.Errorf("mismatch: %s", util.ObjectDiff(expected, actual)) + } +} + +func TestEtcdUpdateEndpoints(t *testing.T) { + ctx := api.NewDefaultContext() + storage, fakeClient := newStorage(t) + endpoints := validChangedEndpoints() + + key, _ := storage.KeyFunc(ctx, "foo") + fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, validNewEndpoints()), 0) + + _, _, err := storage.Update(ctx, endpoints) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + response, err := fakeClient.Get(key, false, false) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + var endpointsOut api.Endpoints + err = latest.Codec.DecodeInto([]byte(response.Node.Value), &endpointsOut) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + endpoints.ObjectMeta.ResourceVersion = endpointsOut.ObjectMeta.ResourceVersion + if !api.Semantic.DeepEqual(endpoints, &endpointsOut) { + t.Errorf("Unexpected endpoints: %#v, expected %#v", &endpointsOut, endpoints) + } +} + +func TestDeleteEndpoints(t *testing.T) { + ctx := api.NewDefaultContext() + storage, fakeClient := newStorage(t) + endpoints := validNewEndpoints() + name := endpoints.Name + key, _ := storage.KeyFunc(ctx, name) + fakeClient.ChangeIndex = 1 + fakeClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(latest.Codec, endpoints), + ModifiedIndex: 1, + CreatedIndex: 1, + }, + }, + } + _, err := storage.Delete(ctx, name, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } +} diff --git a/pkg/registry/endpoint/registry.go b/pkg/registry/endpoint/registry.go index c36b023e77f..adf9a0a112b 100644 --- a/pkg/registry/endpoint/registry.go +++ b/pkg/registry/endpoint/registry.go @@ -18,6 +18,7 @@ package endpoint import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -29,4 +30,46 @@ type Registry interface { GetEndpoints(ctx api.Context, name string) (*api.Endpoints, error) WatchEndpoints(ctx api.Context, labels labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) UpdateEndpoints(ctx api.Context, e *api.Endpoints) error + DeleteEndpoints(ctx api.Context, name string) error +} + +// storage puts strong typing around storage calls +type storage struct { + rest.StandardStorage +} + +// NewRegistry returns a new Registry interface for the given Storage. Any mismatched +// types will panic. +func NewRegistry(s rest.StandardStorage) Registry { + return &storage{s} +} + +func (s *storage) ListEndpoints(ctx api.Context) (*api.EndpointsList, error) { + obj, err := s.List(ctx, labels.Everything(), fields.Everything()) + if err != nil { + return nil, err + } + return obj.(*api.EndpointsList), nil +} + +func (s *storage) WatchEndpoints(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { + return s.Watch(ctx, label, field, resourceVersion) +} + +func (s *storage) GetEndpoints(ctx api.Context, name string) (*api.Endpoints, error) { + obj, err := s.Get(ctx, name) + if err != nil { + return nil, err + } + return obj.(*api.Endpoints), nil +} + +func (s *storage) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) error { + _, _, err := s.Update(ctx, endpoints) + return err +} + +func (s *storage) DeleteEndpoints(ctx api.Context, name string) error { + _, err := s.Delete(ctx, name, nil) + return err } diff --git a/pkg/registry/endpoint/rest.go b/pkg/registry/endpoint/rest.go index ae7452fcb12..b41b72709e1 100644 --- a/pkg/registry/endpoint/rest.go +++ b/pkg/registry/endpoint/rest.go @@ -20,84 +20,65 @@ import ( "fmt" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/fielderrors" ) -// REST adapts endpoints into apiserver's RESTStorage model. -type REST struct { - registry Registry +// endpointsStrategy implements behavior for Endpoints +type endpointsStrategy struct { + runtime.ObjectTyper + api.NameGenerator } -// NewStorage returns a new rest.Storage implementation for endpoints -func NewStorage(registry Registry) *REST { - return &REST{ - registry: registry, +// Strategy is the default logic that applies when creating and updating Endpoint +// objects via the REST API. +var Strategy = endpointsStrategy{api.Scheme, api.SimpleNameGenerator} + +// NamespaceScoped is true for endpoints. +func (endpointsStrategy) NamespaceScoped() bool { + return true +} + +// ResetBeforeCreate clears fields that are not allowed to be set by end users on creation. +func (endpointsStrategy) ResetBeforeCreate(obj runtime.Object) { + _ = obj.(*api.Endpoints) +} + +// Validate validates a new endpoints. +func (endpointsStrategy) Validate(obj runtime.Object) fielderrors.ValidationErrorList { + return validation.ValidateEndpoints(obj.(*api.Endpoints)) +} + +// AllowCreateOnUpdate is true for endpoints. +func (endpointsStrategy) AllowCreateOnUpdate() bool { + return true +} + +// ValidateUpdate is the default update validation for an end user. +func (endpointsStrategy) ValidateUpdate(obj, old runtime.Object) fielderrors.ValidationErrorList { + return validation.ValidateEndpointsUpdate(old.(*api.Endpoints), obj.(*api.Endpoints)) +} + +// MatchEndpoints returns a generic matcher for a given label and field selector. +func MatchEndpoints(label labels.Selector, field fields.Selector) generic.Matcher { + return generic.MatcherFunc(func(obj runtime.Object) (bool, error) { + endpoints, ok := obj.(*api.Endpoints) + if !ok { + return false, fmt.Errorf("not a endpoints") + } + fields := EndpointsToSelectableFields(endpoints) + return label.Matches(labels.Set(endpoints.Labels)) && field.Matches(fields), nil + }) +} + +// EndpointsToSelectableFields returns a label set that represents the object +// TODO: fields are not labels, and the validation rules for them do not apply. +func EndpointsToSelectableFields(endpoints *api.Endpoints) labels.Set { + return labels.Set{ + "name": endpoints.Name, } } - -// Get satisfies the RESTStorage interface. -func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { - return rs.registry.GetEndpoints(ctx, id) -} - -// List satisfies the RESTStorage interface. -func (rs *REST) List(ctx api.Context, label labels.Selector, field fields.Selector) (runtime.Object, error) { - if !label.Empty() || !field.Empty() { - return nil, errors.NewBadRequest("label/field selectors are not supported on endpoints") - } - return rs.registry.ListEndpoints(ctx) -} - -// Watch returns Endpoint events via a watch.Interface. -// It implements rest.Watcher. -func (rs *REST) Watch(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - return rs.registry.WatchEndpoints(ctx, label, field, resourceVersion) -} - -// Create satisfies the RESTStorage interface. -func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { - endpoints, ok := obj.(*api.Endpoints) - if !ok { - return nil, fmt.Errorf("not an endpoints: %#v", obj) - } - if len(endpoints.Name) == 0 { - return nil, fmt.Errorf("id is required: %#v", obj) - } - if !api.ValidNamespace(ctx, &endpoints.ObjectMeta) { - return nil, errors.NewConflict("endpoints", endpoints.Namespace, fmt.Errorf("Endpoints.Namespace does not match the provided context")) - } - api.FillObjectMetaSystemFields(ctx, &endpoints.ObjectMeta) - - err := rs.registry.UpdateEndpoints(ctx, endpoints) - if err != nil { - return nil, err - } - return rs.registry.GetEndpoints(ctx, endpoints.Name) -} - -// Update satisfies the RESTStorage interface. -func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { - endpoints, ok := obj.(*api.Endpoints) - if !ok { - return nil, false, fmt.Errorf("not an endpoints: %#v", obj) - } - err := rs.registry.UpdateEndpoints(ctx, endpoints) - if err != nil { - return nil, false, err - } - out, err := rs.registry.GetEndpoints(ctx, endpoints.Name) - return out, false, err -} - -// New implements the RESTStorage interface. -func (rs REST) New() runtime.Object { - return &api.Endpoints{} -} - -func (*REST) NewList() runtime.Object { - return &api.EndpointsList{} -} diff --git a/pkg/registry/endpoint/rest_test.go b/pkg/registry/endpoint/rest_test.go deleted file mode 100644 index 445463fe8a9..00000000000 --- a/pkg/registry/endpoint/rest_test.go +++ /dev/null @@ -1,99 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package endpoint - -import ( - "reflect" - "testing" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" - "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" - "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" -) - -func TestGetEndpoints(t *testing.T) { - registry := ®istrytest.ServiceRegistry{ - Endpoints: api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}, - }, - } - storage := NewStorage(registry) - ctx := api.NewContext() - obj, err := storage.Get(ctx, "foo") - if err != nil { - t.Fatalf("unexpected error: %#v", err) - } - if !reflect.DeepEqual([]api.Endpoint{{IP: "127.0.0.1", Port: 9000}}, obj.(*api.Endpoints).Endpoints) { - t.Errorf("unexpected endpoints: %#v", obj) - } -} - -func TestGetEndpointsMissingService(t *testing.T) { - registry := ®istrytest.ServiceRegistry{ - Err: errors.NewNotFound("service", "foo"), - } - storage := NewStorage(registry) - ctx := api.NewContext() - // returns service not found - _, err := storage.Get(ctx, "foo") - if !errors.IsNotFound(err) || !reflect.DeepEqual(err, errors.NewNotFound("service", "foo")) { - t.Errorf("expected NotFound error, got %#v", err) - } - - // returns empty endpoints - registry.Err = nil - registry.Service = &api.Service{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - } - obj, err := storage.Get(ctx, "foo") - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if obj.(*api.Endpoints).Endpoints != nil { - t.Errorf("unexpected endpoints: %#v", obj) - } -} - -func TestEndpointsRegistryList(t *testing.T) { - registry := registrytest.NewServiceRegistry() - storage := NewStorage(registry) - registry.EndpointsList = api.EndpointsList{ - ListMeta: api.ListMeta{ResourceVersion: "1"}, - Items: []api.Endpoints{ - {ObjectMeta: api.ObjectMeta{Name: "foo"}}, - {ObjectMeta: api.ObjectMeta{Name: "bar"}}, - }, - } - ctx := api.NewContext() - s, _ := storage.List(ctx, labels.Everything(), fields.Everything()) - sl := s.(*api.EndpointsList) - if len(sl.Items) != 2 { - t.Fatalf("Expected 2 endpoints, but got %v", len(sl.Items)) - } - if e, a := "foo", sl.Items[0].Name; e != a { - t.Errorf("Expected %v, but got %v", e, a) - } - if e, a := "bar", sl.Items[1].Name; e != a { - t.Errorf("Expected %v, but got %v", e, a) - } - if sl.ResourceVersion != "1" { - t.Errorf("Unexpected resource version: %#v", sl) - } -} diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index be03bea3f52..eb6dd4c49b5 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -20,9 +20,11 @@ import ( "fmt" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" @@ -36,8 +38,6 @@ const ( ControllerPath string = "/registry/controllers" // ServicePath is the path to service resources in etcd ServicePath string = "/registry/services/specs" - // ServiceEndpointPath is the path to service endpoints resources in etcd - ServiceEndpointPath string = "/registry/services/endpoints" // NodePath is the path to node resources in etcd NodePath string = "/registry/minions" ) @@ -49,14 +49,16 @@ const ( // MinionRegistry, PodRegistry and ServiceRegistry, backed by etcd. type Registry struct { tools.EtcdHelper - pods pod.Registry + pods pod.Registry + endpoints endpoint.Registry } // NewRegistry creates an etcd registry. -func NewRegistry(helper tools.EtcdHelper, pods pod.Registry) *Registry { +func NewRegistry(helper tools.EtcdHelper, pods pod.Registry, endpoints endpoint.Registry) *Registry { registry := &Registry{ EtcdHelper: helper, pods: pods, + endpoints: endpoints, } return registry } @@ -226,30 +228,6 @@ func (r *Registry) GetService(ctx api.Context, name string) (*api.Service, error return &svc, nil } -// GetEndpoints obtains the endpoints for the service identified by 'name'. -func (r *Registry) GetEndpoints(ctx api.Context, name string) (*api.Endpoints, error) { - var endpoints api.Endpoints - key, err := makeServiceEndpointsKey(ctx, name) - if err != nil { - return nil, err - } - err = r.ExtractObj(key, &endpoints, false) - if err != nil { - return nil, etcderr.InterpretGetError(err, "endpoints", name) - } - return &endpoints, nil -} - -// makeServiceEndpointsListKey constructs etcd paths to service endpoint directories enforcing namespace rules. -func makeServiceEndpointsListKey(ctx api.Context) string { - return MakeEtcdListKey(ctx, ServiceEndpointPath) -} - -// makeServiceEndpointsListKey constructs etcd paths to service endpoint items enforcing namespace rules. -func makeServiceEndpointsKey(ctx api.Context, name string) (string, error) { - return MakeEtcdItemKey(ctx, ServiceEndpointPath, name) -} - // DeleteService deletes a Service specified by its name. func (r *Registry) DeleteService(ctx api.Context, name string) error { key, err := makeServiceKey(ctx, name) @@ -263,13 +241,10 @@ func (r *Registry) DeleteService(ctx api.Context, name string) error { // TODO: can leave dangling endpoints, and potentially return incorrect // endpoints if a new service is created with the same name - key, err = makeServiceEndpointsKey(ctx, name) - if err != nil { + err = r.endpoints.DeleteEndpoints(ctx, name) + if err != nil && !errors.IsNotFound(err) { return err } - if err := r.Delete(key, true); err != nil && !tools.IsEtcdNotFound(err) { - return etcderr.InterpretDeleteError(err, "endpoints", name) - } return nil } @@ -306,51 +281,6 @@ func (r *Registry) WatchServices(ctx api.Context, label labels.Selector, field f return nil, fmt.Errorf("only the 'name' and default (everything) field selectors are supported") } -// ListEndpoints obtains a list of Services. -func (r *Registry) ListEndpoints(ctx api.Context) (*api.EndpointsList, error) { - list := &api.EndpointsList{} - key := makeServiceEndpointsListKey(ctx) - err := r.ExtractToList(key, list) - return list, err -} - -// UpdateEndpoints update Endpoints of a Service. -func (r *Registry) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) error { - key, err := makeServiceEndpointsKey(ctx, endpoints.Name) - if err != nil { - return err - } - // TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop. - err = r.AtomicUpdate(key, &api.Endpoints{}, true, - func(input runtime.Object) (runtime.Object, uint64, error) { - // TODO: racy - label query is returning different results for two simultaneous updaters - return endpoints, 0, nil - }) - return etcderr.InterpretUpdateError(err, "endpoints", endpoints.Name) -} - -// WatchEndpoints begins watching for new, changed, or deleted endpoint configurations. -func (r *Registry) WatchEndpoints(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - version, err := tools.ParseWatchResourceVersion(resourceVersion, "endpoints") - if err != nil { - return nil, err - } - if !label.Empty() { - return nil, fmt.Errorf("label selectors are not supported on endpoints") - } - if value, found := field.RequiresExactMatch("name"); found { - key, err := makeServiceEndpointsKey(ctx, value) - if err != nil { - return nil, err - } - return r.Watch(key, version), nil - } - if field.Empty() { - return r.WatchList(makeServiceEndpointsListKey(ctx), version, tools.Everything) - } - return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported") -} - func makeNodeKey(nodeID string) string { return NodePath + "/" + nodeID } diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index 8abba70bdc3..47ef8010145 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -26,6 +26,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" + endpointetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint/etcd" etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" podetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod/etcd" @@ -36,14 +38,15 @@ import ( ) func NewTestEtcdRegistry(client tools.EtcdClient) *Registry { - registry := NewRegistry(tools.NewEtcdHelper(client, latest.Codec), nil) + registry := NewRegistry(tools.NewEtcdHelper(client, latest.Codec), nil, nil) return registry } func NewTestEtcdRegistryWithPods(client tools.EtcdClient) *Registry { helper := tools.NewEtcdHelper(client, latest.Codec) podStorage, _, _ := podetcd.NewStorage(helper) - registry := NewRegistry(helper, pod.NewRegistry(podStorage)) + endpointStorage := endpointetcd.NewStorage(helper) + registry := NewRegistry(helper, pod.NewRegistry(podStorage), endpoint.NewRegistry(endpointStorage)) return registry } @@ -532,10 +535,10 @@ func TestEtcdGetServiceNotFound(t *testing.T) { func TestEtcdDeleteService(t *testing.T) { ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistry(fakeClient) + registry := NewTestEtcdRegistryWithPods(fakeClient) key, _ := makeServiceKey(ctx, "foo") fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0) - endpointsKey, _ := makeServiceEndpointsKey(ctx, "foo") + endpointsKey, _ := etcdgeneric.NamespaceKeyFunc(ctx, "/registry/services/endpoints", "foo") fakeClient.Set(endpointsKey, runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Protocol: "TCP"}), 0) err := registry.DeleteService(ctx, "foo") @@ -594,89 +597,6 @@ func TestEtcdUpdateService(t *testing.T) { } } -func TestEtcdListEndpoints(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - key := makeServiceEndpointsListKey(ctx) - fakeClient.Data[key] = tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: &etcd.Node{ - Nodes: []*etcd.Node{ - { - Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Protocol: "TCP", Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 8345}}}), - }, - { - Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}, Protocol: "TCP"}), - }, - }, - }, - }, - E: nil, - } - registry := NewTestEtcdRegistry(fakeClient) - services, err := registry.ListEndpoints(ctx) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - if len(services.Items) != 2 || services.Items[0].Name != "foo" || services.Items[1].Name != "bar" { - t.Errorf("Unexpected endpoints list: %#v", services) - } -} - -func TestEtcdGetEndpoints(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistry(fakeClient) - endpoints := &api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - Protocol: "TCP", - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 34855}}, - } - - key, _ := makeServiceEndpointsKey(ctx, "foo") - fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, endpoints), 0) - - got, err := registry.GetEndpoints(ctx, "foo") - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - if e, a := endpoints, got; !api.Semantic.DeepEqual(e, a) { - t.Errorf("Unexpected endpoints: %#v, expected %#v", e, a) - } -} - -func TestEtcdUpdateEndpoints(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.TestIndex = true - registry := NewTestEtcdRegistry(fakeClient) - endpoints := api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - Protocol: "TCP", - Endpoints: []api.Endpoint{{IP: "baz"}, {IP: "bar"}}, - } - - key, _ := makeServiceEndpointsKey(ctx, "foo") - fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Endpoints{}), 0) - - err := registry.UpdateEndpoints(ctx, &endpoints) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - response, err := fakeClient.Get(key, false, false) - if err != nil { - t.Fatalf("Unexpected error %v", err) - } - var endpointsOut api.Endpoints - err = latest.Codec.DecodeInto([]byte(response.Node.Value), &endpointsOut) - if !api.Semantic.DeepEqual(endpoints, endpointsOut) { - t.Errorf("Unexpected endpoints: %#v, expected %#v", endpointsOut, endpoints) - } -} - func TestEtcdWatchServices(t *testing.T) { ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) @@ -733,8 +653,8 @@ func TestEtcdWatchServicesBadSelector(t *testing.T) { func TestEtcdWatchEndpoints(t *testing.T) { ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistry(fakeClient) - watching, err := registry.WatchEndpoints( + registry := NewTestEtcdRegistryWithPods(fakeClient) + watching, err := registry.endpoints.WatchEndpoints( ctx, labels.Everything(), fields.SelectorFromSet(fields.Set{"name": "foo"}), @@ -762,8 +682,8 @@ func TestEtcdWatchEndpoints(t *testing.T) { func TestEtcdWatchEndpointsAcrossNamespaces(t *testing.T) { ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistry(fakeClient) - watching, err := registry.WatchEndpoints( + registry := NewTestEtcdRegistryWithPods(fakeClient) + watching, err := registry.endpoints.WatchEndpoints( ctx, labels.Everything(), fields.Everything(), @@ -788,31 +708,6 @@ func TestEtcdWatchEndpointsAcrossNamespaces(t *testing.T) { watching.Stop() } -func TestEtcdWatchEndpointsBadSelector(t *testing.T) { - ctx := api.NewContext() - fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistry(fakeClient) - _, err := registry.WatchEndpoints( - ctx, - labels.Everything(), - fields.SelectorFromSet(fields.Set{"Field.Selector": "foo"}), - "", - ) - if err == nil { - t.Errorf("unexpected non-error: %v", err) - } - - _, err = registry.WatchEndpoints( - ctx, - labels.SelectorFromSet(labels.Set{"Label.Selector": "foo"}), - fields.Everything(), - "", - ) - if err == nil { - t.Errorf("unexpected non-error: %v", err) - } -} - func TestEtcdListMinions(t *testing.T) { ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) diff --git a/pkg/registry/registrytest/endpoint.go b/pkg/registry/registrytest/endpoint.go index 1d94ee2b58f..0e4a9dd242b 100644 --- a/pkg/registry/registrytest/endpoint.go +++ b/pkg/registry/registrytest/endpoint.go @@ -91,3 +91,7 @@ func (e *EndpointRegistry) UpdateEndpoints(ctx api.Context, endpoints *api.Endpo e.Endpoints.Items = append(e.Endpoints.Items, *endpoints) return nil } + +func (e *EndpointRegistry) DeleteEndpoints(ctx api.Context, name string) error { + return fmt.Errorf("unimplemented!") +} diff --git a/pkg/registry/registrytest/service.go b/pkg/registry/registrytest/service.go index ecf4af03d05..7f2d814e46b 100644 --- a/pkg/registry/registrytest/service.go +++ b/pkg/registry/registrytest/service.go @@ -30,12 +30,10 @@ func NewServiceRegistry() *ServiceRegistry { } type ServiceRegistry struct { - mu sync.Mutex - List api.ServiceList - Service *api.Service - Err error - Endpoints api.Endpoints - EndpointsList api.EndpointsList + mu sync.Mutex + List api.ServiceList + Service *api.Service + Err error DeletedID string GottenID string @@ -114,33 +112,3 @@ func (r *ServiceRegistry) WatchServices(ctx api.Context, label labels.Selector, return nil, r.Err } - -func (r *ServiceRegistry) ListEndpoints(ctx api.Context) (*api.EndpointsList, error) { - r.mu.Lock() - defer r.mu.Unlock() - - return &r.EndpointsList, r.Err -} - -func (r *ServiceRegistry) GetEndpoints(ctx api.Context, id string) (*api.Endpoints, error) { - r.mu.Lock() - defer r.mu.Unlock() - - r.GottenID = id - return &r.Endpoints, r.Err -} - -func (r *ServiceRegistry) UpdateEndpoints(ctx api.Context, e *api.Endpoints) error { - r.mu.Lock() - defer r.mu.Unlock() - - r.Endpoints = *e - return r.Err -} - -func (r *ServiceRegistry) WatchEndpoints(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - r.mu.Lock() - defer r.mu.Unlock() - - return nil, r.Err -} diff --git a/pkg/registry/service/registry.go b/pkg/registry/service/registry.go index 2cca93568d8..e6b16ce67e1 100644 --- a/pkg/registry/service/registry.go +++ b/pkg/registry/service/registry.go @@ -20,7 +20,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) @@ -32,8 +31,4 @@ type Registry interface { DeleteService(ctx api.Context, name string) error UpdateService(ctx api.Context, svc *api.Service) (*api.Service, error) WatchServices(ctx api.Context, labels labels.Selector, fields fields.Selector, resourceVersion string) (watch.Interface, error) - - // TODO: endpoints and their implementation should be separated, setting endpoints should be - // supported via the API, and the endpoints-controller should use the API to update endpoints. - endpoint.Registry } diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index deeaea12cfe..9e23f393f24 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -31,6 +31,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/fielderrors" @@ -43,12 +44,13 @@ type REST struct { registry Registry cloud cloudprovider.Interface machines minion.Registry + endpoints endpoint.Registry portalMgr *ipAllocator clusterName string } // NewStorage returns a new REST. -func NewStorage(registry Registry, cloud cloudprovider.Interface, machines minion.Registry, portalNet *net.IPNet, +func NewStorage(registry Registry, cloud cloudprovider.Interface, machines minion.Registry, endpoints endpoint.Registry, portalNet *net.IPNet, clusterName string) *REST { // TODO: Before we can replicate masters, this has to be synced (e.g. lives in etcd) ipa := newIPAllocator(portalNet) @@ -61,6 +63,7 @@ func NewStorage(registry Registry, cloud cloudprovider.Interface, machines minio registry: registry, cloud: cloud, machines: machines, + endpoints: endpoints, portalMgr: ipa, clusterName: clusterName, } @@ -230,7 +233,7 @@ var _ = rest.Redirector(&REST{}) // ResourceLocation returns a URL to which one can send traffic for the specified service. func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.RoundTripper, error) { - eps, err := rs.registry.GetEndpoints(ctx, id) + eps, err := rs.endpoints.GetEndpoints(ctx, id) if err != nil { return nil, nil, err } diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go index 346c61faf1e..506d6a6018a 100644 --- a/pkg/registry/service/rest_test.go +++ b/pkg/registry/service/rest_test.go @@ -32,6 +32,18 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" ) +func NewTestREST(t *testing.T, endpoints *api.EndpointsList) (*REST, *registrytest.ServiceRegistry, *cloud.FakeCloud) { + registry := registrytest.NewServiceRegistry() + fakeCloud := &cloud.FakeCloud{} + machines := []string{"foo", "bar", "baz"} + endpointRegistry := ®istrytest.EndpointRegistry{ + Endpoints: endpoints, + } + nodeRegistry := registrytest.NewMinionRegistry(machines, api.NodeResources{}) + storage := NewStorage(registry, fakeCloud, nodeRegistry, endpointRegistry, makeIPNet(t), "kubernetes") + return storage, registry, fakeCloud +} + func makeIPNet(t *testing.T) *net.IPNet { _, net, err := net.ParseCIDR("1.2.3.0/24") if err != nil { @@ -41,10 +53,7 @@ func makeIPNet(t *testing.T) *net.IPNet { } func TestServiceRegistryCreate(t *testing.T) { - registry := registrytest.NewServiceRegistry() - fakeCloud := &cloud.FakeCloud{} - machines := []string{"foo", "bar", "baz"} - storage := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes") + storage, registry, fakeCloud := NewTestREST(t, nil) storage.portalMgr.randomAttempts = 0 svc := &api.Service{ @@ -84,8 +93,7 @@ func TestServiceRegistryCreate(t *testing.T) { } func TestServiceStorageValidatesCreate(t *testing.T) { - registry := registrytest.NewServiceRegistry() - storage := NewStorage(registry, nil, nil, makeIPNet(t), "kubernetes") + storage, _, _ := NewTestREST(t, nil) failureCases := map[string]api.Service{ "empty ID": { ObjectMeta: api.ObjectMeta{Name: ""}, @@ -120,7 +128,7 @@ func TestServiceStorageValidatesCreate(t *testing.T) { func TestServiceRegistryUpdate(t *testing.T) { ctx := api.NewDefaultContext() - registry := registrytest.NewServiceRegistry() + storage, registry, _ := NewTestREST(t, nil) registry.CreateService(ctx, &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, Spec: api.ServiceSpec{ @@ -128,7 +136,6 @@ func TestServiceRegistryUpdate(t *testing.T) { Selector: map[string]string{"bar": "baz1"}, }, }) - storage := NewStorage(registry, nil, nil, makeIPNet(t), "kubernetes") updated_svc, created, err := storage.Update(ctx, &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{ @@ -158,7 +165,7 @@ func TestServiceRegistryUpdate(t *testing.T) { func TestServiceStorageValidatesUpdate(t *testing.T) { ctx := api.NewDefaultContext() - registry := registrytest.NewServiceRegistry() + storage, registry, _ := NewTestREST(t, nil) registry.CreateService(ctx, &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{ @@ -166,7 +173,6 @@ func TestServiceStorageValidatesUpdate(t *testing.T) { Selector: map[string]string{"bar": "baz"}, }, }) - storage := NewStorage(registry, nil, nil, makeIPNet(t), "kubernetes") failureCases := map[string]api.Service{ "empty ID": { ObjectMeta: api.ObjectMeta{Name: ""}, @@ -200,10 +206,7 @@ func TestServiceStorageValidatesUpdate(t *testing.T) { func TestServiceRegistryExternalService(t *testing.T) { ctx := api.NewDefaultContext() - registry := registrytest.NewServiceRegistry() - fakeCloud := &cloud.FakeCloud{} - machines := []string{"foo", "bar", "baz"} - storage := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes") + storage, registry, fakeCloud := NewTestREST(t, nil) svc := &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{ @@ -231,12 +234,8 @@ func TestServiceRegistryExternalService(t *testing.T) { } func TestServiceRegistryExternalServiceError(t *testing.T) { - registry := registrytest.NewServiceRegistry() - fakeCloud := &cloud.FakeCloud{ - Err: fmt.Errorf("test error"), - } - machines := []string{"foo", "bar", "baz"} - storage := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes") + storage, registry, fakeCloud := NewTestREST(t, nil) + fakeCloud.Err = fmt.Errorf("test error") svc := &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{ @@ -259,10 +258,7 @@ func TestServiceRegistryExternalServiceError(t *testing.T) { func TestServiceRegistryDelete(t *testing.T) { ctx := api.NewDefaultContext() - registry := registrytest.NewServiceRegistry() - fakeCloud := &cloud.FakeCloud{} - machines := []string{"foo", "bar", "baz"} - storage := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes") + storage, registry, fakeCloud := NewTestREST(t, nil) svc := &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{ @@ -283,10 +279,7 @@ func TestServiceRegistryDelete(t *testing.T) { func TestServiceRegistryDeleteExternal(t *testing.T) { ctx := api.NewDefaultContext() - registry := registrytest.NewServiceRegistry() - fakeCloud := &cloud.FakeCloud{} - machines := []string{"foo", "bar", "baz"} - storage := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes") + storage, registry, fakeCloud := NewTestREST(t, nil) svc := &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{ @@ -308,10 +301,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) { func TestServiceRegistryUpdateExternalService(t *testing.T) { ctx := api.NewDefaultContext() - registry := registrytest.NewServiceRegistry() - fakeCloud := &cloud.FakeCloud{} - machines := []string{"foo", "bar", "baz"} - storage := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes") + storage, _, fakeCloud := NewTestREST(t, nil) // Create non-external load balancer. svc1 := &api.Service{ @@ -352,10 +342,7 @@ func TestServiceRegistryUpdateExternalService(t *testing.T) { func TestServiceRegistryGet(t *testing.T) { ctx := api.NewDefaultContext() - registry := registrytest.NewServiceRegistry() - fakeCloud := &cloud.FakeCloud{} - machines := []string{"foo", "bar", "baz"} - storage := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes") + storage, registry, fakeCloud := NewTestREST(t, nil) registry.CreateService(ctx, &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{ @@ -373,11 +360,24 @@ func TestServiceRegistryGet(t *testing.T) { func TestServiceRegistryResourceLocation(t *testing.T) { ctx := api.NewDefaultContext() - registry := registrytest.NewServiceRegistry() - registry.Endpoints = api.Endpoints{Endpoints: []api.Endpoint{{IP: "foo", Port: 80}}} - fakeCloud := &cloud.FakeCloud{} - machines := []string{"foo", "bar", "baz"} - storage := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes") + endpoints := &api.EndpointsList{ + Items: []api.Endpoints{ + { + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + }, + Endpoints: []api.Endpoint{ + { + IP: "100.100.100.100", + Port: 80, + }, + }, + Protocol: api.ProtocolTCP, + }, + }, + } + storage, registry, _ := NewTestREST(t, endpoints) registry.CreateService(ctx, &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{ @@ -392,26 +392,19 @@ func TestServiceRegistryResourceLocation(t *testing.T) { if location == nil { t.Errorf("Unexpected nil: %v", location) } - if e, a := "//foo:80", location.String(); e != a { - t.Errorf("Expected %v, but got %v", e, a) - } - if e, a := "foo", registry.GottenID; e != a { + if e, a := "//100.100.100.100:80", location.String(); e != a { t.Errorf("Expected %v, but got %v", e, a) } // Test error path - registry.Err = fmt.Errorf("fake error") - if _, _, err = redirector.ResourceLocation(ctx, "foo"); err == nil { + if _, _, err = redirector.ResourceLocation(ctx, "bar"); err == nil { t.Errorf("unexpected nil error") } } func TestServiceRegistryList(t *testing.T) { ctx := api.NewDefaultContext() - registry := registrytest.NewServiceRegistry() - fakeCloud := &cloud.FakeCloud{} - machines := []string{"foo", "bar", "baz"} - storage := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes") + storage, registry, fakeCloud := NewTestREST(t, nil) registry.CreateService(ctx, &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, Spec: api.ServiceSpec{ @@ -445,10 +438,7 @@ func TestServiceRegistryList(t *testing.T) { } func TestServiceRegistryIPAllocation(t *testing.T) { - registry := registrytest.NewServiceRegistry() - fakeCloud := &cloud.FakeCloud{} - machines := []string{"foo", "bar", "baz"} - rest := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes") + rest, _, _ := NewTestREST(t, nil) rest.portalMgr.randomAttempts = 0 svc1 := &api.Service{ @@ -507,10 +497,7 @@ func TestServiceRegistryIPAllocation(t *testing.T) { } func TestServiceRegistryIPReallocation(t *testing.T) { - registry := registrytest.NewServiceRegistry() - fakeCloud := &cloud.FakeCloud{} - machines := []string{"foo", "bar", "baz"} - rest := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes") + rest, _, _ := NewTestREST(t, nil) rest.portalMgr.randomAttempts = 0 svc1 := &api.Service{ @@ -555,10 +542,7 @@ func TestServiceRegistryIPReallocation(t *testing.T) { } func TestServiceRegistryIPUpdate(t *testing.T) { - registry := registrytest.NewServiceRegistry() - fakeCloud := &cloud.FakeCloud{} - machines := []string{"foo", "bar", "baz"} - rest := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes") + rest, _, _ := NewTestREST(t, nil) rest.portalMgr.randomAttempts = 0 svc := &api.Service{ @@ -601,10 +585,7 @@ func TestServiceRegistryIPUpdate(t *testing.T) { } func TestServiceRegistryIPExternalLoadBalancer(t *testing.T) { - registry := registrytest.NewServiceRegistry() - fakeCloud := &cloud.FakeCloud{} - machines := []string{"foo", "bar", "baz"} - rest := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes") + rest, _, fakeCloud := NewTestREST(t, nil) rest.portalMgr.randomAttempts = 0 svc := &api.Service{ @@ -643,7 +624,9 @@ func TestServiceRegistryIPReloadFromStorage(t *testing.T) { registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} - rest1 := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes") + nodeRegistry := registrytest.NewMinionRegistry(machines, api.NodeResources{}) + endpoints := ®istrytest.EndpointRegistry{} + rest1 := NewStorage(registry, fakeCloud, nodeRegistry, endpoints, makeIPNet(t), "kubernetes") rest1.portalMgr.randomAttempts = 0 svc := &api.Service{ @@ -669,7 +652,8 @@ func TestServiceRegistryIPReloadFromStorage(t *testing.T) { rest1.Create(ctx, svc) // This will reload from storage, finding the previous 2 - rest2 := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes") + nodeRegistry = registrytest.NewMinionRegistry(machines, api.NodeResources{}) + rest2 := NewStorage(registry, fakeCloud, nodeRegistry, endpoints, makeIPNet(t), "kubernetes") rest2.portalMgr.randomAttempts = 0 svc = &api.Service{ @@ -726,10 +710,7 @@ func TestUpdateServiceWithConflictingNamespace(t *testing.T) { } func TestCreate(t *testing.T) { - registry := registrytest.NewServiceRegistry() - fakeCloud := &cloud.FakeCloud{} - machines := []string{"foo", "bar", "baz"} - rest := NewStorage(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t), "kubernetes") + rest, registry, _ := NewTestREST(t, nil) rest.portalMgr.randomAttempts = 0 test := resttest.New(t, rest, registry.SetError) diff --git a/test/integration/auth_test.go b/test/integration/auth_test.go index 93a6bc8f489..b352c1624ea 100644 --- a/test/integration/auth_test.go +++ b/test/integration/auth_test.go @@ -247,7 +247,7 @@ func getTestRequests() []struct { {"PUT", "/api/v1beta1/endpoints/a" + timeoutFlag, aEndpoints, code200}, {"GET", "/api/v1beta1/endpoints", "", code200}, {"GET", "/api/v1beta1/endpoints/a", "", code200}, - {"DELETE", "/api/v1beta1/endpoints/a" + timeoutFlag, "", code405}, + {"DELETE", "/api/v1beta1/endpoints/a" + timeoutFlag, "", code200}, // Normal methods on minions {"GET", "/api/v1beta1/minions", "", code200},