From 4313bc6df3c8a0fa10f593c3686e078435de2202 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 31 Jan 2017 19:27:41 -0500 Subject: [PATCH] Allow values to be wrapped prior to serialization in etcd3 --- .../apiserver/pkg/storage/etcd3/store.go | 107 +++++++++++++----- .../apiserver/pkg/storage/etcd3/store_test.go | 106 ++++++++++++++++- .../apiserver/pkg/storage/etcd3/watcher.go | 28 +++-- .../pkg/storage/etcd3/watcher_test.go | 4 +- .../storage/storagebackend/factory/etcd3.go | 4 +- .../pkg/storage/tests/cacher_test.go | 2 +- vendor/BUILD | 4 + 7 files changed, 213 insertions(+), 42 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index e55c646b0b5..7318f355798 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -33,21 +33,40 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage" utiltrace "k8s.io/apiserver/pkg/util/trace" "k8s.io/apiserver/pkg/storage/etcd" ) +// ValueTransformer allows a string value to be transformed before being read from or written to the underlying store. The methods +// must be able to undo the transformation caused by the other. +type ValueTransformer interface { + // TransformStringFromStorage may transform the provided string from its underlying storage representation or return an error. + TransformFromStorage([]byte) ([]byte, error) + // TransformStringToStorage may transform the provided string into the appropriate form in storage or return an error. + TransformToStorage([]byte) ([]byte, error) +} + +type identityTransformer struct{} + +func (identityTransformer) TransformFromStorage(b []byte) ([]byte, error) { return b, nil } +func (identityTransformer) TransformToStorage(b []byte) ([]byte, error) { return b, nil } + +// IdentityTransformer performs no transformation on the provided values. +var IdentityTransformer ValueTransformer = identityTransformer{} + type store struct { client *clientv3.Client // getOpts contains additional options that should be passed // to all Get() calls. - getOps []clientv3.OpOption - codec runtime.Codec - versioner storage.Versioner - pathPrefix string - watcher *watcher + getOps []clientv3.OpOption + codec runtime.Codec + versioner storage.Versioner + transformer ValueTransformer + pathPrefix string + watcher *watcher } type elemForDecode struct { @@ -63,24 +82,25 @@ type objState struct { } // New returns an etcd3 implementation of storage.Interface. -func New(c *clientv3.Client, codec runtime.Codec, prefix string) storage.Interface { - return newStore(c, true, codec, prefix) +func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer ValueTransformer) storage.Interface { + return newStore(c, true, codec, prefix, transformer) } // NewWithNoQuorumRead returns etcd3 implementation of storage.Interface // where Get operations don't require quorum read. -func NewWithNoQuorumRead(c *clientv3.Client, codec runtime.Codec, prefix string) storage.Interface { - return newStore(c, false, codec, prefix) +func NewWithNoQuorumRead(c *clientv3.Client, codec runtime.Codec, prefix string, transformer ValueTransformer) storage.Interface { + return newStore(c, false, codec, prefix, transformer) } -func newStore(c *clientv3.Client, quorumRead bool, codec runtime.Codec, prefix string) *store { +func newStore(c *clientv3.Client, quorumRead bool, codec runtime.Codec, prefix string, transformer ValueTransformer) *store { versioner := etcd.APIObjectVersioner{} result := &store{ - client: c, - versioner: versioner, - codec: codec, - pathPrefix: prefix, - watcher: newWatcher(c, codec, versioner), + client: c, + codec: codec, + versioner: versioner, + transformer: transformer, + pathPrefix: prefix, + watcher: newWatcher(c, codec, versioner, transformer), } if !quorumRead { // In case of non-quorum reads, we can set WithSerializable() @@ -110,7 +130,13 @@ func (s *store) Get(ctx context.Context, key string, resourceVersion string, out return storage.NewKeyNotFoundError(key, 0) } kv := getResp.Kvs[0] - return decode(s.codec, s.versioner, kv.Value, out, kv.ModRevision) + + data, err := s.transformer.TransformFromStorage(kv.Value) + if err != nil { + return storage.NewInternalError(err.Error()) + } + + return decode(s.codec, s.versioner, data, out, kv.ModRevision) } // Create implements storage.Interface.Create. @@ -129,10 +155,15 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, return err } + newData, err := s.transformer.TransformToStorage(data) + if err != nil { + return storage.NewInternalError(err.Error()) + } + txnResp, err := s.client.KV.Txn(ctx).If( notFound(key), ).Then( - clientv3.OpPut(key, string(data), opts...), + clientv3.OpPut(key, string(newData), opts...), ).Commit() if err != nil { return err @@ -177,7 +208,11 @@ func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime } kv := getResp.Kvs[0] - return decode(s.codec, s.versioner, kv.Value, out, kv.ModRevision) + data, err := s.transformer.TransformFromStorage(kv.Value) + if err != nil { + return storage.NewInternalError(err.Error()) + } + return decode(s.codec, s.versioner, data, out, kv.ModRevision) } func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, precondtions *storage.Preconditions) error { @@ -261,6 +296,11 @@ func (s *store) GuaranteedUpdate( return decode(s.codec, s.versioner, origState.data, out, origState.rev) } + newData, err := s.transformer.TransformToStorage(data) + if err != nil { + return storage.NewInternalError(err.Error()) + } + opts, err := s.ttlOpts(ctx, int64(ttl)) if err != nil { return err @@ -270,7 +310,7 @@ func (s *store) GuaranteedUpdate( txnResp, err := s.client.KV.Txn(ctx).If( clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), ).Then( - clientv3.OpPut(key, string(data), opts...), + clientv3.OpPut(key, string(newData), opts...), ).Else( clientv3.OpGet(key), ).Commit() @@ -289,6 +329,7 @@ func (s *store) GuaranteedUpdate( continue } putResp := txnResp.Responses[0].GetResponsePut() + return decode(s.codec, s.versioner, data, out, putResp.Header.Revision) } } @@ -308,8 +349,12 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin if len(getResp.Kvs) == 0 { return nil } + data, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value) + if err != nil { + return storage.NewInternalError(err.Error()) + } elems := []*elemForDecode{{ - data: getResp.Kvs[0].Value, + data: data, rev: uint64(getResp.Kvs[0].ModRevision), }} if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil { @@ -337,12 +382,18 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor return err } - elems := make([]*elemForDecode, len(getResp.Kvs)) - for i, kv := range getResp.Kvs { - elems[i] = &elemForDecode{ - data: kv.Value, - rev: uint64(kv.ModRevision), + elems := make([]*elemForDecode, 0, len(getResp.Kvs)) + for _, kv := range getResp.Kvs { + data, err := s.transformer.TransformFromStorage(kv.Value) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", key, err)) + continue } + + elems = append(elems, &elemForDecode{ + data: data, + rev: uint64(kv.ModRevision), + }) } if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil { return err @@ -383,9 +434,13 @@ func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Va return nil, err } } else { + data, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value) + if err != nil { + return nil, storage.NewInternalError(err.Error()) + } state.rev = getResp.Kvs[0].ModRevision state.meta.ResourceVersion = uint64(state.rev) - state.data = getResp.Kvs[0].Value + state.data = data if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil { return nil, err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index ce379a33898..9c88d9ab6e7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -17,6 +17,7 @@ limitations under the License. package etcd3 import ( + "bytes" "fmt" "reflect" "sync" @@ -28,10 +29,12 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/storage" + storagetests "k8s.io/apiserver/pkg/storage/tests" "github.com/coreos/etcd/integration" "golang.org/x/net/context" @@ -46,6 +49,25 @@ func init() { examplev1.AddToScheme(scheme) } +// prefixTransformer adds and verifies that all data has the correct prefix on its way in and out. +type prefixTransformer struct { + prefix []byte + err error +} + +func (p prefixTransformer) TransformFromStorage(b []byte) ([]byte, error) { + if !bytes.HasPrefix(b, p.prefix) { + return nil, fmt.Errorf("value does not have expected prefix: %s", string(b)) + } + return bytes.TrimPrefix(b, p.prefix), p.err +} +func (p prefixTransformer) TransformToStorage(b []byte) ([]byte, error) { + if len(b) > 0 { + return append(append([]byte{}, p.prefix...), b...), p.err + } + return b, p.err +} + func TestCreate(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) @@ -460,11 +482,91 @@ func TestGuaranteedUpdateWithConflict(t *testing.T) { } } +func TestTransformationFailure(t *testing.T) { + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")}) + ctx := context.Background() + + preset := []struct { + key string + obj *example.Pod + storedObj *example.Pod + }{{ + key: "/one-level/test", + obj: &example.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "bar"}, + Spec: storagetests.DeepEqualSafePodSpec(), + }, + }, { + key: "/two-level/1/test", + obj: &example.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "baz"}, + Spec: storagetests.DeepEqualSafePodSpec(), + }, + }} + for i, ps := range preset[:1] { + preset[i].storedObj = &example.Pod{} + err := store.Create(ctx, ps.key, ps.obj, preset[:1][i].storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + } + + // create a second resource with an invalid prefix + oldTransformer := store.transformer + store.transformer = prefixTransformer{prefix: []byte("otherprefix!")} + for i, ps := range preset[1:] { + preset[1:][i].storedObj = &example.Pod{} + err := store.Create(ctx, ps.key, ps.obj, preset[1:][i].storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + } + store.transformer = oldTransformer + + // only the first item is returned, and no error + var got example.PodList + if err := store.List(ctx, "/", "", storage.Everything, &got); err != nil { + t.Errorf("Unexpected error %v", err) + } + if e, a := []example.Pod{*preset[0].storedObj}, got.Items; !reflect.DeepEqual(e, a) { + t.Errorf("Unexpected: %s", diff.ObjectReflectDiff(e, a)) + } + + // Get should fail + if err := store.Get(ctx, preset[1].key, "", &example.Pod{}, false); !storage.IsInternalError(err) { + t.Errorf("Unexpected error: %v", err) + } + // GuaranteedUpdate without suggestion should return an error + if err := store.GuaranteedUpdate(ctx, preset[1].key, &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) { + return input, nil, nil + }); !storage.IsInternalError(err) { + t.Errorf("Unexpected error: %v", err) + } + // GuaranteedUpdate with suggestion should not return an error if we don't change the object + if err := store.GuaranteedUpdate(ctx, preset[1].key, &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) { + return input, nil, nil + }, preset[1].obj); err != nil { + t.Errorf("Unexpected error: %v", err) + } + + // Delete succeeds but reports an error because we cannot access the body + if err := store.Delete(ctx, preset[1].key, &example.Pod{}, nil); !storage.IsInternalError(err) { + t.Errorf("Unexpected error: %v", err) + } + + if err := store.Get(ctx, preset[1].key, "", &example.Pod{}, false); !storage.IsNotFound(err) { + t.Errorf("Unexpected error: %v", err) + } +} + func TestList(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), false, codec, "") + store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")}) ctx := context.Background() // Setup storage with the following structure: @@ -552,7 +654,7 @@ func TestList(t *testing.T) { func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - store := newStore(cluster.RandClient(), false, codec, "") + store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")}) ctx := context.Background() return ctx, store, cluster } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index e75d4b98148..05c6a8bd1db 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -40,9 +40,10 @@ const ( ) type watcher struct { - client *clientv3.Client - codec runtime.Codec - versioner storage.Versioner + client *clientv3.Client + codec runtime.Codec + versioner storage.Versioner + transformer ValueTransformer } // watchChan implements watch.Interface. @@ -59,11 +60,12 @@ type watchChan struct { errChan chan error } -func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner) *watcher { +func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner, transformer ValueTransformer) *watcher { return &watcher{ - client: client, - codec: codec, - versioner: versioner, + client: client, + codec: codec, + versioner: versioner, + transformer: transformer, } } @@ -341,7 +343,11 @@ func (wc *watchChan) sendEvent(e *event) { func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) { if !e.isDeleted { - curObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, e.value, e.rev) + data, err := wc.watcher.transformer.TransformFromStorage(e.value) + if err != nil { + return nil, nil, err + } + curObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, data, e.rev) if err != nil { return nil, nil, err } @@ -352,9 +358,13 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim // we need the object only to compute whether it was filtered out // before). if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) { + data, err := wc.watcher.transformer.TransformFromStorage(e.prevValue) + if err != nil { + return nil, nil, err + } // Note that this sends the *old* object with the etcd revision for the time at // which it gets deleted. - oldObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, e.prevValue, e.rev) + oldObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, data, e.rev) if err != nil { return nil, nil, err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 753c546c373..00c31735a91 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -227,13 +227,13 @@ func TestWatchError(t *testing.T) { codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)} cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - invalidStore := newStore(cluster.RandClient(), false, codec, "") + invalidStore := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")}) ctx := context.Background() w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything) if err != nil { t.Fatalf("Watch failed: %v", err) } - validStore := newStore(cluster.RandClient(), false, codec, "") + validStore := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")}) validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate( func(runtime.Object) (runtime.Object, error) { return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index fa457719ef4..48f2b760d23 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -56,7 +56,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e client.Close() } if c.Quorum { - return etcd3.New(client, c.Codec, c.Prefix), destroyFunc, nil + return etcd3.New(client, c.Codec, c.Prefix, etcd3.IdentityTransformer), destroyFunc, nil } - return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix), destroyFunc, nil + return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix, etcd3.IdentityTransformer), destroyFunc, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index d3690c42215..e180e47cd90 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -93,7 +93,7 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha func newEtcdTestStorage(t *testing.T, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) { server, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, scheme) - storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix) + storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix, etcd3.IdentityTransformer) return server, storage } diff --git a/vendor/BUILD b/vendor/BUILD index e795e3e6ff4..047e2196ef1 100644 --- a/vendor/BUILD +++ b/vendor/BUILD @@ -14920,6 +14920,7 @@ go_test( "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/runtime/serializer", + "//vendor:k8s.io/apimachinery/pkg/util/diff", "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/apiserver/pkg/apis/example", "//vendor:k8s.io/apiserver/pkg/apis/example/v1", @@ -15038,11 +15039,13 @@ go_test( "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/runtime/serializer", + "//vendor:k8s.io/apimachinery/pkg/util/diff", "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/apiserver/pkg/apis/example", "//vendor:k8s.io/apiserver/pkg/apis/example/v1", "//vendor:k8s.io/apiserver/pkg/storage", + "//vendor:k8s.io/apiserver/pkg/storage/tests", ], ) @@ -15065,6 +15068,7 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/conversion", "//vendor:k8s.io/apimachinery/pkg/runtime", + "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/apiserver/pkg/storage", "//vendor:k8s.io/apiserver/pkg/storage/etcd",