From 00ddf0671d91a9ad6e4316e70af07c38cefb9d35 Mon Sep 17 00:00:00 2001 From: Hongchao Deng Date: Wed, 23 Mar 2016 10:29:54 -0700 Subject: [PATCH] etcd (v3) store: implements KV methods of storage.Interface This implements Get(), Create(), Delete(), GetToList(), List(), GuaranteedUpdate(). --- pkg/runtime/helper.go | 10 + pkg/storage/errors.go | 36 ++- pkg/storage/etcd3/store.go | 424 ++++++++++++++++++++++++++ pkg/storage/etcd3/store_test.go | 510 ++++++++++++++++++++++++++++++++ pkg/storage/interfaces.go | 9 +- 5 files changed, 979 insertions(+), 10 deletions(-) create mode 100644 pkg/storage/etcd3/store.go create mode 100644 pkg/storage/etcd3/store_test.go diff --git a/pkg/runtime/helper.go b/pkg/runtime/helper.go index 351cef52a25..ac23e3a2689 100644 --- a/pkg/runtime/helper.go +++ b/pkg/runtime/helper.go @@ -169,3 +169,13 @@ func (m MultiObjectTyper) IsUnversioned(obj Object) (bool, bool) { } return false, false } + +// SetZeroValue would set the object of objPtr to zero value of its type. +func SetZeroValue(objPtr Object) error { + v, err := conversion.EnforcePtr(objPtr) + if err != nil { + return err + } + v.Set(reflect.Zero(v.Type())) + return nil +} diff --git a/pkg/storage/errors.go b/pkg/storage/errors.go index 89d55980ba9..61b3cba52c7 100644 --- a/pkg/storage/errors.go +++ b/pkg/storage/errors.go @@ -26,6 +26,7 @@ const ( ErrCodeKeyNotFound int = iota + 1 ErrCodeKeyExists ErrCodeResourceVersionConflicts + ErrCodeInvalidObj ErrCodeUnreachable ) @@ -33,6 +34,7 @@ var errCodeToMessage = map[int]string{ ErrCodeKeyNotFound: "key not found", ErrCodeKeyExists: "key exists", ErrCodeResourceVersionConflicts: "resource version conflicts", + ErrCodeInvalidObj: "invalid object", ErrCodeUnreachable: "server unreachable", } @@ -68,15 +70,24 @@ func NewUnreachableError(key string, rv int64) *StorageError { } } +func NewInvalidObjError(key, msg string) *StorageError { + return &StorageError{ + Code: ErrCodeInvalidObj, + Key: key, + AdditionalErrorMsg: msg, + } +} + type StorageError struct { - Code int - Key string - ResourceVersion int64 + Code int + Key string + ResourceVersion int64 + AdditionalErrorMsg string } func (e *StorageError) Error() string { - return fmt.Sprintf("StorageError: %s, Code: %d, Key: %s, ResourceVersion: %d", - errCodeToMessage[e.Code], e.Code, e.Key, e.ResourceVersion) + return fmt.Sprintf("StorageError: %s, Code: %d, Key: %s, ResourceVersion: %d, AdditionalErrorMsg: %s", + errCodeToMessage[e.Code], e.Code, e.Key, e.ResourceVersion, e.AdditionalErrorMsg) } // IsNotFound returns true if and only if err is "key" not found error. @@ -96,15 +107,24 @@ func IsUnreachable(err error) bool { // IsTestFailed returns true if and only if err is a write conflict. func IsTestFailed(err error) bool { - return isErrCode(err, ErrCodeResourceVersionConflicts) + return isErrCode(err, ErrCodeResourceVersionConflicts, ErrCodeInvalidObj) } -func isErrCode(err error, code int) bool { +// IsInvalidUID returns true if and only if err is invalid UID error +func IsInvalidObj(err error) bool { + return isErrCode(err, ErrCodeInvalidObj) +} + +func isErrCode(err error, codes ...int) bool { if err == nil { return false } if e, ok := err.(*StorageError); ok { - return e.Code == code + for _, code := range codes { + if e.Code == code { + return true + } + } } return false } diff --git a/pkg/storage/etcd3/store.go b/pkg/storage/etcd3/store.go new file mode 100644 index 00000000000..5ccf5e57872 --- /dev/null +++ b/pkg/storage/etcd3/store.go @@ -0,0 +1,424 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd3 + +import ( + "bytes" + "errors" + "fmt" + "path" + "reflect" + "strings" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/meta" + "k8s.io/kubernetes/pkg/conversion" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" + "k8s.io/kubernetes/pkg/storage/etcd" + "k8s.io/kubernetes/pkg/watch" + + "github.com/coreos/etcd/clientv3" + "github.com/golang/glog" + "golang.org/x/net/context" +) + +type store struct { + client *clientv3.Client + codec runtime.Codec + versioner storage.Versioner + pathPrefix string +} + +type elemForDecode struct { + data []byte + rev uint64 +} + +type objState struct { + obj runtime.Object + meta *storage.ResponseMeta + rev int64 + data []byte +} + +func newStore(c *clientv3.Client, codec runtime.Codec, prefix string) *store { + return &store{ + client: c, + versioner: etcd.APIObjectVersioner{}, + codec: codec, + pathPrefix: prefix, + } +} + +// Backends implements storage.Interface.Backends. +func (s *store) Backends(ctx context.Context) []string { + resp, err := s.client.MemberList(ctx) + if err != nil { + glog.Errorf("Error obtaining etcd members list: %q", err) + return nil + } + var mlist []string + for _, member := range resp.Members { + mlist = append(mlist, member.ClientURLs...) + } + return mlist +} + +// Codec implements storage.Interface.Codec. +func (s *store) Codec() runtime.Codec { + return s.codec +} + +// Versioner implements storage.Interface.Versioner. +func (s *store) Versioner() storage.Versioner { + return s.versioner +} + +// Get implements storage.Interface.Get. +func (s *store) Get(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool) error { + key = keyWithPrefix(s.pathPrefix, key) + getResp, err := s.client.KV.Get(ctx, key) + if err != nil { + return err + } + + if len(getResp.Kvs) == 0 { + if ignoreNotFound { + return runtime.SetZeroValue(out) + } + return storage.NewKeyNotFoundError(key, 0) + } + kv := getResp.Kvs[0] + return decode(s.codec, s.versioner, kv.Value, out, kv.ModRevision) +} + +// Create implements storage.Interface.Create. +func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { + if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 { + return errors.New("resourceVersion should not be set on objects to be created") + } + data, err := runtime.Encode(s.codec, obj) + if err != nil { + return err + } + key = keyWithPrefix(s.pathPrefix, key) + + txnResp, err := s.client.KV.Txn(ctx).If( + notFound(key), + ).Then( + clientv3.OpPut(key, string(data)), + ).Commit() + if err != nil { + return err + } + if !txnResp.Succeeded { + return storage.NewKeyExistsError(key, 0) + } + + if out != nil { + putResp := txnResp.Responses[0].GetResponsePut() + return decode(s.codec, s.versioner, data, out, putResp.Header.Revision) + } + return nil +} + +// Delete implements storage.Interface.Delete. +func (s *store) Delete(ctx context.Context, key string, out runtime.Object, precondtions *storage.Preconditions) error { + v, err := conversion.EnforcePtr(out) + if err != nil { + panic("unable to convert output object to pointer") + } + key = keyWithPrefix(s.pathPrefix, key) + if precondtions == nil { + return s.unconditionalDelete(ctx, key, out) + } + return s.conditionalDelete(ctx, key, out, v, precondtions) +} + +func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime.Object) error { + // We need to do get and delete in single transaction in order to + // know the value and revision before deleting it. + txnResp, err := s.client.KV.Txn(ctx).If().Then( + clientv3.OpGet(key), + clientv3.OpDelete(key), + ).Commit() + if err != nil { + return err + } + getResp := txnResp.Responses[0].GetResponseRange() + if len(getResp.Kvs) == 0 { + return storage.NewKeyNotFoundError(key, 0) + } + + kv := getResp.Kvs[0] + return decode(s.codec, s.versioner, kv.Value, out, kv.ModRevision) +} + +func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, precondtions *storage.Preconditions) error { + getResp, err := s.client.KV.Get(ctx, key) + if err != nil { + return err + } + for { + origState, err := s.getState(getResp, key, v, false) + if err != nil { + return err + } + if err := checkPreconditions(key, precondtions, origState.obj); err != nil { + return err + } + txnResp, err := s.client.KV.Txn(ctx).If( + clientv3.Compare(clientv3.ModifiedRevision(key), "=", origState.rev), + ).Then( + clientv3.OpDelete(key), + ).Else( + clientv3.OpGet(key), + ).Commit() + if err != nil { + return err + } + if !txnResp.Succeeded { + getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange()) + glog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key) + continue + } + return decode(s.codec, s.versioner, origState.data, out, origState.rev) + } +} + +// GuaranteedUpdate implements storage.Interface.GuaranteedUpdate. +func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool, precondtions *storage.Preconditions, tryUpdate storage.UpdateFunc) error { + v, err := conversion.EnforcePtr(out) + if err != nil { + panic("unable to convert output object to pointer") + } + key = keyWithPrefix(s.pathPrefix, key) + getResp, err := s.client.KV.Get(ctx, key) + if err != nil { + return err + } + for { + origState, err := s.getState(getResp, key, v, ignoreNotFound) + if err != nil { + return err + } + + if err := checkPreconditions(key, precondtions, origState.obj); err != nil { + return err + } + + ret, err := s.updateState(origState, tryUpdate) + if err != nil { + return err + } + + data, err := runtime.Encode(s.codec, ret) + if err != nil { + return err + } + if bytes.Equal(data, origState.data) { + return decode(s.codec, s.versioner, origState.data, out, origState.rev) + } + + txnResp, err := s.client.KV.Txn(ctx).If( + clientv3.Compare(clientv3.ModifiedRevision(key), "=", origState.rev), + ).Then( + clientv3.OpPut(key, string(data)), + ).Else( + clientv3.OpGet(key), + ).Commit() + if err != nil { + return err + } + if !txnResp.Succeeded { + getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange()) + glog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key) + continue + } + putResp := txnResp.Responses[0].GetResponsePut() + return decode(s.codec, s.versioner, data, out, putResp.Header.Revision) + } +} + +// GetToList implements storage.Interface.GetToList. +func (s *store) GetToList(ctx context.Context, key string, filter storage.FilterFunc, listObj runtime.Object) error { + listPtr, err := meta.GetItemsPtr(listObj) + if err != nil { + return err + } + key = keyWithPrefix(s.pathPrefix, key) + + getResp, err := s.client.KV.Get(ctx, key) + if err != nil { + return err + } + if len(getResp.Kvs) == 0 { + return nil + } + elems := []*elemForDecode{{ + data: getResp.Kvs[0].Value, + rev: uint64(getResp.Kvs[0].ModRevision), + }} + if err := decodeList(elems, filter, listPtr, s.codec, s.versioner); err != nil { + return err + } + // update version with cluster level revision + return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision)) +} + +// List implements storage.Interface.List. +func (s *store) List(ctx context.Context, key, resourceVersion string, filter storage.FilterFunc, listObj runtime.Object) error { + listPtr, err := meta.GetItemsPtr(listObj) + if err != nil { + return err + } + key = keyWithPrefix(s.pathPrefix, key) + // We need to make sure the key ended with "/" so that we only get children "directories". + // e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three, + // while with prefix "/a/" will return only "/a/b" which is the correct answer. + if !strings.HasSuffix(key, "/") { + key += "/" + } + getResp, err := s.client.KV.Get(ctx, key, clientv3.WithPrefix()) + if err != nil { + return err + } + + elems := make([]*elemForDecode, len(getResp.Kvs)) + for i, kv := range getResp.Kvs { + elems[i] = &elemForDecode{ + data: kv.Value, + rev: uint64(kv.ModRevision), + } + } + if err := decodeList(elems, filter, listPtr, s.codec, s.versioner); err != nil { + return err + } + // update version with cluster level revision + return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision)) +} + +// Watch implements storage.Interface.Watch. +func (s *store) Watch(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) { + panic("TODO: unimplemented") +} + +// WatchList implements storage.Interface.WatchList. +func (s *store) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) { + panic("TODO: unimplemented") +} + +func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) { + state := &objState{ + obj: reflect.New(v.Type()).Interface().(runtime.Object), + meta: &storage.ResponseMeta{}, + } + if len(getResp.Kvs) == 0 { + if !ignoreNotFound { + return nil, storage.NewKeyNotFoundError(key, 0) + } + if err := runtime.SetZeroValue(state.obj); err != nil { + return nil, err + } + } else { + state.rev = getResp.Kvs[0].ModRevision + state.meta.ResourceVersion = uint64(state.rev) + state.data = getResp.Kvs[0].Value + if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil { + return nil, err + } + } + return state, nil +} + +func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtime.Object, error) { + ret, _, err := userUpdate(st.obj, *st.meta) + version, err := s.versioner.ObjectResourceVersion(ret) + if err != nil { + return nil, err + } + if version != 0 { + // We cannot store object with resourceVersion in etcd. We need to reset it. + if err := s.versioner.UpdateObject(ret, nil, 0); err != nil { + return nil, fmt.Errorf("UpdateObject failed: %v", err) + } + } + return ret, nil +} + +func keyWithPrefix(prefix, key string) string { + if strings.HasPrefix(key, prefix) { + return key + } + return path.Join(prefix, key) +} + +// decode decodes value of bytes into object. It will also set the object resource version to rev. +// On success, objPtr would be set to the object. +func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64) error { + if _, err := conversion.EnforcePtr(objPtr); err != nil { + panic("unable to convert output object to pointer") + } + _, _, err := codec.Decode(value, nil, objPtr) + if err != nil { + return err + } + // being unable to set the version does not prevent the object from being extracted + versioner.UpdateObject(objPtr, nil, uint64(rev)) + return nil +} + +// decodeList decodes a list of values into a list of objects, with resource version set to corresponding rev. +// On success, ListPtr would be set to the list of objects. +func decodeList(elems []*elemForDecode, filter storage.FilterFunc, ListPtr interface{}, codec runtime.Codec, versioner storage.Versioner) error { + v, err := conversion.EnforcePtr(ListPtr) + if err != nil || v.Kind() != reflect.Slice { + panic("need ptr to slice") + } + for _, elem := range elems { + obj, _, err := codec.Decode(elem.data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object)) + if err != nil { + return err + } + // being unable to set the version does not prevent the object from being extracted + versioner.UpdateObject(obj, nil, elem.rev) + if filter(obj) { + v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) + } + } + return nil +} + +func checkPreconditions(key string, preconditions *storage.Preconditions, out runtime.Object) error { + if preconditions == nil { + return nil + } + objMeta, err := api.ObjectMetaFor(out) + if err != nil { + return storage.NewInternalErrorf("can't enforce preconditions %v on un-introspectable object %v, got error: %v", *preconditions, out, err) + } + if preconditions.UID != nil && *preconditions.UID != objMeta.UID { + errMsg := fmt.Sprintf("Precondition failed: UID in precondition: %v, UID in object meta: %v", preconditions.UID, objMeta.UID) + return storage.NewInvalidObjError(key, errMsg) + } + return nil +} + +func notFound(key string) clientv3.Cmp { + return clientv3.Compare(clientv3.ModifiedRevision(key), "=", 0) +} diff --git a/pkg/storage/etcd3/store_test.go b/pkg/storage/etcd3/store_test.go new file mode 100644 index 00000000000..917a64af8e0 --- /dev/null +++ b/pkg/storage/etcd3/store_test.go @@ -0,0 +1,510 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd3 + +import ( + "fmt" + "reflect" + "sync" + "testing" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" + + "github.com/coreos/etcd/integration" + "golang.org/x/net/context" +) + +func TestCreate(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + etcdClient := cluster.RandClient() + + key := "/testkey" + out := &api.Pod{} + obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} + + // verify that kv pair is empty before set + getResp, err := etcdClient.KV.Get(ctx, key) + if err != nil { + t.Fatalf("etcdClient.KV.Get failed: %v", err) + } + if len(getResp.Kvs) != 0 { + t.Fatalf("expecting empty result on key: %s", key) + } + + err = store.Create(ctx, key, obj, out, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + // basic tests of the output + if obj.ObjectMeta.Name != out.ObjectMeta.Name { + t.Errorf("pod name want=%s, get=%s", obj.ObjectMeta.Name, out.ObjectMeta.Name) + } + if out.ResourceVersion == "" { + t.Errorf("output should have non-empty resource version") + } + + // verify that kv pair is not empty after set + getResp, err = etcdClient.KV.Get(ctx, key) + if err != nil { + t.Fatalf("etcdClient.KV.Get failed: %v", err) + } + if len(getResp.Kvs) == 0 { + t.Fatalf("expecting non empty result on key: %s", key) + } +} + +func TestCreateWithKeyExist(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} + key, _ := testPropogateStore(t, store, ctx, obj) + out := &api.Pod{} + err := store.Create(ctx, key, obj, out, 0) + if err == nil || !storage.IsNodeExist(err) { + t.Errorf("expecting key exists error, but get: %s", err) + } +} + +func TestGet(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + key, storedObj := testPropogateStore(t, store, ctx, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}) + + tests := []struct { + key string + ignoreNotFound bool + expectNotFoundErr bool + expectedOut *api.Pod + }{{ // test get on existing item + key: key, + ignoreNotFound: false, + expectNotFoundErr: false, + expectedOut: storedObj, + }, { // test get on non-existing item with ignoreNotFound=false + key: "/non-existing", + ignoreNotFound: false, + expectNotFoundErr: true, + }, { // test get on non-existing item with ignoreNotFound=true + key: "/non-existing", + ignoreNotFound: true, + expectNotFoundErr: false, + expectedOut: &api.Pod{}, + }} + + for i, tt := range tests { + out := &api.Pod{} + err := store.Get(ctx, tt.key, out, tt.ignoreNotFound) + if tt.expectNotFoundErr { + if err == nil || !storage.IsNotFound(err) { + t.Errorf("#%d: expecting not found error, but get: %s", i, err) + } + continue + } + if err != nil { + t.Fatalf("Get failed: %v", err) + } + if !reflect.DeepEqual(tt.expectedOut, out) { + t.Errorf("#%d: pod want=%#v, get=%#v", i, tt.expectedOut, out) + } + } +} + +func TestUnconditionalDelete(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + key, storedObj := testPropogateStore(t, store, ctx, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}) + + tests := []struct { + key string + expectedObj *api.Pod + expectNotFoundErr bool + }{{ // test unconditional delete on existing key + key: key, + expectedObj: storedObj, + expectNotFoundErr: false, + }, { // test unconditional delete on non-existing key + key: "/non-existing", + expectedObj: nil, + expectNotFoundErr: true, + }} + + for i, tt := range tests { + out := &api.Pod{} // reset + err := store.Delete(ctx, tt.key, out, nil) + if tt.expectNotFoundErr { + if err == nil || !storage.IsNotFound(err) { + t.Errorf("#%d: expecting not found error, but get: %s", i, err) + } + continue + } + if err != nil { + t.Fatalf("Delete failed: %v", err) + } + if !reflect.DeepEqual(tt.expectedObj, out) { + t.Errorf("#%d: pod want=%#v, get=%#v", i, tt.expectedObj, out) + } + } +} + +func TestConditionalDelete(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + key, storedObj := testPropogateStore(t, store, ctx, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", UID: "A"}}) + + tests := []struct { + precondition *storage.Preconditions + expectInvalidObjErr bool + }{{ // test conditional delete with UID match + precondition: storage.NewUIDPreconditions("A"), + expectInvalidObjErr: false, + }, { // test conditional delete with UID mismatch + precondition: storage.NewUIDPreconditions("B"), + expectInvalidObjErr: true, + }} + + for i, tt := range tests { + out := &api.Pod{} + err := store.Delete(ctx, key, out, tt.precondition) + if tt.expectInvalidObjErr { + if err == nil || !storage.IsInvalidObj(err) { + t.Errorf("#%d: expecting invalid UID error, but get: %s", i, err) + } + continue + } + if err != nil { + t.Fatalf("Delete failed: %v", err) + } + if !reflect.DeepEqual(storedObj, out) { + t.Errorf("#%d: pod want=%#v, get=%#v", i, storedObj, out) + } + key, storedObj = testPropogateStore(t, store, ctx, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", UID: "A"}}) + } +} + +func TestGetToList(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + key, storedObj := testPropogateStore(t, store, ctx, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}) + + tests := []struct { + key string + filter storage.FilterFunc + expectedOut []*api.Pod + }{{ // test GetToList on existing key + key: key, + filter: storage.Everything, + expectedOut: []*api.Pod{storedObj}, + }, { // test GetToList on non-existing key + key: "/non-existing", + filter: storage.Everything, + expectedOut: nil, + }, { // test GetToList with filter to reject the pod + key: "/non-existing", + filter: func(obj runtime.Object) bool { + pod, ok := obj.(*api.Pod) + if !ok { + t.Fatal("It should be able to convert obj to *api.Pod") + } + return pod.Name != storedObj.Name + }, + expectedOut: nil, + }} + + for i, tt := range tests { + out := &api.PodList{} + err := store.GetToList(ctx, tt.key, tt.filter, out) + if err != nil { + t.Fatalf("GetToList failed: %v", err) + } + if len(out.Items) != len(tt.expectedOut) { + t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items)) + continue + } + for j, wantPod := range tt.expectedOut { + getPod := &out.Items[j] + if !reflect.DeepEqual(wantPod, getPod) { + t.Errorf("#%d: pod want=%#v, get=%#v", i, wantPod, getPod) + } + } + } +} + +func TestGuaranteedUpdate(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + key, storeObj := testPropogateStore(t, store, ctx, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", UID: "A"}}) + + tests := []struct { + key string + name string + ignoreNotFound bool + precondition *storage.Preconditions + expectNotFoundErr bool + expectInvalidObjErr bool + expectNoUpdate bool + }{{ // GuaranteedUpdate on non-existing key with ignoreNotFound=false + key: "/non-existing", + ignoreNotFound: false, + precondition: nil, + expectNotFoundErr: true, + expectInvalidObjErr: false, + expectNoUpdate: false, + }, { // GuaranteedUpdate on non-existing key with ignoreNotFound=true + key: "/non-existing", + ignoreNotFound: true, + precondition: nil, + expectNotFoundErr: false, + expectInvalidObjErr: false, + expectNoUpdate: false, + }, { // GuaranteedUpdate on existing key + key: key, + ignoreNotFound: false, + precondition: nil, + expectNotFoundErr: false, + expectInvalidObjErr: false, + expectNoUpdate: false, + }, { // GuaranteedUpdate with same data + key: key, + ignoreNotFound: false, + precondition: nil, + expectNotFoundErr: false, + expectInvalidObjErr: false, + expectNoUpdate: true, + }, { // GuaranteedUpdate with UID match + key: key, + ignoreNotFound: false, + precondition: storage.NewUIDPreconditions("A"), + expectNotFoundErr: false, + expectInvalidObjErr: false, + expectNoUpdate: true, + }, { // GuaranteedUpdate with UID mismatch + key: key, + ignoreNotFound: false, + precondition: storage.NewUIDPreconditions("B"), + expectNotFoundErr: false, + expectInvalidObjErr: true, + expectNoUpdate: true, + }} + + for i, tt := range tests { + out := &api.Pod{} + name := fmt.Sprintf("foo-%d", i) + if tt.expectNoUpdate { + name = storeObj.Name + } + version := storeObj.ResourceVersion + err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition, + storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { + if tt.expectNotFoundErr && tt.ignoreNotFound { + if pod := obj.(*api.Pod); pod.Name != "" { + t.Errorf("#%d: expecting zero value, but get=%#v", i, pod) + } + } + pod := *storeObj + pod.Name = name + return &pod, nil + })) + + if tt.expectNotFoundErr { + if err == nil || !storage.IsNotFound(err) { + t.Errorf("#%d: expecting not found error, but get: %v", i, err) + } + continue + } + if tt.expectInvalidObjErr { + if err == nil || !storage.IsInvalidObj(err) { + t.Errorf("#%d: expecting invalid UID error, but get: %s", i, err) + } + continue + } + if err != nil { + t.Fatalf("GuaranteedUpdate failed: %v", err) + } + if out.ObjectMeta.Name != name { + t.Errorf("#%d: pod name want=%s, get=%s", i, name, out.ObjectMeta.Name) + } + switch tt.expectNoUpdate { + case true: + if version != out.ResourceVersion { + t.Errorf("#%d: expect no version change, before=%s, after=%s", i, version, out.ResourceVersion) + } + case false: + if version == out.ResourceVersion { + t.Errorf("#%d: expect version change, but get the same version=%s", i, version) + } + } + storeObj = out + } +} + +func TestGuaranteedUpdateWithConflict(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + key, _ := testPropogateStore(t, store, ctx, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}) + + errChan := make(chan error, 1) + var firstToFinish sync.WaitGroup + var secondToEnter sync.WaitGroup + firstToFinish.Add(1) + secondToEnter.Add(1) + + go func() { + err := store.GuaranteedUpdate(ctx, key, &api.Pod{}, false, nil, + storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { + pod := obj.(*api.Pod) + pod.Name = "foo-1" + secondToEnter.Wait() + return pod, nil + })) + firstToFinish.Done() + errChan <- err + }() + + updateCount := 0 + err := store.GuaranteedUpdate(ctx, key, &api.Pod{}, false, nil, + storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { + if updateCount == 0 { + secondToEnter.Done() + firstToFinish.Wait() + } + updateCount++ + pod := obj.(*api.Pod) + pod.Name = "foo-2" + return pod, nil + })) + if err != nil { + t.Fatalf("Second GuaranteedUpdate error %#v", err) + } + if err := <-errChan; err != nil { + t.Fatalf("First GuaranteedUpdate error %#v", err) + } + + if updateCount != 2 { + t.Errorf("Should have conflict and called update func twice") + } +} + +func TestList(t *testing.T) { + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + store := newStore(cluster.RandClient(), testapi.Default.Codec(), "") + ctx := context.Background() + + // Setup storage with the following structure: + // / + // - one-level/ + // | - test + // | + // - two-level/ + // - 1/ + // | - test + // | + // - 2/ + // - test + preset := []struct { + key string + obj *api.Pod + storedObj *api.Pod + }{{ + key: "/one-level/test", + obj: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}, + }, { + key: "/two-level/1/test", + obj: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}, + }, { + key: "/two-level/2/test", + obj: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}}, + }} + + for i, ps := range preset { + preset[i].storedObj = &api.Pod{} + err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + } + + tests := []struct { + prefix string + filter storage.FilterFunc + expectedOut []*api.Pod + }{{ // test List on existing key + prefix: "/one-level/", + filter: storage.Everything, + expectedOut: []*api.Pod{preset[0].storedObj}, + }, { // test List on non-existing key + prefix: "/non-existing/", + filter: storage.Everything, + expectedOut: nil, + }, { // test List with filter + prefix: "/one-level/", + filter: func(obj runtime.Object) bool { + pod, ok := obj.(*api.Pod) + if !ok { + t.Fatal("It should be able to convert obj to *api.Pod") + } + return pod.Name != preset[0].storedObj.Name + }, + expectedOut: nil, + }, { // test List with multiple levels of directories and expect flattened result + prefix: "/two-level/", + filter: storage.Everything, + expectedOut: []*api.Pod{preset[1].storedObj, preset[2].storedObj}, + }} + + for i, tt := range tests { + out := &api.PodList{} + err := store.List(ctx, tt.prefix, "0", tt.filter, out) + if err != nil { + t.Fatalf("List failed: %v", err) + } + if len(tt.expectedOut) != len(out.Items) { + t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items)) + continue + } + for j, wantPod := range tt.expectedOut { + getPod := &out.Items[j] + if !reflect.DeepEqual(wantPod, getPod) { + t.Errorf("#%d: pod want=%#v, get=%#v", i, wantPod, getPod) + } + } + } +} + +func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) { + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + store := newStore(cluster.RandClient(), testapi.Default.Codec(), "") + ctx := context.Background() + return ctx, store, cluster +} + +// testPropogateStore helps propogates store with objects, automates key generation, and returns +// keys and stored objects. +func testPropogateStore(t *testing.T, store *store, ctx context.Context, obj *api.Pod) (string, *api.Pod) { + // Setup store with a key and grab the output for returning. + key := "/testkey" + setOutput := &api.Pod{} + err := store.Create(ctx, key, obj, setOutput, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + return key, setOutput +} diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go index 7ac812bd451..4a76e405229 100644 --- a/pkg/storage/interfaces.go +++ b/pkg/storage/interfaces.go @@ -104,6 +104,7 @@ type Interface interface { Set(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error // Delete removes the specified key and returns the value that existed at that spot. + // If key didn't exist, it will return NotFound storage error. Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error // Watch begins watching the specified key. Events are decoded into API objects, @@ -137,9 +138,13 @@ type Interface interface { // GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType') // retrying the update until success if there is index conflict. - // Note that object passed to tryUpdate may change acress incovations of tryUpdate() if - // other writers are simultaneously updateing it, to tryUpdate() needs to take into account + // Note that object passed to tryUpdate may change across invocations of tryUpdate() if + // other writers are simultaneously updating it, to tryUpdate() needs to take into account // the current contents of the object when deciding how the update object should look. + // If the key doesn't exist, it will return NotFound storage error if ignoreNotFound=false + // or zero value in 'ptrToType' parameter otherwise. + // If the object to update has the same value as previous, it won't do any update + // but will return the object in 'ptrToType' parameter. // // Example: //