etcd (v3) store: implements KV methods of storage.Interface

This implements Get(), Create(), Delete(), GetToList(),
List(), GuaranteedUpdate().
This commit is contained in:
Hongchao Deng 2016-03-23 10:29:54 -07:00
parent e2ef27ee8a
commit 00ddf0671d
5 changed files with 979 additions and 10 deletions

View File

@ -169,3 +169,13 @@ func (m MultiObjectTyper) IsUnversioned(obj Object) (bool, bool) {
}
return false, false
}
// SetZeroValue would set the object of objPtr to zero value of its type.
func SetZeroValue(objPtr Object) error {
v, err := conversion.EnforcePtr(objPtr)
if err != nil {
return err
}
v.Set(reflect.Zero(v.Type()))
return nil
}

View File

@ -26,6 +26,7 @@ const (
ErrCodeKeyNotFound int = iota + 1
ErrCodeKeyExists
ErrCodeResourceVersionConflicts
ErrCodeInvalidObj
ErrCodeUnreachable
)
@ -33,6 +34,7 @@ var errCodeToMessage = map[int]string{
ErrCodeKeyNotFound: "key not found",
ErrCodeKeyExists: "key exists",
ErrCodeResourceVersionConflicts: "resource version conflicts",
ErrCodeInvalidObj: "invalid object",
ErrCodeUnreachable: "server unreachable",
}
@ -68,15 +70,24 @@ func NewUnreachableError(key string, rv int64) *StorageError {
}
}
func NewInvalidObjError(key, msg string) *StorageError {
return &StorageError{
Code: ErrCodeInvalidObj,
Key: key,
AdditionalErrorMsg: msg,
}
}
type StorageError struct {
Code int
Key string
ResourceVersion int64
Code int
Key string
ResourceVersion int64
AdditionalErrorMsg string
}
func (e *StorageError) Error() string {
return fmt.Sprintf("StorageError: %s, Code: %d, Key: %s, ResourceVersion: %d",
errCodeToMessage[e.Code], e.Code, e.Key, e.ResourceVersion)
return fmt.Sprintf("StorageError: %s, Code: %d, Key: %s, ResourceVersion: %d, AdditionalErrorMsg: %s",
errCodeToMessage[e.Code], e.Code, e.Key, e.ResourceVersion, e.AdditionalErrorMsg)
}
// IsNotFound returns true if and only if err is "key" not found error.
@ -96,15 +107,24 @@ func IsUnreachable(err error) bool {
// IsTestFailed returns true if and only if err is a write conflict.
func IsTestFailed(err error) bool {
return isErrCode(err, ErrCodeResourceVersionConflicts)
return isErrCode(err, ErrCodeResourceVersionConflicts, ErrCodeInvalidObj)
}
func isErrCode(err error, code int) bool {
// IsInvalidUID returns true if and only if err is invalid UID error
func IsInvalidObj(err error) bool {
return isErrCode(err, ErrCodeInvalidObj)
}
func isErrCode(err error, codes ...int) bool {
if err == nil {
return false
}
if e, ok := err.(*StorageError); ok {
return e.Code == code
for _, code := range codes {
if e.Code == code {
return true
}
}
}
return false
}

424
pkg/storage/etcd3/store.go Normal file
View File

@ -0,0 +1,424 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"bytes"
"errors"
"fmt"
"path"
"reflect"
"strings"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/watch"
"github.com/coreos/etcd/clientv3"
"github.com/golang/glog"
"golang.org/x/net/context"
)
type store struct {
client *clientv3.Client
codec runtime.Codec
versioner storage.Versioner
pathPrefix string
}
type elemForDecode struct {
data []byte
rev uint64
}
type objState struct {
obj runtime.Object
meta *storage.ResponseMeta
rev int64
data []byte
}
func newStore(c *clientv3.Client, codec runtime.Codec, prefix string) *store {
return &store{
client: c,
versioner: etcd.APIObjectVersioner{},
codec: codec,
pathPrefix: prefix,
}
}
// Backends implements storage.Interface.Backends.
func (s *store) Backends(ctx context.Context) []string {
resp, err := s.client.MemberList(ctx)
if err != nil {
glog.Errorf("Error obtaining etcd members list: %q", err)
return nil
}
var mlist []string
for _, member := range resp.Members {
mlist = append(mlist, member.ClientURLs...)
}
return mlist
}
// Codec implements storage.Interface.Codec.
func (s *store) Codec() runtime.Codec {
return s.codec
}
// Versioner implements storage.Interface.Versioner.
func (s *store) Versioner() storage.Versioner {
return s.versioner
}
// Get implements storage.Interface.Get.
func (s *store) Get(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool) error {
key = keyWithPrefix(s.pathPrefix, key)
getResp, err := s.client.KV.Get(ctx, key)
if err != nil {
return err
}
if len(getResp.Kvs) == 0 {
if ignoreNotFound {
return runtime.SetZeroValue(out)
}
return storage.NewKeyNotFoundError(key, 0)
}
kv := getResp.Kvs[0]
return decode(s.codec, s.versioner, kv.Value, out, kv.ModRevision)
}
// Create implements storage.Interface.Create.
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion should not be set on objects to be created")
}
data, err := runtime.Encode(s.codec, obj)
if err != nil {
return err
}
key = keyWithPrefix(s.pathPrefix, key)
txnResp, err := s.client.KV.Txn(ctx).If(
notFound(key),
).Then(
clientv3.OpPut(key, string(data)),
).Commit()
if err != nil {
return err
}
if !txnResp.Succeeded {
return storage.NewKeyExistsError(key, 0)
}
if out != nil {
putResp := txnResp.Responses[0].GetResponsePut()
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
}
return nil
}
// Delete implements storage.Interface.Delete.
func (s *store) Delete(ctx context.Context, key string, out runtime.Object, precondtions *storage.Preconditions) error {
v, err := conversion.EnforcePtr(out)
if err != nil {
panic("unable to convert output object to pointer")
}
key = keyWithPrefix(s.pathPrefix, key)
if precondtions == nil {
return s.unconditionalDelete(ctx, key, out)
}
return s.conditionalDelete(ctx, key, out, v, precondtions)
}
func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime.Object) error {
// We need to do get and delete in single transaction in order to
// know the value and revision before deleting it.
txnResp, err := s.client.KV.Txn(ctx).If().Then(
clientv3.OpGet(key),
clientv3.OpDelete(key),
).Commit()
if err != nil {
return err
}
getResp := txnResp.Responses[0].GetResponseRange()
if len(getResp.Kvs) == 0 {
return storage.NewKeyNotFoundError(key, 0)
}
kv := getResp.Kvs[0]
return decode(s.codec, s.versioner, kv.Value, out, kv.ModRevision)
}
func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, precondtions *storage.Preconditions) error {
getResp, err := s.client.KV.Get(ctx, key)
if err != nil {
return err
}
for {
origState, err := s.getState(getResp, key, v, false)
if err != nil {
return err
}
if err := checkPreconditions(key, precondtions, origState.obj); err != nil {
return err
}
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModifiedRevision(key), "=", origState.rev),
).Then(
clientv3.OpDelete(key),
).Else(
clientv3.OpGet(key),
).Commit()
if err != nil {
return err
}
if !txnResp.Succeeded {
getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
glog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
continue
}
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
}
}
// GuaranteedUpdate implements storage.Interface.GuaranteedUpdate.
func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool, precondtions *storage.Preconditions, tryUpdate storage.UpdateFunc) error {
v, err := conversion.EnforcePtr(out)
if err != nil {
panic("unable to convert output object to pointer")
}
key = keyWithPrefix(s.pathPrefix, key)
getResp, err := s.client.KV.Get(ctx, key)
if err != nil {
return err
}
for {
origState, err := s.getState(getResp, key, v, ignoreNotFound)
if err != nil {
return err
}
if err := checkPreconditions(key, precondtions, origState.obj); err != nil {
return err
}
ret, err := s.updateState(origState, tryUpdate)
if err != nil {
return err
}
data, err := runtime.Encode(s.codec, ret)
if err != nil {
return err
}
if bytes.Equal(data, origState.data) {
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
}
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModifiedRevision(key), "=", origState.rev),
).Then(
clientv3.OpPut(key, string(data)),
).Else(
clientv3.OpGet(key),
).Commit()
if err != nil {
return err
}
if !txnResp.Succeeded {
getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
glog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
continue
}
putResp := txnResp.Responses[0].GetResponsePut()
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
}
}
// GetToList implements storage.Interface.GetToList.
func (s *store) GetToList(ctx context.Context, key string, filter storage.FilterFunc, listObj runtime.Object) error {
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
key = keyWithPrefix(s.pathPrefix, key)
getResp, err := s.client.KV.Get(ctx, key)
if err != nil {
return err
}
if len(getResp.Kvs) == 0 {
return nil
}
elems := []*elemForDecode{{
data: getResp.Kvs[0].Value,
rev: uint64(getResp.Kvs[0].ModRevision),
}}
if err := decodeList(elems, filter, listPtr, s.codec, s.versioner); err != nil {
return err
}
// update version with cluster level revision
return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision))
}
// List implements storage.Interface.List.
func (s *store) List(ctx context.Context, key, resourceVersion string, filter storage.FilterFunc, listObj runtime.Object) error {
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
key = keyWithPrefix(s.pathPrefix, key)
// We need to make sure the key ended with "/" so that we only get children "directories".
// e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three,
// while with prefix "/a/" will return only "/a/b" which is the correct answer.
if !strings.HasSuffix(key, "/") {
key += "/"
}
getResp, err := s.client.KV.Get(ctx, key, clientv3.WithPrefix())
if err != nil {
return err
}
elems := make([]*elemForDecode, len(getResp.Kvs))
for i, kv := range getResp.Kvs {
elems[i] = &elemForDecode{
data: kv.Value,
rev: uint64(kv.ModRevision),
}
}
if err := decodeList(elems, filter, listPtr, s.codec, s.versioner); err != nil {
return err
}
// update version with cluster level revision
return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision))
}
// Watch implements storage.Interface.Watch.
func (s *store) Watch(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) {
panic("TODO: unimplemented")
}
// WatchList implements storage.Interface.WatchList.
func (s *store) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) {
panic("TODO: unimplemented")
}
func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
state := &objState{
obj: reflect.New(v.Type()).Interface().(runtime.Object),
meta: &storage.ResponseMeta{},
}
if len(getResp.Kvs) == 0 {
if !ignoreNotFound {
return nil, storage.NewKeyNotFoundError(key, 0)
}
if err := runtime.SetZeroValue(state.obj); err != nil {
return nil, err
}
} else {
state.rev = getResp.Kvs[0].ModRevision
state.meta.ResourceVersion = uint64(state.rev)
state.data = getResp.Kvs[0].Value
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
return nil, err
}
}
return state, nil
}
func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtime.Object, error) {
ret, _, err := userUpdate(st.obj, *st.meta)
version, err := s.versioner.ObjectResourceVersion(ret)
if err != nil {
return nil, err
}
if version != 0 {
// We cannot store object with resourceVersion in etcd. We need to reset it.
if err := s.versioner.UpdateObject(ret, nil, 0); err != nil {
return nil, fmt.Errorf("UpdateObject failed: %v", err)
}
}
return ret, nil
}
func keyWithPrefix(prefix, key string) string {
if strings.HasPrefix(key, prefix) {
return key
}
return path.Join(prefix, key)
}
// decode decodes value of bytes into object. It will also set the object resource version to rev.
// On success, objPtr would be set to the object.
func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64) error {
if _, err := conversion.EnforcePtr(objPtr); err != nil {
panic("unable to convert output object to pointer")
}
_, _, err := codec.Decode(value, nil, objPtr)
if err != nil {
return err
}
// being unable to set the version does not prevent the object from being extracted
versioner.UpdateObject(objPtr, nil, uint64(rev))
return nil
}
// decodeList decodes a list of values into a list of objects, with resource version set to corresponding rev.
// On success, ListPtr would be set to the list of objects.
func decodeList(elems []*elemForDecode, filter storage.FilterFunc, ListPtr interface{}, codec runtime.Codec, versioner storage.Versioner) error {
v, err := conversion.EnforcePtr(ListPtr)
if err != nil || v.Kind() != reflect.Slice {
panic("need ptr to slice")
}
for _, elem := range elems {
obj, _, err := codec.Decode(elem.data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
if err != nil {
return err
}
// being unable to set the version does not prevent the object from being extracted
versioner.UpdateObject(obj, nil, elem.rev)
if filter(obj) {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
}
return nil
}
func checkPreconditions(key string, preconditions *storage.Preconditions, out runtime.Object) error {
if preconditions == nil {
return nil
}
objMeta, err := api.ObjectMetaFor(out)
if err != nil {
return storage.NewInternalErrorf("can't enforce preconditions %v on un-introspectable object %v, got error: %v", *preconditions, out, err)
}
if preconditions.UID != nil && *preconditions.UID != objMeta.UID {
errMsg := fmt.Sprintf("Precondition failed: UID in precondition: %v, UID in object meta: %v", preconditions.UID, objMeta.UID)
return storage.NewInvalidObjError(key, errMsg)
}
return nil
}
func notFound(key string) clientv3.Cmp {
return clientv3.Compare(clientv3.ModifiedRevision(key), "=", 0)
}

View File

@ -0,0 +1,510 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"fmt"
"reflect"
"sync"
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"github.com/coreos/etcd/integration"
"golang.org/x/net/context"
)
func TestCreate(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
etcdClient := cluster.RandClient()
key := "/testkey"
out := &api.Pod{}
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
// verify that kv pair is empty before set
getResp, err := etcdClient.KV.Get(ctx, key)
if err != nil {
t.Fatalf("etcdClient.KV.Get failed: %v", err)
}
if len(getResp.Kvs) != 0 {
t.Fatalf("expecting empty result on key: %s", key)
}
err = store.Create(ctx, key, obj, out, 0)
if err != nil {
t.Fatalf("Set failed: %v", err)
}
// basic tests of the output
if obj.ObjectMeta.Name != out.ObjectMeta.Name {
t.Errorf("pod name want=%s, get=%s", obj.ObjectMeta.Name, out.ObjectMeta.Name)
}
if out.ResourceVersion == "" {
t.Errorf("output should have non-empty resource version")
}
// verify that kv pair is not empty after set
getResp, err = etcdClient.KV.Get(ctx, key)
if err != nil {
t.Fatalf("etcdClient.KV.Get failed: %v", err)
}
if len(getResp.Kvs) == 0 {
t.Fatalf("expecting non empty result on key: %s", key)
}
}
func TestCreateWithKeyExist(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
key, _ := testPropogateStore(t, store, ctx, obj)
out := &api.Pod{}
err := store.Create(ctx, key, obj, out, 0)
if err == nil || !storage.IsNodeExist(err) {
t.Errorf("expecting key exists error, but get: %s", err)
}
}
func TestGet(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
key, storedObj := testPropogateStore(t, store, ctx, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
tests := []struct {
key string
ignoreNotFound bool
expectNotFoundErr bool
expectedOut *api.Pod
}{{ // test get on existing item
key: key,
ignoreNotFound: false,
expectNotFoundErr: false,
expectedOut: storedObj,
}, { // test get on non-existing item with ignoreNotFound=false
key: "/non-existing",
ignoreNotFound: false,
expectNotFoundErr: true,
}, { // test get on non-existing item with ignoreNotFound=true
key: "/non-existing",
ignoreNotFound: true,
expectNotFoundErr: false,
expectedOut: &api.Pod{},
}}
for i, tt := range tests {
out := &api.Pod{}
err := store.Get(ctx, tt.key, out, tt.ignoreNotFound)
if tt.expectNotFoundErr {
if err == nil || !storage.IsNotFound(err) {
t.Errorf("#%d: expecting not found error, but get: %s", i, err)
}
continue
}
if err != nil {
t.Fatalf("Get failed: %v", err)
}
if !reflect.DeepEqual(tt.expectedOut, out) {
t.Errorf("#%d: pod want=%#v, get=%#v", i, tt.expectedOut, out)
}
}
}
func TestUnconditionalDelete(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
key, storedObj := testPropogateStore(t, store, ctx, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
tests := []struct {
key string
expectedObj *api.Pod
expectNotFoundErr bool
}{{ // test unconditional delete on existing key
key: key,
expectedObj: storedObj,
expectNotFoundErr: false,
}, { // test unconditional delete on non-existing key
key: "/non-existing",
expectedObj: nil,
expectNotFoundErr: true,
}}
for i, tt := range tests {
out := &api.Pod{} // reset
err := store.Delete(ctx, tt.key, out, nil)
if tt.expectNotFoundErr {
if err == nil || !storage.IsNotFound(err) {
t.Errorf("#%d: expecting not found error, but get: %s", i, err)
}
continue
}
if err != nil {
t.Fatalf("Delete failed: %v", err)
}
if !reflect.DeepEqual(tt.expectedObj, out) {
t.Errorf("#%d: pod want=%#v, get=%#v", i, tt.expectedObj, out)
}
}
}
func TestConditionalDelete(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
key, storedObj := testPropogateStore(t, store, ctx, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", UID: "A"}})
tests := []struct {
precondition *storage.Preconditions
expectInvalidObjErr bool
}{{ // test conditional delete with UID match
precondition: storage.NewUIDPreconditions("A"),
expectInvalidObjErr: false,
}, { // test conditional delete with UID mismatch
precondition: storage.NewUIDPreconditions("B"),
expectInvalidObjErr: true,
}}
for i, tt := range tests {
out := &api.Pod{}
err := store.Delete(ctx, key, out, tt.precondition)
if tt.expectInvalidObjErr {
if err == nil || !storage.IsInvalidObj(err) {
t.Errorf("#%d: expecting invalid UID error, but get: %s", i, err)
}
continue
}
if err != nil {
t.Fatalf("Delete failed: %v", err)
}
if !reflect.DeepEqual(storedObj, out) {
t.Errorf("#%d: pod want=%#v, get=%#v", i, storedObj, out)
}
key, storedObj = testPropogateStore(t, store, ctx, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", UID: "A"}})
}
}
func TestGetToList(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
key, storedObj := testPropogateStore(t, store, ctx, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
tests := []struct {
key string
filter storage.FilterFunc
expectedOut []*api.Pod
}{{ // test GetToList on existing key
key: key,
filter: storage.Everything,
expectedOut: []*api.Pod{storedObj},
}, { // test GetToList on non-existing key
key: "/non-existing",
filter: storage.Everything,
expectedOut: nil,
}, { // test GetToList with filter to reject the pod
key: "/non-existing",
filter: func(obj runtime.Object) bool {
pod, ok := obj.(*api.Pod)
if !ok {
t.Fatal("It should be able to convert obj to *api.Pod")
}
return pod.Name != storedObj.Name
},
expectedOut: nil,
}}
for i, tt := range tests {
out := &api.PodList{}
err := store.GetToList(ctx, tt.key, tt.filter, out)
if err != nil {
t.Fatalf("GetToList failed: %v", err)
}
if len(out.Items) != len(tt.expectedOut) {
t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items))
continue
}
for j, wantPod := range tt.expectedOut {
getPod := &out.Items[j]
if !reflect.DeepEqual(wantPod, getPod) {
t.Errorf("#%d: pod want=%#v, get=%#v", i, wantPod, getPod)
}
}
}
}
func TestGuaranteedUpdate(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
key, storeObj := testPropogateStore(t, store, ctx, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", UID: "A"}})
tests := []struct {
key string
name string
ignoreNotFound bool
precondition *storage.Preconditions
expectNotFoundErr bool
expectInvalidObjErr bool
expectNoUpdate bool
}{{ // GuaranteedUpdate on non-existing key with ignoreNotFound=false
key: "/non-existing",
ignoreNotFound: false,
precondition: nil,
expectNotFoundErr: true,
expectInvalidObjErr: false,
expectNoUpdate: false,
}, { // GuaranteedUpdate on non-existing key with ignoreNotFound=true
key: "/non-existing",
ignoreNotFound: true,
precondition: nil,
expectNotFoundErr: false,
expectInvalidObjErr: false,
expectNoUpdate: false,
}, { // GuaranteedUpdate on existing key
key: key,
ignoreNotFound: false,
precondition: nil,
expectNotFoundErr: false,
expectInvalidObjErr: false,
expectNoUpdate: false,
}, { // GuaranteedUpdate with same data
key: key,
ignoreNotFound: false,
precondition: nil,
expectNotFoundErr: false,
expectInvalidObjErr: false,
expectNoUpdate: true,
}, { // GuaranteedUpdate with UID match
key: key,
ignoreNotFound: false,
precondition: storage.NewUIDPreconditions("A"),
expectNotFoundErr: false,
expectInvalidObjErr: false,
expectNoUpdate: true,
}, { // GuaranteedUpdate with UID mismatch
key: key,
ignoreNotFound: false,
precondition: storage.NewUIDPreconditions("B"),
expectNotFoundErr: false,
expectInvalidObjErr: true,
expectNoUpdate: true,
}}
for i, tt := range tests {
out := &api.Pod{}
name := fmt.Sprintf("foo-%d", i)
if tt.expectNoUpdate {
name = storeObj.Name
}
version := storeObj.ResourceVersion
err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition,
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
if tt.expectNotFoundErr && tt.ignoreNotFound {
if pod := obj.(*api.Pod); pod.Name != "" {
t.Errorf("#%d: expecting zero value, but get=%#v", i, pod)
}
}
pod := *storeObj
pod.Name = name
return &pod, nil
}))
if tt.expectNotFoundErr {
if err == nil || !storage.IsNotFound(err) {
t.Errorf("#%d: expecting not found error, but get: %v", i, err)
}
continue
}
if tt.expectInvalidObjErr {
if err == nil || !storage.IsInvalidObj(err) {
t.Errorf("#%d: expecting invalid UID error, but get: %s", i, err)
}
continue
}
if err != nil {
t.Fatalf("GuaranteedUpdate failed: %v", err)
}
if out.ObjectMeta.Name != name {
t.Errorf("#%d: pod name want=%s, get=%s", i, name, out.ObjectMeta.Name)
}
switch tt.expectNoUpdate {
case true:
if version != out.ResourceVersion {
t.Errorf("#%d: expect no version change, before=%s, after=%s", i, version, out.ResourceVersion)
}
case false:
if version == out.ResourceVersion {
t.Errorf("#%d: expect version change, but get the same version=%s", i, version)
}
}
storeObj = out
}
}
func TestGuaranteedUpdateWithConflict(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
key, _ := testPropogateStore(t, store, ctx, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
errChan := make(chan error, 1)
var firstToFinish sync.WaitGroup
var secondToEnter sync.WaitGroup
firstToFinish.Add(1)
secondToEnter.Add(1)
go func() {
err := store.GuaranteedUpdate(ctx, key, &api.Pod{}, false, nil,
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
pod := obj.(*api.Pod)
pod.Name = "foo-1"
secondToEnter.Wait()
return pod, nil
}))
firstToFinish.Done()
errChan <- err
}()
updateCount := 0
err := store.GuaranteedUpdate(ctx, key, &api.Pod{}, false, nil,
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
if updateCount == 0 {
secondToEnter.Done()
firstToFinish.Wait()
}
updateCount++
pod := obj.(*api.Pod)
pod.Name = "foo-2"
return pod, nil
}))
if err != nil {
t.Fatalf("Second GuaranteedUpdate error %#v", err)
}
if err := <-errChan; err != nil {
t.Fatalf("First GuaranteedUpdate error %#v", err)
}
if updateCount != 2 {
t.Errorf("Should have conflict and called update func twice")
}
}
func TestList(t *testing.T) {
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), testapi.Default.Codec(), "")
ctx := context.Background()
// Setup storage with the following structure:
// /
// - one-level/
// | - test
// |
// - two-level/
// - 1/
// | - test
// |
// - 2/
// - test
preset := []struct {
key string
obj *api.Pod
storedObj *api.Pod
}{{
key: "/one-level/test",
obj: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}},
}, {
key: "/two-level/1/test",
obj: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}},
}, {
key: "/two-level/2/test",
obj: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}},
}}
for i, ps := range preset {
preset[i].storedObj = &api.Pod{}
err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0)
if err != nil {
t.Fatalf("Set failed: %v", err)
}
}
tests := []struct {
prefix string
filter storage.FilterFunc
expectedOut []*api.Pod
}{{ // test List on existing key
prefix: "/one-level/",
filter: storage.Everything,
expectedOut: []*api.Pod{preset[0].storedObj},
}, { // test List on non-existing key
prefix: "/non-existing/",
filter: storage.Everything,
expectedOut: nil,
}, { // test List with filter
prefix: "/one-level/",
filter: func(obj runtime.Object) bool {
pod, ok := obj.(*api.Pod)
if !ok {
t.Fatal("It should be able to convert obj to *api.Pod")
}
return pod.Name != preset[0].storedObj.Name
},
expectedOut: nil,
}, { // test List with multiple levels of directories and expect flattened result
prefix: "/two-level/",
filter: storage.Everything,
expectedOut: []*api.Pod{preset[1].storedObj, preset[2].storedObj},
}}
for i, tt := range tests {
out := &api.PodList{}
err := store.List(ctx, tt.prefix, "0", tt.filter, out)
if err != nil {
t.Fatalf("List failed: %v", err)
}
if len(tt.expectedOut) != len(out.Items) {
t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items))
continue
}
for j, wantPod := range tt.expectedOut {
getPod := &out.Items[j]
if !reflect.DeepEqual(wantPod, getPod) {
t.Errorf("#%d: pod want=%#v, get=%#v", i, wantPod, getPod)
}
}
}
}
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
store := newStore(cluster.RandClient(), testapi.Default.Codec(), "")
ctx := context.Background()
return ctx, store, cluster
}
// testPropogateStore helps propogates store with objects, automates key generation, and returns
// keys and stored objects.
func testPropogateStore(t *testing.T, store *store, ctx context.Context, obj *api.Pod) (string, *api.Pod) {
// Setup store with a key and grab the output for returning.
key := "/testkey"
setOutput := &api.Pod{}
err := store.Create(ctx, key, obj, setOutput, 0)
if err != nil {
t.Fatalf("Set failed: %v", err)
}
return key, setOutput
}

View File

@ -104,6 +104,7 @@ type Interface interface {
Set(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
// Delete removes the specified key and returns the value that existed at that spot.
// If key didn't exist, it will return NotFound storage error.
Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error
// Watch begins watching the specified key. Events are decoded into API objects,
@ -137,9 +138,13 @@ type Interface interface {
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
// retrying the update until success if there is index conflict.
// Note that object passed to tryUpdate may change acress incovations of tryUpdate() if
// other writers are simultaneously updateing it, to tryUpdate() needs to take into account
// Note that object passed to tryUpdate may change across invocations of tryUpdate() if
// other writers are simultaneously updating it, to tryUpdate() needs to take into account
// the current contents of the object when deciding how the update object should look.
// If the key doesn't exist, it will return NotFound storage error if ignoreNotFound=false
// or zero value in 'ptrToType' parameter otherwise.
// If the object to update has the same value as previous, it won't do any update
// but will return the object in 'ptrToType' parameter.
//
// Example:
//