add delete precondition

This commit is contained in:
Chao Xu
2016-03-20 23:15:00 -07:00
parent 590038dcf1
commit 31b425b3a1
71 changed files with 1600 additions and 372 deletions

View File

@@ -239,8 +239,8 @@ func (c *Cacher) Set(ctx context.Context, key string, obj, out runtime.Object, t
}
// Implements storage.Interface.
func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object) error {
return c.storage.Delete(ctx, key, out)
func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error {
return c.storage.Delete(ctx, key, out, preconditions)
}
// Implements storage.Interface.
@@ -347,8 +347,8 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, f
}
// Implements storage.Interface.
func (c *Cacher) GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate UpdateFunc) error {
return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, tryUpdate)
func (c *Cacher) GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, preconditions *Preconditions, tryUpdate UpdateFunc) error {
return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate)
}
// Implements storage.Interface.

View File

@@ -107,7 +107,7 @@ func TestList(t *testing.T) {
_ = updatePod(t, etcdStorage, podFooPrime, fooCreated)
deleted := api.Pod{}
if err := etcdStorage.Delete(context.TODO(), etcdtest.AddPrefix("pods/ns/bar"), &deleted); err != nil {
if err := etcdStorage.Delete(context.TODO(), etcdtest.AddPrefix("pods/ns/bar"), &deleted, nil); err != nil {
t.Errorf("Unexpected error: %v", err)
}
@@ -315,7 +315,7 @@ func TestFiltering(t *testing.T) {
_ = updatePod(t, etcdStorage, podFooPrime, fooUnfiltered)
deleted := api.Pod{}
if err := etcdStorage.Delete(context.TODO(), etcdtest.AddPrefix("pods/ns/foo"), &deleted); err != nil {
if err := etcdStorage.Delete(context.TODO(), etcdtest.AddPrefix("pods/ns/foo"), &deleted, nil); err != nil {
t.Errorf("Unexpected error: %v", err)
}

View File

@@ -16,7 +16,11 @@ limitations under the License.
package storage
import "fmt"
import (
"fmt"
"k8s.io/kubernetes/pkg/util/validation/field"
)
const (
ErrCodeKeyNotFound int = iota + 1
@@ -104,3 +108,47 @@ func isErrCode(err error, code int) bool {
}
return false
}
// InvalidError is generated when an error caused by invalid API object occurs
// in the storage package.
type InvalidError struct {
Errs field.ErrorList
}
func (e InvalidError) Error() string {
return e.Errs.ToAggregate().Error()
}
// IsInvalidError returns true if and only if err is an InvalidError.
func IsInvalidError(err error) bool {
_, ok := err.(InvalidError)
return ok
}
func NewInvalidError(errors field.ErrorList) InvalidError {
return InvalidError{errors}
}
// InternalError is generated when an error occurs in the storage package, i.e.,
// not from the underlying storage backend (e.g., etcd).
type InternalError struct {
Reason string
}
func (e InternalError) Error() string {
return e.Reason
}
// IsInternalError returns true if and only if err is an InternalError.
func IsInternalError(err error) bool {
_, ok := err.(InternalError)
return ok
}
func NewInternalError(reason string) InternalError {
return InternalError{reason}
}
func NewInternalErrorf(format string, a ...interface{}) InternalError {
return InternalError{fmt.Sprintf(format, a)}
}

View File

@@ -291,26 +291,76 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec
return err
}
func checkPreconditions(preconditions *storage.Preconditions, out runtime.Object) error {
if preconditions == nil {
return nil
}
objMeta, err := api.ObjectMetaFor(out)
if err != nil {
return storage.NewInternalErrorf("can't enforce preconditions %v on un-introspectable object %v, got error: %v", *preconditions, out, err)
}
if preconditions.UID != nil && *preconditions.UID != objMeta.UID {
return etcd.Error{Code: etcd.ErrorCodeTestFailed, Message: fmt.Sprintf("the UID in the precondition (%s) does not match the UID in record (%s). The object might have been deleted and then recreated", *preconditions.UID, objMeta.UID)}
}
return nil
}
// Implements storage.Interface.
func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object) error {
func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
key = h.prefixEtcdKey(key)
if _, err := conversion.EnforcePtr(out); err != nil {
v, err := conversion.EnforcePtr(out)
if err != nil {
panic("unable to convert output object to pointer")
}
startTime := time.Now()
response, err := h.etcdKeysAPI.Delete(ctx, key, nil)
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
if !etcdutil.IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update out.
if err != nil || response.PrevNode != nil {
_, _, err = h.extractObj(response, err, out, false, true)
if preconditions == nil {
startTime := time.Now()
response, err := h.etcdKeysAPI.Delete(ctx, key, nil)
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
if !etcdutil.IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update the out object.
if err != nil || response.PrevNode != nil {
_, _, err = h.extractObj(response, err, out, false, true)
}
}
return toStorageErr(err, key, 0)
}
// Check the preconditions match.
obj := reflect.New(v.Type()).Interface().(runtime.Object)
for {
_, node, res, err := h.bodyAndExtractObj(ctx, key, obj, false)
if err != nil {
return toStorageErr(err, key, 0)
}
if err := checkPreconditions(preconditions, obj); err != nil {
return toStorageErr(err, key, 0)
}
index := uint64(0)
if node != nil {
index = node.ModifiedIndex
} else if res != nil {
index = res.Index
}
opt := etcd.DeleteOptions{PrevIndex: index}
startTime := time.Now()
response, err := h.etcdKeysAPI.Delete(ctx, key, &opt)
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
if etcdutil.IsEtcdTestFailed(err) {
glog.Infof("deletion of %s failed because of a conflict, going to retry", key)
} else {
if !etcdutil.IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update the out object.
if err != nil || response.PrevNode != nil {
_, _, err = h.extractObj(response, err, out, false, true)
}
}
return toStorageErr(err, key, 0)
}
}
return toStorageErr(err, key, 0)
}
// Implements storage.Interface.
@@ -547,7 +597,7 @@ func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node
}
// Implements storage.Interface.
func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate storage.UpdateFunc) error {
func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
@@ -561,7 +611,10 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType
obj := reflect.New(v.Type()).Interface().(runtime.Object)
origBody, node, res, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound)
if err != nil {
return err
return toStorageErr(err, key, 0)
}
if err := checkPreconditions(preconditions, obj); err != nil {
return toStorageErr(err, key, 0)
}
meta := storage.ResponseMeta{}
if node != nil {
@@ -574,7 +627,7 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType
// Get the object to be written by calling tryUpdate.
ret, newTTL, err := tryUpdate(obj, meta)
if err != nil {
return err
return toStorageErr(err, key, 0)
}
index := uint64(0)

View File

@@ -371,7 +371,7 @@ func TestGuaranteedUpdate(t *testing.T) {
helper := newEtcdHelper(server.Client, codec, key)
obj := &storagetesting.TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
return obj, nil
}))
if err != nil {
@@ -381,7 +381,7 @@ func TestGuaranteedUpdate(t *testing.T) {
// Update an existing node.
callbackCalled := false
objUpdate := &storagetesting.TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2}
err = helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
err = helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
callbackCalled = true
if in.(*storagetesting.TestResource).Value != 1 {
@@ -416,7 +416,7 @@ func TestGuaranteedUpdateNoChange(t *testing.T) {
helper := newEtcdHelper(server.Client, codec, key)
obj := &storagetesting.TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
return obj, nil
}))
if err != nil {
@@ -426,7 +426,7 @@ func TestGuaranteedUpdateNoChange(t *testing.T) {
// Update an existing node with the same data
callbackCalled := false
objUpdate := &storagetesting.TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
err = helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
err = helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
callbackCalled = true
return objUpdate, nil
}))
@@ -453,13 +453,13 @@ func TestGuaranteedUpdateKeyNotFound(t *testing.T) {
})
ignoreNotFound := false
err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, ignoreNotFound, f)
err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, ignoreNotFound, nil, f)
if err == nil {
t.Errorf("Expected error for key not found.")
}
ignoreNotFound = true
err = helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, ignoreNotFound, f)
err = helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, ignoreNotFound, nil, f)
if err != nil {
t.Errorf("Unexpected error %v.", err)
}
@@ -484,7 +484,7 @@ func TestGuaranteedUpdate_CreateCollision(t *testing.T) {
defer wgDone.Done()
firstCall := true
err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
defer func() { firstCall = false }()
if firstCall {
@@ -514,6 +514,26 @@ func TestGuaranteedUpdate_CreateCollision(t *testing.T) {
}
}
func TestGuaranteedUpdateUIDMismatch(t *testing.T) {
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
prefix := path.Join("/", etcdtest.PathPrefix())
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), prefix)
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", UID: "A"}}
podPtr := &api.Pod{}
err := helper.Create(context.TODO(), "/some/key", obj, podPtr, 0)
if err != nil {
t.Fatalf("Unexpected error %#v", err)
}
err = helper.GuaranteedUpdate(context.TODO(), "/some/key", podPtr, true, storage.NewUIDPreconditions("B"), storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
return obj, nil
}))
if !storage.IsTestFailed(err) {
t.Fatalf("Expect a Test Failed (write conflict) error, got: %v", err)
}
}
func TestPrefixEtcdKey(t *testing.T) {
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
@@ -534,3 +554,79 @@ func TestPrefixEtcdKey(t *testing.T) {
assert.Equal(t, keyBefore, keyAfter, "Prefix incorrectly added by EtcdHelper")
}
func TestDeleteUIDMismatch(t *testing.T) {
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
prefix := path.Join("/", etcdtest.PathPrefix())
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), prefix)
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", UID: "A"}}
podPtr := &api.Pod{}
err := helper.Create(context.TODO(), "/some/key", obj, podPtr, 0)
if err != nil {
t.Fatalf("Unexpected error %#v", err)
}
err = helper.Delete(context.TODO(), "/some/key", obj, storage.NewUIDPreconditions("B"))
if !storage.IsTestFailed(err) {
t.Fatalf("Expect a Test Failed (write conflict) error, got: %v", err)
}
}
type getFunc func(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error)
type fakeDeleteKeysAPI struct {
etcd.KeysAPI
fakeGetFunc getFunc
getCount int
// The fakeGetFunc will be called fakeGetCap times before the KeysAPI's Get will be called.
fakeGetCap int
}
func (f *fakeDeleteKeysAPI) Get(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error) {
f.getCount++
if f.getCount < f.fakeGetCap {
return f.fakeGetFunc(ctx, key, opts)
}
return f.KeysAPI.Get(ctx, key, opts)
}
// This is to emulate the case where another party updates the object when
// etcdHelper.Delete has verified the preconditions, but hasn't carried out the
// deletion yet. Etcd will fail the deletion and report the conflict. etcdHelper
// should retry until there is no conflict.
func TestDeleteWithRetry(t *testing.T) {
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
prefix := path.Join("/", etcdtest.PathPrefix())
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", UID: "A"}}
// fakeGet returns a large ModifiedIndex to emulate the case that another
// party has updated the object.
fakeGet := func(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error) {
data, _ := runtime.Encode(testapi.Default.Codec(), obj)
return &etcd.Response{Node: &etcd.Node{Value: string(data), ModifiedIndex: 99}}, nil
}
expectedRetries := 3
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), prefix)
fake := &fakeDeleteKeysAPI{KeysAPI: helper.etcdKeysAPI, fakeGetCap: expectedRetries, fakeGetFunc: fakeGet}
helper.etcdKeysAPI = fake
returnedObj := &api.Pod{}
err := helper.Create(context.TODO(), "/some/key", obj, returnedObj, 0)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
err = helper.Delete(context.TODO(), "/some/key", obj, storage.NewUIDPreconditions("A"))
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
if fake.getCount != expectedRetries {
t.Errorf("Expect %d retries, got %d", expectedRetries, fake.getCount)
}
err = helper.Get(context.TODO(), "/some/key", obj, false)
if !storage.IsNotFound(err) {
t.Errorf("Expect an NotFound error, got %v", err)
}
}

View File

@@ -42,6 +42,7 @@ const (
EtcdSet = "set"
EtcdCAS = "compareAndSwap"
EtcdDelete = "delete"
EtcdCAD = "compareAndDelete"
EtcdExpire = "expire"
)
@@ -450,7 +451,7 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) {
w.sendAdd(res)
case EtcdSet, EtcdCAS:
w.sendModify(res)
case EtcdDelete, EtcdExpire:
case EtcdDelete, EtcdExpire, EtcdCAD:
w.sendDelete(res)
default:
utilruntime.HandleError(fmt.Errorf("unknown action: %v", res.Action))

View File

@@ -21,6 +21,7 @@ import (
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/watch"
)
@@ -69,6 +70,18 @@ func Everything(runtime.Object) bool {
// See the comment for GuaranteedUpdate for more details.
type UpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Object, ttl *uint64, err error)
// Preconditions must be fulfilled before an operation (update, delete, etc.) is carried out.
type Preconditions struct {
// Specifies the target UID.
UID *types.UID `json:"uid,omitempty"`
}
// NewUIDPreconditions returns a Preconditions with UID set.
func NewUIDPreconditions(uid string) *Preconditions {
u := types.UID(uid)
return &Preconditions{UID: &u}
}
// Interface offers a common interface for object marshaling/unmarshling operations and
// hides all the storage-related operations behind it.
type Interface interface {
@@ -91,7 +104,7 @@ type Interface interface {
Set(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
// Delete removes the specified key and returns the value that existed at that spot.
Delete(ctx context.Context, key string, out runtime.Object) error
Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error
// Watch begins watching the specified key. Events are decoded into API objects,
// and any items passing 'filter' are sent down to returned watch.Interface.
@@ -146,7 +159,7 @@ type Interface interface {
// return cur, nil, nil
// }
// })
GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate UpdateFunc) error
GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, precondtions *Preconditions, tryUpdate UpdateFunc) error
// Codec provides access to the underlying codec being used by the implementation.
Codec() runtime.Codec

View File

@@ -20,9 +20,7 @@ import (
"fmt"
"strconv"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/validation/field"
@@ -48,7 +46,7 @@ func ParseWatchResourceVersion(resourceVersion string) (uint64, error) {
}
version, err := strconv.ParseUint(resourceVersion, 10, 64)
if err != nil {
return 0, errors.NewInvalid(unversioned.GroupKind{}, "", field.ErrorList{
return 0, NewInvalidError(field.ErrorList{
// Validation errors are supposed to return version-specific field
// paths, but this is probably close enough.
field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()),

View File

@@ -16,11 +16,7 @@ limitations under the License.
package storage
import (
"testing"
"k8s.io/kubernetes/pkg/api/errors"
)
import "testing"
func TestEtcdParseWatchResourceVersion(t *testing.T) {
testCases := []struct {
@@ -42,7 +38,7 @@ func TestEtcdParseWatchResourceVersion(t *testing.T) {
t.Errorf("%s: unexpected non-error", testCase.Version)
continue
}
if !errors.IsInvalid(err) {
if !IsInvalidError(err) {
t.Errorf("%s: unexpected error: %v", testCase.Version, err)
continue
}