Make create atomic on etcd for Services/ReplControllers

This commit is contained in:
Clayton Coleman 2014-08-03 13:07:40 -04:00
parent 83aeae7658
commit 325f9ef005
5 changed files with 99 additions and 31 deletions

View File

@ -33,25 +33,6 @@ import (
"github.com/golang/glog"
)
// errNotFound is an error which indicates that a specified resource is not found.
type errNotFound string
// Error returns a string representation of the err.
func (err errNotFound) Error() string {
return string(err)
}
// IsNotFound determines if the err is an error which indicates that a specified resource was not found.
func IsNotFound(err error) bool {
_, ok := err.(errNotFound)
return ok
}
// NewNotFoundErr returns a new error which indicates that the resource of the kind and the name was not found.
func NewNotFoundErr(kind, name string) error {
return errNotFound(fmt.Sprintf("%s %q not found", kind, name))
}
// APIServer is an HTTPHandler that delegates to RESTStorage objects.
// It handles URLs of the form:
// ${prefix}/${storage_key}[/${object_name}]

View File

@ -21,6 +21,44 @@ import (
"net/http"
)
// errNotFound is an error which indicates that a specified resource is not found.
type errNotFound string
// Error returns a string representation of the err.
func (err errNotFound) Error() string {
return string(err)
}
// IsNotFound determines if the err is an error which indicates that a specified resource was not found.
func IsNotFound(err error) bool {
_, ok := err.(errNotFound)
return ok
}
// NewNotFoundErr returns a new error which indicates that the resource of the kind and the name was not found.
func NewNotFoundErr(kind, name string) error {
return errNotFound(fmt.Sprintf("%s %q not found", kind, name))
}
// errAlreadyExists is an error which indicates that a specified resource already exists.
type errAlreadyExists string
// Error returns a string representation of the err.
func (err errAlreadyExists) Error() string {
return string(err)
}
// IsAlreadyExists determines if the err is an error which indicates that a specified resource already exists.
func IsAlreadyExists(err error) bool {
_, ok := err.(errAlreadyExists)
return ok
}
// NewAlreadyExistsErr returns a new error which indicates that the resource of the kind and the name was not found.
func NewAlreadyExistsErr(kind, name string) error {
return errAlreadyExists(fmt.Sprintf("%s %q already exists", kind, name))
}
// internalError renders a generic error to the response
func internalError(err error, w http.ResponseWriter) {
w.WriteHeader(http.StatusInternalServerError)

View File

@ -232,8 +232,11 @@ func (registry *EtcdRegistry) GetController(controllerID string) (*api.Replicati
// CreateController creates a new ReplicationController.
func (registry *EtcdRegistry) CreateController(controller api.ReplicationController) error {
// TODO : check for existence here and error.
return registry.UpdateController(controller)
err := registry.helper.CreateObj(makeControllerKey(controller.ID), controller)
if tools.IsEtcdNodeExist(err) {
return apiserver.NewAlreadyExistsErr("replicationController", controller.ID)
}
return err
}
// UpdateController replaces an existing ReplicationController.
@ -264,7 +267,11 @@ func (registry *EtcdRegistry) ListServices() (api.ServiceList, error) {
// CreateService creates a new Service.
func (registry *EtcdRegistry) CreateService(svc api.Service) error {
return registry.helper.SetObj(makeServiceKey(svc.ID), svc)
err := registry.helper.CreateObj(makeServiceKey(svc.ID), svc)
if tools.IsEtcdNodeExist(err) {
return apiserver.NewAlreadyExistsErr("service", svc.ID)
}
return err
}
// GetService obtains a Service specified by its name.
@ -305,12 +312,11 @@ func (registry *EtcdRegistry) DeleteService(name string) error {
// UpdateService replaces an existing Service.
func (registry *EtcdRegistry) UpdateService(svc api.Service) error {
// TODO : check for existence here and error.
return registry.CreateService(svc)
return registry.helper.SetObj(makeServiceKey(svc.ID), svc)
}
// UpdateEndpoints update Endpoints of a Service.
func (registry *EtcdRegistry) UpdateEndpoints(e api.Endpoints) error {
updateFunc := func(interface{}) (interface{}, error) { return e, nil }
return registry.helper.AtomicUpdate("/registry/services/endpoints/"+e.ID, &api.Endpoints{}, updateFunc)
return registry.helper.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{}, updateFunc)
}

View File

@ -21,6 +21,7 @@ import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -574,6 +575,21 @@ func TestEtcdCreateController(t *testing.T) {
}
}
func TestEtcdCreateControllerAlreadyExisting(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.CreateController(api.ReplicationController{
JSONBase: api.JSONBase{
ID: "foo",
},
})
if !apiserver.IsAlreadyExists(err) {
t.Errorf("expected already exists err, got %#v", err)
}
}
func TestEtcdUpdateController(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.TestIndex = true
@ -627,12 +643,6 @@ func TestEtcdListServices(t *testing.T) {
func TestEtcdCreateService(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/services/specs/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: tools.EtcdErrorNotFound,
}
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.CreateService(api.Service{
JSONBase: api.JSONBase{ID: "foo"},
@ -657,6 +667,18 @@ func TestEtcdCreateService(t *testing.T) {
}
}
func TestEtcdCreateServiceAlreadyExisting(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.CreateService(api.Service{
JSONBase: api.JSONBase{ID: "foo"},
})
if !apiserver.IsAlreadyExists(err) {
t.Errorf("expected already exists err, got %#v", err)
}
}
func TestEtcdGetService(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0)

View File

@ -17,6 +17,7 @@ limitations under the License.
package tools
import (
"errors"
"fmt"
"reflect"
"sync"
@ -87,6 +88,11 @@ func IsEtcdNotFound(err error) bool {
return isEtcdErrorNum(err, EtcdErrorCodeNotFound)
}
// Returns true iff err is an etcd key node exists error.
func IsEtcdNodeExist(err error) bool {
return isEtcdErrorNum(err, EtcdErrorCodeNodeExist)
}
// IsEtcdTestFailed returns true iff err is an etcd write conflict.
func IsEtcdTestFailed(err error) bool {
return isEtcdErrorNum(err, EtcdErrorCodeTestFailed)
@ -172,6 +178,21 @@ func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNot
return body, response.Node.ModifiedIndex, err
}
func (h *EtcdHelper) CreateObj(key string, obj interface{}) error {
data, err := h.Encoding.Encode(obj)
if err != nil {
return err
}
if h.Versioning != nil {
if version, err := h.Versioning.ResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion may not be set on objects to be created")
}
}
_, err = h.Client.Create(key, string(data), 0)
return err
}
// SetObj marshals obj via json, and stores under key. Will do an
// atomic update if obj's ResourceVersion field is set.
func (h *EtcdHelper) SetObj(key string, obj interface{}) error {