diff --git a/staging/src/k8s.io/apiserver/pkg/storage/errors/storage.go b/staging/src/k8s.io/apiserver/pkg/storage/errors/storage.go index 84f8d857727..fd3b35ed06f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/errors/storage.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/errors/storage.go @@ -30,6 +30,8 @@ func InterpretListError(err error, qualifiedResource schema.GroupResource) error return errors.NewNotFound(qualifiedResource, "") case storage.IsUnreachable(err): return errors.NewServerTimeout(qualifiedResource, "list", 2) // TODO: make configurable or handled at a higher level + case storage.IsInternalError(err): + return errors.NewInternalError(err) default: return err } @@ -43,6 +45,8 @@ func InterpretGetError(err error, qualifiedResource schema.GroupResource, name s return errors.NewNotFound(qualifiedResource, name) case storage.IsUnreachable(err): return errors.NewServerTimeout(qualifiedResource, "get", 2) // TODO: make configurable or handled at a higher level + case storage.IsInternalError(err): + return errors.NewInternalError(err) default: return err } @@ -56,6 +60,8 @@ func InterpretCreateError(err error, qualifiedResource schema.GroupResource, nam return errors.NewAlreadyExists(qualifiedResource, name) case storage.IsUnreachable(err): return errors.NewServerTimeout(qualifiedResource, "create", 2) // TODO: make configurable or handled at a higher level + case storage.IsInternalError(err): + return errors.NewInternalError(err) default: return err } @@ -102,6 +108,8 @@ func InterpretWatchError(err error, resource schema.GroupResource, name string) case storage.IsInvalidError(err): invalidError, _ := err.(storage.InvalidError) return errors.NewInvalid(schema.GroupKind{Group: resource.Group, Kind: resource.Resource}, name, invalidError.Errs) + case storage.IsInternalError(err): + return errors.NewInternalError(err) default: return err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go index 13a54b38b42..7c6c675a073 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go @@ -31,23 +31,46 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/etcd/metrics" + etcdutil "k8s.io/apiserver/pkg/storage/etcd/util" utilcache "k8s.io/apiserver/pkg/util/cache" utiltrace "k8s.io/apiserver/pkg/util/trace" - etcdutil "k8s.io/apiserver/pkg/storage/etcd/util" ) +// ValueTransformer allows a string value to be transformed before being read from or written to the underlying store. The methods +// must be able to undo the transformation caused by the other. +type ValueTransformer interface { + // TransformStringFromStorage may transform the provided string from its underlying storage representation or return an error. + // Stale is true if the object on disk is stale and a write to etcd should be issued, even if the contents of the object + // have not changed. + TransformStringFromStorage(string) (value string, stale bool, err error) + // TransformStringToStorage may transform the provided string into the appropriate form in storage or return an error. + TransformStringToStorage(string) (value string, err error) +} + +type identityTransformer struct{} + +func (identityTransformer) TransformStringFromStorage(s string) (string, bool, error) { + return s, false, nil +} +func (identityTransformer) TransformStringToStorage(s string) (string, error) { return s, nil } + +// IdentityTransformer performs no transformation on the provided values. +var IdentityTransformer ValueTransformer = identityTransformer{} + // Creates a new storage interface from the client // TODO: deprecate in favor of storage.Config abstraction over time -func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int, copier runtime.ObjectCopier) storage.Interface { +func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int, copier runtime.ObjectCopier, transformer ValueTransformer) storage.Interface { return &etcdHelper{ etcdMembersAPI: etcd.NewMembersAPI(client), etcdKeysAPI: etcd.NewKeysAPI(client), codec: codec, versioner: APIObjectVersioner{}, copier: copier, + transformer: transformer, pathPrefix: path.Join("/", prefix), quorum: quorum, cache: utilcache.NewCache(cacheSize), @@ -60,6 +83,7 @@ type etcdHelper struct { etcdKeysAPI etcd.KeysAPI codec runtime.Codec copier runtime.ObjectCopier + transformer ValueTransformer // Note that versioner is required for etcdHelper to work correctly. // The public constructors (NewStorage & NewEtcdStorage) are setting it // correctly, so be careful when manipulating with it manually. @@ -112,7 +136,13 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob TTL: time.Duration(ttl) * time.Second, PrevExist: etcd.PrevNoExist, } - response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts) + + newBody, err := h.transformer.TransformStringToStorage(string(data)) + if err != nil { + return storage.NewInternalError(err.Error()) + } + + response, err := h.etcdKeysAPI.Set(ctx, key, newBody, &opts) trace.Step("Object created") metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime) if err != nil { @@ -122,7 +152,7 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob if _, err := conversion.EnforcePtr(out); err != nil { panic("unable to convert output object to pointer") } - _, _, err = h.extractObj(response, err, out, false, false) + _, _, _, err = h.extractObj(response, err, out, false, false) } return err } @@ -160,7 +190,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object, if !etcdutil.IsEtcdNotFound(err) { // if the object that existed prior to the delete is returned by etcd, update the out object. if err != nil || response.PrevNode != nil { - _, _, err = h.extractObj(response, err, out, false, true) + _, _, _, err = h.extractObj(response, err, out, false, true) } } return toStorageErr(err, key, 0) @@ -169,7 +199,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object, // Check the preconditions match. obj := reflect.New(v.Type()).Interface().(runtime.Object) for { - _, node, res, err := h.bodyAndExtractObj(ctx, key, obj, false) + _, node, res, _, err := h.bodyAndExtractObj(ctx, key, obj, false) if err != nil { return toStorageErr(err, key, 0) } @@ -186,17 +216,17 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object, startTime := time.Now() response, err := h.etcdKeysAPI.Delete(ctx, key, &opt) metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime) - if etcdutil.IsEtcdTestFailed(err) { - glog.Infof("deletion of %s failed because of a conflict, going to retry", key) - } else { + if !etcdutil.IsEtcdTestFailed(err) { if !etcdutil.IsEtcdNotFound(err) { // if the object that existed prior to the delete is returned by etcd, update the out object. if err != nil || response.PrevNode != nil { - _, _, err = h.extractObj(response, err, out, false, true) + _, _, _, err = h.extractObj(response, err, out, false, true) } } return toStorageErr(err, key, 0) } + + glog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key) } } @@ -210,7 +240,7 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri return nil, err } key = path.Join(h.pathPrefix, key) - w := newEtcdWatcher(false, h.quorum, nil, storage.SimpleFilter(pred), h.codec, h.versioner, nil, h) + w := newEtcdWatcher(false, h.quorum, nil, storage.SimpleFilter(pred), h.codec, h.versioner, nil, h.transformer, h) go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV) return w, nil } @@ -225,7 +255,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion return nil, err } key = path.Join(h.pathPrefix, key) - w := newEtcdWatcher(true, h.quorum, exceptKey(key), storage.SimpleFilter(pred), h.codec, h.versioner, nil, h) + w := newEtcdWatcher(true, h.quorum, exceptKey(key), storage.SimpleFilter(pred), h.codec, h.versioner, nil, h.transformer, h) go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV) return w, nil } @@ -236,13 +266,13 @@ func (h *etcdHelper) Get(ctx context.Context, key string, resourceVersion string glog.Errorf("Context is nil") } key = path.Join(h.pathPrefix, key) - _, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound) + _, _, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound) return err } // bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information // about the response, like the current etcd index and the ttl. -func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) { +func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, stale bool, err error) { if ctx == nil { glog.Errorf("Context is nil") } @@ -255,13 +285,13 @@ func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr r response, err := h.etcdKeysAPI.Get(ctx, key, opts) metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime) if err != nil && !etcdutil.IsEtcdNotFound(err) { - return "", nil, nil, toStorageErr(err, key, 0) + return "", nil, nil, false, toStorageErr(err, key, 0) } - body, node, err = h.extractObj(response, err, objPtr, ignoreNotFound, false) - return body, node, response, toStorageErr(err, key, 0) + body, node, stale, err = h.extractObj(response, err, objPtr, ignoreNotFound, false) + return body, node, response, stale, toStorageErr(err, key, 0) } -func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, err error) { +func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, stale bool, err error) { if response != nil { if prevNode { node = response.PrevNode @@ -273,26 +303,30 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run if ignoreNotFound { v, err := conversion.EnforcePtr(objPtr) if err != nil { - return "", nil, err + return "", nil, false, err } v.Set(reflect.Zero(v.Type())) - return "", nil, nil + return "", nil, false, nil } else if inErr != nil { - return "", nil, inErr + return "", nil, false, inErr } - return "", nil, fmt.Errorf("unable to locate a value on the response: %#v", response) + return "", nil, false, fmt.Errorf("unable to locate a value on the response: %#v", response) + } + + body, stale, err = h.transformer.TransformStringFromStorage(node.Value) + if err != nil { + return body, nil, stale, storage.NewInternalError(err.Error()) } - body = node.Value out, gvk, err := h.codec.Decode([]byte(body), nil, objPtr) if err != nil { - return body, nil, err + return body, nil, stale, err } if out != objPtr { - return body, nil, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr)) + return body, nil, stale, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr)) } // being unable to set the version does not prevent the object from being extracted _ = h.versioner.UpdateObject(objPtr, node.ModifiedIndex) - return body, node, err + return body, node, stale, err } // Implements storage.Interface. @@ -359,7 +393,14 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) } } else { - obj, _, err := h.codec.Decode([]byte(node.Value), nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object)) + body, _, err := h.transformer.TransformStringFromStorage(node.Value) + if err != nil { + // omit items from lists and watches that cannot be transformed, but log the error + utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", node.Key, err)) + continue + } + + obj, _, err := h.codec.Decode([]byte(body), nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object)) if err != nil { return err } @@ -448,7 +489,7 @@ func (h *etcdHelper) GuaranteedUpdate( key = path.Join(h.pathPrefix, key) for { obj := reflect.New(v.Type()).Interface().(runtime.Object) - origBody, node, res, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound) + origBody, node, res, stale, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound) if err != nil { return toStorageErr(err, key, 0) } @@ -493,10 +534,15 @@ func (h *etcdHelper) GuaranteedUpdate( return errors.New("resourceVersion cannot be set on objects store in etcd") } - data, err := runtime.Encode(h.codec, ret) + newBodyData, err := runtime.Encode(h.codec, ret) if err != nil { return err } + newBody := string(newBodyData) + data, err := h.transformer.TransformStringToStorage(newBody) + if err != nil { + return storage.NewInternalError(err.Error()) + } // First time this key has been used, try creating new value. if index == 0 { @@ -505,19 +551,20 @@ func (h *etcdHelper) GuaranteedUpdate( TTL: time.Duration(ttl) * time.Second, PrevExist: etcd.PrevNoExist, } - response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts) + response, err := h.etcdKeysAPI.Set(ctx, key, data, &opts) metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime) if etcdutil.IsEtcdNodeExist(err) { continue } - _, _, err = h.extractObj(response, err, ptrToType, false, false) + _, _, _, err = h.extractObj(response, err, ptrToType, false, false) return toStorageErr(err, key, 0) } - if string(data) == origBody { - // If we don't send an update, we simply return the currently existing - // version of the object. - _, _, err := h.extractObj(res, nil, ptrToType, ignoreNotFound, false) + // If we don't send an update, we simply return the currently existing + // version of the object. However, the value transformer may indicate that + // the on disk representation has changed and that we must commit an update. + if newBody == origBody && !stale { + _, _, _, err := h.extractObj(res, nil, ptrToType, ignoreNotFound, false) return err } @@ -527,13 +574,13 @@ func (h *etcdHelper) GuaranteedUpdate( PrevIndex: index, TTL: time.Duration(ttl) * time.Second, } - response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts) + response, err := h.etcdKeysAPI.Set(ctx, key, data, &opts) metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime) if etcdutil.IsEtcdTestFailed(err) { // Try again. continue } - _, _, err = h.extractObj(response, err, ptrToType, false, false) + _, _, _, err = h.extractObj(response, err, ptrToType, false, false) return toStorageErr(err, key, int64(index)) } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper_test.go index 1d7f525358f..5845e0a2cbe 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper_test.go @@ -17,8 +17,10 @@ limitations under the License. package etcd import ( + "fmt" "path" "reflect" + "strings" "sync" "testing" "time" @@ -33,6 +35,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/storage" @@ -42,6 +45,34 @@ import ( storagetests "k8s.io/apiserver/pkg/storage/tests" ) +// prefixTransformer adds and verifies that all data has the correct prefix on its way in and out. +type prefixTransformer struct { + prefix string + stale bool + err error +} + +func (p prefixTransformer) TransformStringFromStorage(s string) (string, bool, error) { + if !strings.HasPrefix(s, p.prefix) { + return "", false, fmt.Errorf("value does not have expected prefix: %s", s) + } + return strings.TrimPrefix(s, p.prefix), p.stale, p.err +} +func (p prefixTransformer) TransformStringToStorage(s string) (string, error) { + if len(s) > 0 { + return p.prefix + s, p.err + } + return s, p.err +} + +func defaultPrefix(s string) string { + return "test!" + s +} + +func defaultPrefixValue(value []byte) string { + return defaultPrefix(string(value)) +} + func testScheme(t *testing.T) (*runtime.Scheme, serializer.CodecFactory) { scheme := runtime.NewScheme() scheme.Log(t) @@ -66,7 +97,7 @@ func testScheme(t *testing.T) (*runtime.Scheme, serializer.CodecFactory) { } func newEtcdHelper(client etcd.Client, scheme *runtime.Scheme, codec runtime.Codec, prefix string) etcdHelper { - return *NewEtcdStorage(client, codec, prefix, false, etcdtest.DeserializationCacheSize, scheme).(*etcdHelper) + return *NewEtcdStorage(client, codec, prefix, false, etcdtest.DeserializationCacheSize, scheme, prefixTransformer{prefix: "test!"}).(*etcdHelper) } // Returns an encoded version of example.Pod with the given name. @@ -135,6 +166,61 @@ func TestList(t *testing.T) { } } +func TestTransformationFailure(t *testing.T) { + scheme, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + server := etcdtesting.NewEtcdTestClientServer(t) + defer server.Terminate(t) + helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()) + + pods := []example.Pod{ + { + ObjectMeta: metav1.ObjectMeta{Name: "bar"}, + Spec: storagetests.DeepEqualSafePodSpec(), + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "baz"}, + Spec: storagetests.DeepEqualSafePodSpec(), + }, + } + createPodList(t, helper, &example.PodList{Items: pods[:1]}) + + // create a second resource with an invalid prefix + oldTransformer := helper.transformer + helper.transformer = prefixTransformer{prefix: "otherprefix!"} + createPodList(t, helper, &example.PodList{Items: pods[1:]}) + helper.transformer = oldTransformer + + // only the first item is returned, and no error + var got example.PodList + if err := helper.List(context.TODO(), "/", "", storage.Everything, &got); err != nil { + t.Errorf("Unexpected error %v", err) + } + if e, a := pods[:1], got.Items; !reflect.DeepEqual(e, a) { + t.Errorf("Unexpected: %s", diff.ObjectReflectDiff(e, a)) + } + + // Get should fail + if err := helper.Get(context.TODO(), "/baz", "", &example.Pod{}, false); !storage.IsInternalError(err) { + t.Errorf("Unexpected error: %v", err) + } + // GuaranteedUpdate should return an error + if err := helper.GuaranteedUpdate(context.TODO(), "/baz", &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) { + return input, nil, nil + }, &pods[1]); !storage.IsInternalError(err) { + t.Errorf("Unexpected error: %v", err) + } + + // Delete succeeds but reports an error because we cannot access the body + if err := helper.Delete(context.TODO(), "/baz", &example.Pod{}, nil); !storage.IsInternalError(err) { + t.Errorf("Unexpected error: %v", err) + } + + if err := helper.Get(context.TODO(), "/baz", "", &example.Pod{}, false); !storage.IsNotFound(err) { + t.Errorf("Unexpected error: %v", err) + } +} + func TestListFiltered(t *testing.T) { scheme, codecs := testScheme(t) codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) @@ -364,7 +450,8 @@ func TestGuaranteedUpdateNoChange(t *testing.T) { helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()) obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1} - err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { + original := &storagetesting.TestResource{} + err := helper.GuaranteedUpdate(context.TODO(), key, original, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { return obj, nil })) if err != nil { @@ -373,7 +460,27 @@ func TestGuaranteedUpdateNoChange(t *testing.T) { // Update an existing node with the same data callbackCalled := false - objUpdate := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1} + objUpdate := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: original.ResourceVersion}, Value: 1} + result := &storagetesting.TestResource{} + err = helper.GuaranteedUpdate(context.TODO(), key, result, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { + callbackCalled = true + return objUpdate, nil + })) + if err != nil { + t.Fatalf("Unexpected error %#v", err) + } + if !callbackCalled { + t.Errorf("tryUpdate callback should have been called.") + } + if result.ResourceVersion != original.ResourceVersion { + t.Fatalf("updated the object resource version") + } + + // Update an existing node with the same data but return stale + helper.transformer = prefixTransformer{prefix: "test!", stale: true} + callbackCalled = false + result = &storagetesting.TestResource{} + objUpdate = &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1} err = helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { callbackCalled = true return objUpdate, nil @@ -384,6 +491,9 @@ func TestGuaranteedUpdateNoChange(t *testing.T) { if !callbackCalled { t.Errorf("tryUpdate callback should have been called.") } + if result.ResourceVersion == original.ResourceVersion { + t.Errorf("did not update the object resource version") + } } func TestGuaranteedUpdateKeyNotFound(t *testing.T) { @@ -540,7 +650,7 @@ func TestDeleteWithRetry(t *testing.T) { // party has updated the object. fakeGet := func(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error) { data, _ := runtime.Encode(codec, obj) - return &etcd.Response{Node: &etcd.Node{Value: string(data), ModifiedIndex: 99}}, nil + return &etcd.Response{Node: &etcd.Node{Value: defaultPrefixValue(data), ModifiedIndex: 99}}, nil } expectedRetries := 3 helper := newEtcdHelper(server.Client, scheme, codec, prefix) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher.go index a1fb305275d..1cd368bd86e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher.go @@ -71,8 +71,9 @@ type etcdWatcher struct { // Note that versioner is required for etcdWatcher to work correctly. // There is no public constructor of it, so be careful when manipulating // with it manually. - versioner storage.Versioner - transform TransformFunc + versioner storage.Versioner + transform TransformFunc + valueTransformer ValueTransformer list bool // If we're doing a recursive watch, should be true. quorum bool // If we enable quorum, shoule be true @@ -107,15 +108,18 @@ const watchWaitDuration = 100 * time.Millisecond func newEtcdWatcher( list bool, quorum bool, include includeFunc, filter storage.FilterFunc, encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc, + valueTransformer ValueTransformer, cache etcdCache) *etcdWatcher { w := &etcdWatcher{ - encoding: encoding, - versioner: versioner, - transform: transform, - list: list, - quorum: quorum, - include: include, - filter: filter, + encoding: encoding, + versioner: versioner, + transform: transform, + valueTransformer: valueTransformer, + + list: list, + quorum: quorum, + include: include, + filter: filter, // Buffer this channel, so that the etcd client is not forced // to context switch with every object it gets, and so that a // long time spent decoding an object won't block the *next* @@ -309,12 +313,18 @@ func (w *etcdWatcher) translate() { } } +// decodeObject extracts an object from the provided etcd node or returns an error. func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) { if obj, found := w.cache.getFromCache(node.ModifiedIndex, storage.SimpleFilter(storage.Everything)); found { return obj, nil } - obj, err := runtime.Decode(w.encoding, []byte(node.Value)) + body, _, err := w.valueTransformer.TransformStringFromStorage(node.Value) + if err != nil { + return nil, err + } + + obj, err := runtime.Decode(w.encoding, []byte(body)) if err != nil { return nil, err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher_test.go index b296c149421..d2fe49acffd 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher_test.go @@ -133,7 +133,7 @@ func TestWatchInterpretations(t *testing.T) { } for name, item := range table { for _, action := range item.actions { - w := newEtcdWatcher(true, false, nil, firstLetterIsB, codec, versioner, nil, &fakeEtcdCache{}) + w := newEtcdWatcher(true, false, nil, firstLetterIsB, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) emitCalled := false w.emit = func(event watch.Event) { emitCalled = true @@ -150,10 +150,10 @@ func TestWatchInterpretations(t *testing.T) { var n, pn *etcd.Node if item.nodeValue != "" { - n = &etcd.Node{Value: item.nodeValue} + n = &etcd.Node{Value: defaultPrefix(item.nodeValue)} } if item.prevNodeValue != "" { - pn = &etcd.Node{Value: item.prevNodeValue} + pn = &etcd.Node{Value: defaultPrefix(item.prevNodeValue)} } w.sendResult(&etcd.Response{ @@ -173,7 +173,7 @@ func TestWatchInterpretations(t *testing.T) { func TestWatchInterpretation_ResponseNotSet(t *testing.T) { _, codecs := testScheme(t) codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"}) - w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{}) + w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -189,7 +189,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) { codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"}) actions := []string{"create", "set", "compareAndSwap", "delete"} for _, action := range actions { - w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{}) + w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -205,20 +205,20 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) { codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"}) actions := []string{"create", "set", "compareAndSwap", "delete"} for _, action := range actions { - w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{}) + w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } w.sendResult(&etcd.Response{ Action: action, Node: &etcd.Node{ - Value: "foobar", + Value: defaultPrefix("foobar"), }, }) w.sendResult(&etcd.Response{ Action: action, PrevNode: &etcd.Node{ - Value: "foobar", + Value: defaultPrefix("foobar"), }, }) w.Stop() @@ -231,7 +231,7 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) { filter := func(obj runtime.Object) bool { return obj.(*example.Pod).Name != "bar" } - w := newEtcdWatcher(false, false, nil, filter, codec, versioner, nil, &fakeEtcdCache{}) + w := newEtcdWatcher(false, false, nil, filter, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) eventChan := make(chan watch.Event, 1) w.emit = func(e watch.Event) { @@ -259,7 +259,7 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) { ModifiedIndex: 2, }, PrevNode: &etcd.Node{ - Value: string(fooBytes), + Value: defaultPrefixValue(fooBytes), ModifiedIndex: 1, }, }, @@ -268,11 +268,11 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) { response: &etcd.Response{ Action: EtcdSet, Node: &etcd.Node{ - Value: string(barBytes), + Value: defaultPrefixValue(barBytes), ModifiedIndex: 2, }, PrevNode: &etcd.Node{ - Value: string(fooBytes), + Value: defaultPrefixValue(fooBytes), ModifiedIndex: 1, }, }, diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index e55c646b0b5..19fc4b6dc5a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -33,21 +33,42 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage" - utiltrace "k8s.io/apiserver/pkg/util/trace" "k8s.io/apiserver/pkg/storage/etcd" + utiltrace "k8s.io/apiserver/pkg/util/trace" ) +// ValueTransformer allows a string value to be transformed before being read from or written to the underlying store. The methods +// must be able to undo the transformation caused by the other. +type ValueTransformer interface { + // TransformFromStorage may transform the provided data from its underlying storage representation or return an error. + // Stale is true if the object on disk is stale and a write to etcd should be issued, even if the contents of the object + // have not changed. + TransformFromStorage([]byte) (data []byte, stale bool, err error) + // TransformToStorage may transform the provided data into the appropriate form in storage or return an error. + TransformToStorage([]byte) (data []byte, err error) +} + +type identityTransformer struct{} + +func (identityTransformer) TransformFromStorage(b []byte) ([]byte, bool, error) { return b, false, nil } +func (identityTransformer) TransformToStorage(b []byte) ([]byte, error) { return b, nil } + +// IdentityTransformer performs no transformation on the provided values. +var IdentityTransformer ValueTransformer = identityTransformer{} + type store struct { client *clientv3.Client // getOpts contains additional options that should be passed // to all Get() calls. - getOps []clientv3.OpOption - codec runtime.Codec - versioner storage.Versioner - pathPrefix string - watcher *watcher + getOps []clientv3.OpOption + codec runtime.Codec + versioner storage.Versioner + transformer ValueTransformer + pathPrefix string + watcher *watcher } type elemForDecode struct { @@ -56,31 +77,33 @@ type elemForDecode struct { } type objState struct { - obj runtime.Object - meta *storage.ResponseMeta - rev int64 - data []byte + obj runtime.Object + meta *storage.ResponseMeta + rev int64 + data []byte + stale bool } // New returns an etcd3 implementation of storage.Interface. -func New(c *clientv3.Client, codec runtime.Codec, prefix string) storage.Interface { - return newStore(c, true, codec, prefix) +func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer ValueTransformer) storage.Interface { + return newStore(c, true, codec, prefix, transformer) } // NewWithNoQuorumRead returns etcd3 implementation of storage.Interface // where Get operations don't require quorum read. -func NewWithNoQuorumRead(c *clientv3.Client, codec runtime.Codec, prefix string) storage.Interface { - return newStore(c, false, codec, prefix) +func NewWithNoQuorumRead(c *clientv3.Client, codec runtime.Codec, prefix string, transformer ValueTransformer) storage.Interface { + return newStore(c, false, codec, prefix, transformer) } -func newStore(c *clientv3.Client, quorumRead bool, codec runtime.Codec, prefix string) *store { +func newStore(c *clientv3.Client, quorumRead bool, codec runtime.Codec, prefix string, transformer ValueTransformer) *store { versioner := etcd.APIObjectVersioner{} result := &store{ - client: c, - versioner: versioner, - codec: codec, - pathPrefix: prefix, - watcher: newWatcher(c, codec, versioner), + client: c, + codec: codec, + versioner: versioner, + transformer: transformer, + pathPrefix: prefix, + watcher: newWatcher(c, codec, versioner, transformer), } if !quorumRead { // In case of non-quorum reads, we can set WithSerializable() @@ -110,7 +133,13 @@ func (s *store) Get(ctx context.Context, key string, resourceVersion string, out return storage.NewKeyNotFoundError(key, 0) } kv := getResp.Kvs[0] - return decode(s.codec, s.versioner, kv.Value, out, kv.ModRevision) + + data, _, err := s.transformer.TransformFromStorage(kv.Value) + if err != nil { + return storage.NewInternalError(err.Error()) + } + + return decode(s.codec, s.versioner, data, out, kv.ModRevision) } // Create implements storage.Interface.Create. @@ -129,10 +158,15 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, return err } + newData, err := s.transformer.TransformToStorage(data) + if err != nil { + return storage.NewInternalError(err.Error()) + } + txnResp, err := s.client.KV.Txn(ctx).If( notFound(key), ).Then( - clientv3.OpPut(key, string(data), opts...), + clientv3.OpPut(key, string(newData), opts...), ).Commit() if err != nil { return err @@ -177,7 +211,11 @@ func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime } kv := getResp.Kvs[0] - return decode(s.codec, s.versioner, kv.Value, out, kv.ModRevision) + data, _, err := s.transformer.TransformFromStorage(kv.Value) + if err != nil { + return storage.NewInternalError(err.Error()) + } + return decode(s.codec, s.versioner, data, out, kv.ModRevision) } func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, precondtions *storage.Preconditions) error { @@ -257,10 +295,15 @@ func (s *store) GuaranteedUpdate( if err != nil { return err } - if bytes.Equal(data, origState.data) { + if !origState.stale && bytes.Equal(data, origState.data) { return decode(s.codec, s.versioner, origState.data, out, origState.rev) } + newData, err := s.transformer.TransformToStorage(data) + if err != nil { + return storage.NewInternalError(err.Error()) + } + opts, err := s.ttlOpts(ctx, int64(ttl)) if err != nil { return err @@ -270,7 +313,7 @@ func (s *store) GuaranteedUpdate( txnResp, err := s.client.KV.Txn(ctx).If( clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), ).Then( - clientv3.OpPut(key, string(data), opts...), + clientv3.OpPut(key, string(newData), opts...), ).Else( clientv3.OpGet(key), ).Commit() @@ -289,6 +332,7 @@ func (s *store) GuaranteedUpdate( continue } putResp := txnResp.Responses[0].GetResponsePut() + return decode(s.codec, s.versioner, data, out, putResp.Header.Revision) } } @@ -308,8 +352,12 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin if len(getResp.Kvs) == 0 { return nil } + data, _, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value) + if err != nil { + return storage.NewInternalError(err.Error()) + } elems := []*elemForDecode{{ - data: getResp.Kvs[0].Value, + data: data, rev: uint64(getResp.Kvs[0].ModRevision), }} if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil { @@ -337,12 +385,18 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor return err } - elems := make([]*elemForDecode, len(getResp.Kvs)) - for i, kv := range getResp.Kvs { - elems[i] = &elemForDecode{ - data: kv.Value, - rev: uint64(kv.ModRevision), + elems := make([]*elemForDecode, 0, len(getResp.Kvs)) + for _, kv := range getResp.Kvs { + data, _, err := s.transformer.TransformFromStorage(kv.Value) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", key, err)) + continue } + + elems = append(elems, &elemForDecode{ + data: data, + rev: uint64(kv.ModRevision), + }) } if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil { return err @@ -383,9 +437,14 @@ func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Va return nil, err } } else { + data, stale, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value) + if err != nil { + return nil, storage.NewInternalError(err.Error()) + } state.rev = getResp.Kvs[0].ModRevision state.meta.ResourceVersion = uint64(state.rev) - state.data = getResp.Kvs[0].Value + state.data = data + state.stale = stale if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil { return nil, err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index ce379a33898..de58ca58c9f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -17,6 +17,7 @@ limitations under the License. package etcd3 import ( + "bytes" "fmt" "reflect" "sync" @@ -28,10 +29,12 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/storage" + storagetests "k8s.io/apiserver/pkg/storage/tests" "github.com/coreos/etcd/integration" "golang.org/x/net/context" @@ -46,6 +49,26 @@ func init() { examplev1.AddToScheme(scheme) } +// prefixTransformer adds and verifies that all data has the correct prefix on its way in and out. +type prefixTransformer struct { + prefix []byte + stale bool + err error +} + +func (p prefixTransformer) TransformFromStorage(b []byte) ([]byte, bool, error) { + if !bytes.HasPrefix(b, p.prefix) { + return nil, false, fmt.Errorf("value does not have expected prefix: %s", string(b)) + } + return bytes.TrimPrefix(b, p.prefix), p.stale, p.err +} +func (p prefixTransformer) TransformToStorage(b []byte) ([]byte, error) { + if len(b) > 0 { + return append(append([]byte{}, p.prefix...), b...), p.err + } + return b, p.err +} + func TestCreate(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) @@ -294,6 +317,7 @@ func TestGuaranteedUpdate(t *testing.T) { expectNotFoundErr bool expectInvalidObjErr bool expectNoUpdate bool + transformStale bool }{{ // GuaranteedUpdate on non-existing key with ignoreNotFound=false key: "/non-existing", ignoreNotFound: false, @@ -322,6 +346,14 @@ func TestGuaranteedUpdate(t *testing.T) { expectNotFoundErr: false, expectInvalidObjErr: false, expectNoUpdate: true, + }, { // GuaranteedUpdate with same data but stale + key: key, + ignoreNotFound: false, + precondition: nil, + expectNotFoundErr: false, + expectInvalidObjErr: false, + expectNoUpdate: false, + transformStale: true, }, { // GuaranteedUpdate with UID match key: key, ignoreNotFound: false, @@ -344,6 +376,12 @@ func TestGuaranteedUpdate(t *testing.T) { if tt.expectNoUpdate { name = storeObj.Name } + originalTransformer := store.transformer.(prefixTransformer) + if tt.transformStale { + transformer := originalTransformer + transformer.stale = true + store.transformer = transformer + } version := storeObj.ResourceVersion err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition, storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { @@ -356,6 +394,7 @@ func TestGuaranteedUpdate(t *testing.T) { pod.Name = name return &pod, nil })) + store.transformer = originalTransformer if tt.expectNotFoundErr { if err == nil || !storage.IsNotFound(err) { @@ -460,11 +499,91 @@ func TestGuaranteedUpdateWithConflict(t *testing.T) { } } +func TestTransformationFailure(t *testing.T) { + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")}) + ctx := context.Background() + + preset := []struct { + key string + obj *example.Pod + storedObj *example.Pod + }{{ + key: "/one-level/test", + obj: &example.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "bar"}, + Spec: storagetests.DeepEqualSafePodSpec(), + }, + }, { + key: "/two-level/1/test", + obj: &example.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "baz"}, + Spec: storagetests.DeepEqualSafePodSpec(), + }, + }} + for i, ps := range preset[:1] { + preset[i].storedObj = &example.Pod{} + err := store.Create(ctx, ps.key, ps.obj, preset[:1][i].storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + } + + // create a second resource with an invalid prefix + oldTransformer := store.transformer + store.transformer = prefixTransformer{prefix: []byte("otherprefix!")} + for i, ps := range preset[1:] { + preset[1:][i].storedObj = &example.Pod{} + err := store.Create(ctx, ps.key, ps.obj, preset[1:][i].storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + } + store.transformer = oldTransformer + + // only the first item is returned, and no error + var got example.PodList + if err := store.List(ctx, "/", "", storage.Everything, &got); err != nil { + t.Errorf("Unexpected error %v", err) + } + if e, a := []example.Pod{*preset[0].storedObj}, got.Items; !reflect.DeepEqual(e, a) { + t.Errorf("Unexpected: %s", diff.ObjectReflectDiff(e, a)) + } + + // Get should fail + if err := store.Get(ctx, preset[1].key, "", &example.Pod{}, false); !storage.IsInternalError(err) { + t.Errorf("Unexpected error: %v", err) + } + // GuaranteedUpdate without suggestion should return an error + if err := store.GuaranteedUpdate(ctx, preset[1].key, &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) { + return input, nil, nil + }); !storage.IsInternalError(err) { + t.Errorf("Unexpected error: %v", err) + } + // GuaranteedUpdate with suggestion should not return an error if we don't change the object + if err := store.GuaranteedUpdate(ctx, preset[1].key, &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) { + return input, nil, nil + }, preset[1].obj); err != nil { + t.Errorf("Unexpected error: %v", err) + } + + // Delete succeeds but reports an error because we cannot access the body + if err := store.Delete(ctx, preset[1].key, &example.Pod{}, nil); !storage.IsInternalError(err) { + t.Errorf("Unexpected error: %v", err) + } + + if err := store.Get(ctx, preset[1].key, "", &example.Pod{}, false); !storage.IsNotFound(err) { + t.Errorf("Unexpected error: %v", err) + } +} + func TestList(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), false, codec, "") + store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")}) ctx := context.Background() // Setup storage with the following structure: @@ -552,7 +671,7 @@ func TestList(t *testing.T) { func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - store := newStore(cluster.RandClient(), false, codec, "") + store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")}) ctx := context.Background() return ctx, store, cluster } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index e75d4b98148..fca49d9cccd 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -40,9 +40,10 @@ const ( ) type watcher struct { - client *clientv3.Client - codec runtime.Codec - versioner storage.Versioner + client *clientv3.Client + codec runtime.Codec + versioner storage.Versioner + transformer ValueTransformer } // watchChan implements watch.Interface. @@ -59,11 +60,12 @@ type watchChan struct { errChan chan error } -func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner) *watcher { +func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner, transformer ValueTransformer) *watcher { return &watcher{ - client: client, - codec: codec, - versioner: versioner, + client: client, + codec: codec, + versioner: versioner, + transformer: transformer, } } @@ -341,7 +343,11 @@ func (wc *watchChan) sendEvent(e *event) { func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) { if !e.isDeleted { - curObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, e.value, e.rev) + data, _, err := wc.watcher.transformer.TransformFromStorage(e.value) + if err != nil { + return nil, nil, err + } + curObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, data, e.rev) if err != nil { return nil, nil, err } @@ -352,9 +358,13 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim // we need the object only to compute whether it was filtered out // before). if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) { + data, _, err := wc.watcher.transformer.TransformFromStorage(e.prevValue) + if err != nil { + return nil, nil, err + } // Note that this sends the *old* object with the etcd revision for the time at // which it gets deleted. - oldObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, e.prevValue, e.rev) + oldObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, data, e.rev) if err != nil { return nil, nil, err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 753c546c373..00c31735a91 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -227,13 +227,13 @@ func TestWatchError(t *testing.T) { codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)} cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - invalidStore := newStore(cluster.RandClient(), false, codec, "") + invalidStore := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")}) ctx := context.Background() w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything) if err != nil { t.Fatalf("Watch failed: %v", err) } - validStore := newStore(cluster.RandClient(), false, codec, "") + validStore := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")}) validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate( func(runtime.Object) (runtime.Object, error) { return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd2.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd2.go index 78377c49356..a96897ca1f8 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd2.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd2.go @@ -39,7 +39,7 @@ func newETCD2Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e if err != nil { return nil, nil, err } - s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize, c.Copier) + s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize, c.Copier, etcd.IdentityTransformer) return s, tr.CloseIdleConnections, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index fa457719ef4..48f2b760d23 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -56,7 +56,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e client.Close() } if c.Quorum { - return etcd3.New(client, c.Codec, c.Prefix), destroyFunc, nil + return etcd3.New(client, c.Codec, c.Prefix, etcd3.IdentityTransformer), destroyFunc, nil } - return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix), destroyFunc, nil + return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix, etcd3.IdentityTransformer), destroyFunc, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index d3690c42215..e180e47cd90 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -93,7 +93,7 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha func newEtcdTestStorage(t *testing.T, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) { server, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, scheme) - storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix) + storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix, etcd3.IdentityTransformer) return server, storage } diff --git a/vendor/BUILD b/vendor/BUILD index 076bf294155..220a1b28119 100644 --- a/vendor/BUILD +++ b/vendor/BUILD @@ -14930,6 +14930,7 @@ go_test( "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/runtime/serializer", + "//vendor:k8s.io/apimachinery/pkg/util/diff", "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/apiserver/pkg/apis/example", "//vendor:k8s.io/apiserver/pkg/apis/example/v1", @@ -15048,11 +15049,13 @@ go_test( "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/runtime/serializer", + "//vendor:k8s.io/apimachinery/pkg/util/diff", "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/apiserver/pkg/apis/example", "//vendor:k8s.io/apiserver/pkg/apis/example/v1", "//vendor:k8s.io/apiserver/pkg/storage", + "//vendor:k8s.io/apiserver/pkg/storage/tests", ], ) @@ -15075,6 +15078,7 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/conversion", "//vendor:k8s.io/apimachinery/pkg/runtime", + "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/apiserver/pkg/storage", "//vendor:k8s.io/apiserver/pkg/storage/etcd",