Allow values to be wrapped prior to serialization in etcd3

This commit is contained in:
Clayton Coleman 2017-01-31 19:27:41 -05:00
parent 36df93826a
commit 4313bc6df3
No known key found for this signature in database
GPG Key ID: 3D16906B4F1C5CB3
7 changed files with 213 additions and 42 deletions

View File

@ -33,21 +33,40 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
utiltrace "k8s.io/apiserver/pkg/util/trace"
"k8s.io/apiserver/pkg/storage/etcd"
)
// ValueTransformer allows a string value to be transformed before being read from or written to the underlying store. The methods
// must be able to undo the transformation caused by the other.
type ValueTransformer interface {
// TransformStringFromStorage may transform the provided string from its underlying storage representation or return an error.
TransformFromStorage([]byte) ([]byte, error)
// TransformStringToStorage may transform the provided string into the appropriate form in storage or return an error.
TransformToStorage([]byte) ([]byte, error)
}
type identityTransformer struct{}
func (identityTransformer) TransformFromStorage(b []byte) ([]byte, error) { return b, nil }
func (identityTransformer) TransformToStorage(b []byte) ([]byte, error) { return b, nil }
// IdentityTransformer performs no transformation on the provided values.
var IdentityTransformer ValueTransformer = identityTransformer{}
type store struct {
client *clientv3.Client
// getOpts contains additional options that should be passed
// to all Get() calls.
getOps []clientv3.OpOption
codec runtime.Codec
versioner storage.Versioner
pathPrefix string
watcher *watcher
getOps []clientv3.OpOption
codec runtime.Codec
versioner storage.Versioner
transformer ValueTransformer
pathPrefix string
watcher *watcher
}
type elemForDecode struct {
@ -63,24 +82,25 @@ type objState struct {
}
// New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, prefix string) storage.Interface {
return newStore(c, true, codec, prefix)
func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer ValueTransformer) storage.Interface {
return newStore(c, true, codec, prefix, transformer)
}
// NewWithNoQuorumRead returns etcd3 implementation of storage.Interface
// where Get operations don't require quorum read.
func NewWithNoQuorumRead(c *clientv3.Client, codec runtime.Codec, prefix string) storage.Interface {
return newStore(c, false, codec, prefix)
func NewWithNoQuorumRead(c *clientv3.Client, codec runtime.Codec, prefix string, transformer ValueTransformer) storage.Interface {
return newStore(c, false, codec, prefix, transformer)
}
func newStore(c *clientv3.Client, quorumRead bool, codec runtime.Codec, prefix string) *store {
func newStore(c *clientv3.Client, quorumRead bool, codec runtime.Codec, prefix string, transformer ValueTransformer) *store {
versioner := etcd.APIObjectVersioner{}
result := &store{
client: c,
versioner: versioner,
codec: codec,
pathPrefix: prefix,
watcher: newWatcher(c, codec, versioner),
client: c,
codec: codec,
versioner: versioner,
transformer: transformer,
pathPrefix: prefix,
watcher: newWatcher(c, codec, versioner, transformer),
}
if !quorumRead {
// In case of non-quorum reads, we can set WithSerializable()
@ -110,7 +130,13 @@ func (s *store) Get(ctx context.Context, key string, resourceVersion string, out
return storage.NewKeyNotFoundError(key, 0)
}
kv := getResp.Kvs[0]
return decode(s.codec, s.versioner, kv.Value, out, kv.ModRevision)
data, err := s.transformer.TransformFromStorage(kv.Value)
if err != nil {
return storage.NewInternalError(err.Error())
}
return decode(s.codec, s.versioner, data, out, kv.ModRevision)
}
// Create implements storage.Interface.Create.
@ -129,10 +155,15 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
return err
}
newData, err := s.transformer.TransformToStorage(data)
if err != nil {
return storage.NewInternalError(err.Error())
}
txnResp, err := s.client.KV.Txn(ctx).If(
notFound(key),
).Then(
clientv3.OpPut(key, string(data), opts...),
clientv3.OpPut(key, string(newData), opts...),
).Commit()
if err != nil {
return err
@ -177,7 +208,11 @@ func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime
}
kv := getResp.Kvs[0]
return decode(s.codec, s.versioner, kv.Value, out, kv.ModRevision)
data, err := s.transformer.TransformFromStorage(kv.Value)
if err != nil {
return storage.NewInternalError(err.Error())
}
return decode(s.codec, s.versioner, data, out, kv.ModRevision)
}
func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, precondtions *storage.Preconditions) error {
@ -261,6 +296,11 @@ func (s *store) GuaranteedUpdate(
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
}
newData, err := s.transformer.TransformToStorage(data)
if err != nil {
return storage.NewInternalError(err.Error())
}
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
@ -270,7 +310,7 @@ func (s *store) GuaranteedUpdate(
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
).Then(
clientv3.OpPut(key, string(data), opts...),
clientv3.OpPut(key, string(newData), opts...),
).Else(
clientv3.OpGet(key),
).Commit()
@ -289,6 +329,7 @@ func (s *store) GuaranteedUpdate(
continue
}
putResp := txnResp.Responses[0].GetResponsePut()
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
}
}
@ -308,8 +349,12 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
if len(getResp.Kvs) == 0 {
return nil
}
data, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value)
if err != nil {
return storage.NewInternalError(err.Error())
}
elems := []*elemForDecode{{
data: getResp.Kvs[0].Value,
data: data,
rev: uint64(getResp.Kvs[0].ModRevision),
}}
if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil {
@ -337,12 +382,18 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
return err
}
elems := make([]*elemForDecode, len(getResp.Kvs))
for i, kv := range getResp.Kvs {
elems[i] = &elemForDecode{
data: kv.Value,
rev: uint64(kv.ModRevision),
elems := make([]*elemForDecode, 0, len(getResp.Kvs))
for _, kv := range getResp.Kvs {
data, err := s.transformer.TransformFromStorage(kv.Value)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", key, err))
continue
}
elems = append(elems, &elemForDecode{
data: data,
rev: uint64(kv.ModRevision),
})
}
if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil {
return err
@ -383,9 +434,13 @@ func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Va
return nil, err
}
} else {
data, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value)
if err != nil {
return nil, storage.NewInternalError(err.Error())
}
state.rev = getResp.Kvs[0].ModRevision
state.meta.ResourceVersion = uint64(state.rev)
state.data = getResp.Kvs[0].Value
state.data = data
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
return nil, err
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package etcd3
import (
"bytes"
"fmt"
"reflect"
"sync"
@ -28,10 +29,12 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
storagetests "k8s.io/apiserver/pkg/storage/tests"
"github.com/coreos/etcd/integration"
"golang.org/x/net/context"
@ -46,6 +49,25 @@ func init() {
examplev1.AddToScheme(scheme)
}
// prefixTransformer adds and verifies that all data has the correct prefix on its way in and out.
type prefixTransformer struct {
prefix []byte
err error
}
func (p prefixTransformer) TransformFromStorage(b []byte) ([]byte, error) {
if !bytes.HasPrefix(b, p.prefix) {
return nil, fmt.Errorf("value does not have expected prefix: %s", string(b))
}
return bytes.TrimPrefix(b, p.prefix), p.err
}
func (p prefixTransformer) TransformToStorage(b []byte) ([]byte, error) {
if len(b) > 0 {
return append(append([]byte{}, p.prefix...), b...), p.err
}
return b, p.err
}
func TestCreate(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
@ -460,11 +482,91 @@ func TestGuaranteedUpdateWithConflict(t *testing.T) {
}
}
func TestTransformationFailure(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
ctx := context.Background()
preset := []struct {
key string
obj *example.Pod
storedObj *example.Pod
}{{
key: "/one-level/test",
obj: &example.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "bar"},
Spec: storagetests.DeepEqualSafePodSpec(),
},
}, {
key: "/two-level/1/test",
obj: &example.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "baz"},
Spec: storagetests.DeepEqualSafePodSpec(),
},
}}
for i, ps := range preset[:1] {
preset[i].storedObj = &example.Pod{}
err := store.Create(ctx, ps.key, ps.obj, preset[:1][i].storedObj, 0)
if err != nil {
t.Fatalf("Set failed: %v", err)
}
}
// create a second resource with an invalid prefix
oldTransformer := store.transformer
store.transformer = prefixTransformer{prefix: []byte("otherprefix!")}
for i, ps := range preset[1:] {
preset[1:][i].storedObj = &example.Pod{}
err := store.Create(ctx, ps.key, ps.obj, preset[1:][i].storedObj, 0)
if err != nil {
t.Fatalf("Set failed: %v", err)
}
}
store.transformer = oldTransformer
// only the first item is returned, and no error
var got example.PodList
if err := store.List(ctx, "/", "", storage.Everything, &got); err != nil {
t.Errorf("Unexpected error %v", err)
}
if e, a := []example.Pod{*preset[0].storedObj}, got.Items; !reflect.DeepEqual(e, a) {
t.Errorf("Unexpected: %s", diff.ObjectReflectDiff(e, a))
}
// Get should fail
if err := store.Get(ctx, preset[1].key, "", &example.Pod{}, false); !storage.IsInternalError(err) {
t.Errorf("Unexpected error: %v", err)
}
// GuaranteedUpdate without suggestion should return an error
if err := store.GuaranteedUpdate(ctx, preset[1].key, &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) {
return input, nil, nil
}); !storage.IsInternalError(err) {
t.Errorf("Unexpected error: %v", err)
}
// GuaranteedUpdate with suggestion should not return an error if we don't change the object
if err := store.GuaranteedUpdate(ctx, preset[1].key, &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) {
return input, nil, nil
}, preset[1].obj); err != nil {
t.Errorf("Unexpected error: %v", err)
}
// Delete succeeds but reports an error because we cannot access the body
if err := store.Delete(ctx, preset[1].key, &example.Pod{}, nil); !storage.IsInternalError(err) {
t.Errorf("Unexpected error: %v", err)
}
if err := store.Get(ctx, preset[1].key, "", &example.Pod{}, false); !storage.IsNotFound(err) {
t.Errorf("Unexpected error: %v", err)
}
}
func TestList(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), false, codec, "")
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
ctx := context.Background()
// Setup storage with the following structure:
@ -552,7 +654,7 @@ func TestList(t *testing.T) {
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
store := newStore(cluster.RandClient(), false, codec, "")
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
ctx := context.Background()
return ctx, store, cluster
}

View File

@ -40,9 +40,10 @@ const (
)
type watcher struct {
client *clientv3.Client
codec runtime.Codec
versioner storage.Versioner
client *clientv3.Client
codec runtime.Codec
versioner storage.Versioner
transformer ValueTransformer
}
// watchChan implements watch.Interface.
@ -59,11 +60,12 @@ type watchChan struct {
errChan chan error
}
func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner) *watcher {
func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner, transformer ValueTransformer) *watcher {
return &watcher{
client: client,
codec: codec,
versioner: versioner,
client: client,
codec: codec,
versioner: versioner,
transformer: transformer,
}
}
@ -341,7 +343,11 @@ func (wc *watchChan) sendEvent(e *event) {
func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) {
if !e.isDeleted {
curObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, e.value, e.rev)
data, err := wc.watcher.transformer.TransformFromStorage(e.value)
if err != nil {
return nil, nil, err
}
curObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, data, e.rev)
if err != nil {
return nil, nil, err
}
@ -352,9 +358,13 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim
// we need the object only to compute whether it was filtered out
// before).
if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) {
data, err := wc.watcher.transformer.TransformFromStorage(e.prevValue)
if err != nil {
return nil, nil, err
}
// Note that this sends the *old* object with the etcd revision for the time at
// which it gets deleted.
oldObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, e.prevValue, e.rev)
oldObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, data, e.rev)
if err != nil {
return nil, nil, err
}

View File

@ -227,13 +227,13 @@ func TestWatchError(t *testing.T) {
codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
invalidStore := newStore(cluster.RandClient(), false, codec, "")
invalidStore := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
ctx := context.Background()
w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything)
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
validStore := newStore(cluster.RandClient(), false, codec, "")
validStore := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) {
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil

View File

@ -56,7 +56,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
client.Close()
}
if c.Quorum {
return etcd3.New(client, c.Codec, c.Prefix), destroyFunc, nil
return etcd3.New(client, c.Codec, c.Prefix, etcd3.IdentityTransformer), destroyFunc, nil
}
return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix), destroyFunc, nil
return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix, etcd3.IdentityTransformer), destroyFunc, nil
}

View File

@ -93,7 +93,7 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha
func newEtcdTestStorage(t *testing.T, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) {
server, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, scheme)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix, etcd3.IdentityTransformer)
return server, storage
}

4
vendor/BUILD vendored
View File

@ -14920,6 +14920,7 @@ go_test(
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/runtime/serializer",
"//vendor:k8s.io/apimachinery/pkg/util/diff",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/apiserver/pkg/apis/example",
"//vendor:k8s.io/apiserver/pkg/apis/example/v1",
@ -15038,11 +15039,13 @@ go_test(
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/runtime/serializer",
"//vendor:k8s.io/apimachinery/pkg/util/diff",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/apiserver/pkg/apis/example",
"//vendor:k8s.io/apiserver/pkg/apis/example/v1",
"//vendor:k8s.io/apiserver/pkg/storage",
"//vendor:k8s.io/apiserver/pkg/storage/tests",
],
)
@ -15065,6 +15068,7 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/conversion",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/apiserver/pkg/storage",
"//vendor:k8s.io/apiserver/pkg/storage/etcd",