From 325f9ef0051b9039555877acbfb8b30e9a907e4b Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sun, 3 Aug 2014 13:07:40 -0400 Subject: [PATCH] Make create atomic on etcd for Services/ReplControllers --- pkg/apiserver/apiserver.go | 19 ---------------- pkg/apiserver/errors.go | 38 +++++++++++++++++++++++++++++++ pkg/registry/etcdregistry.go | 18 ++++++++++----- pkg/registry/etcdregistry_test.go | 34 ++++++++++++++++++++++----- pkg/tools/etcd_tools.go | 21 +++++++++++++++++ 5 files changed, 99 insertions(+), 31 deletions(-) diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index e627dd1c66c..0bde466ae69 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -33,25 +33,6 @@ import ( "github.com/golang/glog" ) -// errNotFound is an error which indicates that a specified resource is not found. -type errNotFound string - -// Error returns a string representation of the err. -func (err errNotFound) Error() string { - return string(err) -} - -// IsNotFound determines if the err is an error which indicates that a specified resource was not found. -func IsNotFound(err error) bool { - _, ok := err.(errNotFound) - return ok -} - -// NewNotFoundErr returns a new error which indicates that the resource of the kind and the name was not found. -func NewNotFoundErr(kind, name string) error { - return errNotFound(fmt.Sprintf("%s %q not found", kind, name)) -} - // APIServer is an HTTPHandler that delegates to RESTStorage objects. // It handles URLs of the form: // ${prefix}/${storage_key}[/${object_name}] diff --git a/pkg/apiserver/errors.go b/pkg/apiserver/errors.go index 34bf293038a..14a11501d1f 100644 --- a/pkg/apiserver/errors.go +++ b/pkg/apiserver/errors.go @@ -21,6 +21,44 @@ import ( "net/http" ) +// errNotFound is an error which indicates that a specified resource is not found. +type errNotFound string + +// Error returns a string representation of the err. +func (err errNotFound) Error() string { + return string(err) +} + +// IsNotFound determines if the err is an error which indicates that a specified resource was not found. +func IsNotFound(err error) bool { + _, ok := err.(errNotFound) + return ok +} + +// NewNotFoundErr returns a new error which indicates that the resource of the kind and the name was not found. +func NewNotFoundErr(kind, name string) error { + return errNotFound(fmt.Sprintf("%s %q not found", kind, name)) +} + +// errAlreadyExists is an error which indicates that a specified resource already exists. +type errAlreadyExists string + +// Error returns a string representation of the err. +func (err errAlreadyExists) Error() string { + return string(err) +} + +// IsAlreadyExists determines if the err is an error which indicates that a specified resource already exists. +func IsAlreadyExists(err error) bool { + _, ok := err.(errAlreadyExists) + return ok +} + +// NewAlreadyExistsErr returns a new error which indicates that the resource of the kind and the name was not found. +func NewAlreadyExistsErr(kind, name string) error { + return errAlreadyExists(fmt.Sprintf("%s %q already exists", kind, name)) +} + // internalError renders a generic error to the response func internalError(err error, w http.ResponseWriter) { w.WriteHeader(http.StatusInternalServerError) diff --git a/pkg/registry/etcdregistry.go b/pkg/registry/etcdregistry.go index 3ce50ddcd13..04e04c552ea 100644 --- a/pkg/registry/etcdregistry.go +++ b/pkg/registry/etcdregistry.go @@ -232,8 +232,11 @@ func (registry *EtcdRegistry) GetController(controllerID string) (*api.Replicati // CreateController creates a new ReplicationController. func (registry *EtcdRegistry) CreateController(controller api.ReplicationController) error { - // TODO : check for existence here and error. - return registry.UpdateController(controller) + err := registry.helper.CreateObj(makeControllerKey(controller.ID), controller) + if tools.IsEtcdNodeExist(err) { + return apiserver.NewAlreadyExistsErr("replicationController", controller.ID) + } + return err } // UpdateController replaces an existing ReplicationController. @@ -264,7 +267,11 @@ func (registry *EtcdRegistry) ListServices() (api.ServiceList, error) { // CreateService creates a new Service. func (registry *EtcdRegistry) CreateService(svc api.Service) error { - return registry.helper.SetObj(makeServiceKey(svc.ID), svc) + err := registry.helper.CreateObj(makeServiceKey(svc.ID), svc) + if tools.IsEtcdNodeExist(err) { + return apiserver.NewAlreadyExistsErr("service", svc.ID) + } + return err } // GetService obtains a Service specified by its name. @@ -305,12 +312,11 @@ func (registry *EtcdRegistry) DeleteService(name string) error { // UpdateService replaces an existing Service. func (registry *EtcdRegistry) UpdateService(svc api.Service) error { - // TODO : check for existence here and error. - return registry.CreateService(svc) + return registry.helper.SetObj(makeServiceKey(svc.ID), svc) } // UpdateEndpoints update Endpoints of a Service. func (registry *EtcdRegistry) UpdateEndpoints(e api.Endpoints) error { updateFunc := func(interface{}) (interface{}, error) { return e, nil } - return registry.helper.AtomicUpdate("/registry/services/endpoints/"+e.ID, &api.Endpoints{}, updateFunc) + return registry.helper.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{}, updateFunc) } diff --git a/pkg/registry/etcdregistry_test.go b/pkg/registry/etcdregistry_test.go index e033d9aca72..9713016087f 100644 --- a/pkg/registry/etcdregistry_test.go +++ b/pkg/registry/etcdregistry_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -574,6 +575,21 @@ func TestEtcdCreateController(t *testing.T) { } } +func TestEtcdCreateControllerAlreadyExisting(t *testing.T) { + fakeClient := tools.MakeFakeEtcdClient(t) + fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) + + registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) + err := registry.CreateController(api.ReplicationController{ + JSONBase: api.JSONBase{ + ID: "foo", + }, + }) + if !apiserver.IsAlreadyExists(err) { + t.Errorf("expected already exists err, got %#v", err) + } +} + func TestEtcdUpdateController(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) fakeClient.TestIndex = true @@ -627,12 +643,6 @@ func TestEtcdListServices(t *testing.T) { func TestEtcdCreateService(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) - fakeClient.Data["/registry/services/specs/foo"] = tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: nil, - }, - E: tools.EtcdErrorNotFound, - } registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) err := registry.CreateService(api.Service{ JSONBase: api.JSONBase{ID: "foo"}, @@ -657,6 +667,18 @@ func TestEtcdCreateService(t *testing.T) { } } +func TestEtcdCreateServiceAlreadyExisting(t *testing.T) { + fakeClient := tools.MakeFakeEtcdClient(t) + fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0) + registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) + err := registry.CreateService(api.Service{ + JSONBase: api.JSONBase{ID: "foo"}, + }) + if !apiserver.IsAlreadyExists(err) { + t.Errorf("expected already exists err, got %#v", err) + } +} + func TestEtcdGetService(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0) diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 101b43586d3..0fd04120970 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -17,6 +17,7 @@ limitations under the License. package tools import ( + "errors" "fmt" "reflect" "sync" @@ -87,6 +88,11 @@ func IsEtcdNotFound(err error) bool { return isEtcdErrorNum(err, EtcdErrorCodeNotFound) } +// Returns true iff err is an etcd key node exists error. +func IsEtcdNodeExist(err error) bool { + return isEtcdErrorNum(err, EtcdErrorCodeNodeExist) +} + // IsEtcdTestFailed returns true iff err is an etcd write conflict. func IsEtcdTestFailed(err error) bool { return isEtcdErrorNum(err, EtcdErrorCodeTestFailed) @@ -172,6 +178,21 @@ func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNot return body, response.Node.ModifiedIndex, err } +func (h *EtcdHelper) CreateObj(key string, obj interface{}) error { + data, err := h.Encoding.Encode(obj) + if err != nil { + return err + } + if h.Versioning != nil { + if version, err := h.Versioning.ResourceVersion(obj); err == nil && version != 0 { + return errors.New("resourceVersion may not be set on objects to be created") + } + } + + _, err = h.Client.Create(key, string(data), 0) + return err +} + // SetObj marshals obj via json, and stores under key. Will do an // atomic update if obj's ResourceVersion field is set. func (h *EtcdHelper) SetObj(key string, obj interface{}) error {