mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
Merge pull request #40664 from smarterclayton/etcd_data_wrapper
Automatic merge from submit-queue (batch tested with PRs 41061, 40888, 40664, 41020, 41085) Allow values to be wrapped prior to serialization in etcd This adds a new value transformer to the etcd2 store that can transform the value from etcd on read and write. This will allow the store to implement encryption at rest or otherwise transform the value prior to persistence. * [x] etcd3 store * [x] example of transformation * [x] partial error handling This is in support of #12742
This commit is contained in:
commit
bc60def8a8
@ -30,6 +30,8 @@ func InterpretListError(err error, qualifiedResource schema.GroupResource) error
|
|||||||
return errors.NewNotFound(qualifiedResource, "")
|
return errors.NewNotFound(qualifiedResource, "")
|
||||||
case storage.IsUnreachable(err):
|
case storage.IsUnreachable(err):
|
||||||
return errors.NewServerTimeout(qualifiedResource, "list", 2) // TODO: make configurable or handled at a higher level
|
return errors.NewServerTimeout(qualifiedResource, "list", 2) // TODO: make configurable or handled at a higher level
|
||||||
|
case storage.IsInternalError(err):
|
||||||
|
return errors.NewInternalError(err)
|
||||||
default:
|
default:
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -43,6 +45,8 @@ func InterpretGetError(err error, qualifiedResource schema.GroupResource, name s
|
|||||||
return errors.NewNotFound(qualifiedResource, name)
|
return errors.NewNotFound(qualifiedResource, name)
|
||||||
case storage.IsUnreachable(err):
|
case storage.IsUnreachable(err):
|
||||||
return errors.NewServerTimeout(qualifiedResource, "get", 2) // TODO: make configurable or handled at a higher level
|
return errors.NewServerTimeout(qualifiedResource, "get", 2) // TODO: make configurable or handled at a higher level
|
||||||
|
case storage.IsInternalError(err):
|
||||||
|
return errors.NewInternalError(err)
|
||||||
default:
|
default:
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -56,6 +60,8 @@ func InterpretCreateError(err error, qualifiedResource schema.GroupResource, nam
|
|||||||
return errors.NewAlreadyExists(qualifiedResource, name)
|
return errors.NewAlreadyExists(qualifiedResource, name)
|
||||||
case storage.IsUnreachable(err):
|
case storage.IsUnreachable(err):
|
||||||
return errors.NewServerTimeout(qualifiedResource, "create", 2) // TODO: make configurable or handled at a higher level
|
return errors.NewServerTimeout(qualifiedResource, "create", 2) // TODO: make configurable or handled at a higher level
|
||||||
|
case storage.IsInternalError(err):
|
||||||
|
return errors.NewInternalError(err)
|
||||||
default:
|
default:
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -102,6 +108,8 @@ func InterpretWatchError(err error, resource schema.GroupResource, name string)
|
|||||||
case storage.IsInvalidError(err):
|
case storage.IsInvalidError(err):
|
||||||
invalidError, _ := err.(storage.InvalidError)
|
invalidError, _ := err.(storage.InvalidError)
|
||||||
return errors.NewInvalid(schema.GroupKind{Group: resource.Group, Kind: resource.Resource}, name, invalidError.Errs)
|
return errors.NewInvalid(schema.GroupKind{Group: resource.Group, Kind: resource.Resource}, name, invalidError.Errs)
|
||||||
|
case storage.IsInternalError(err):
|
||||||
|
return errors.NewInternalError(err)
|
||||||
default:
|
default:
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -31,23 +31,46 @@ import (
|
|||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/conversion"
|
"k8s.io/apimachinery/pkg/conversion"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
"k8s.io/apiserver/pkg/storage/etcd/metrics"
|
"k8s.io/apiserver/pkg/storage/etcd/metrics"
|
||||||
|
etcdutil "k8s.io/apiserver/pkg/storage/etcd/util"
|
||||||
utilcache "k8s.io/apiserver/pkg/util/cache"
|
utilcache "k8s.io/apiserver/pkg/util/cache"
|
||||||
utiltrace "k8s.io/apiserver/pkg/util/trace"
|
utiltrace "k8s.io/apiserver/pkg/util/trace"
|
||||||
etcdutil "k8s.io/apiserver/pkg/storage/etcd/util"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ValueTransformer allows a string value to be transformed before being read from or written to the underlying store. The methods
|
||||||
|
// must be able to undo the transformation caused by the other.
|
||||||
|
type ValueTransformer interface {
|
||||||
|
// TransformStringFromStorage may transform the provided string from its underlying storage representation or return an error.
|
||||||
|
// Stale is true if the object on disk is stale and a write to etcd should be issued, even if the contents of the object
|
||||||
|
// have not changed.
|
||||||
|
TransformStringFromStorage(string) (value string, stale bool, err error)
|
||||||
|
// TransformStringToStorage may transform the provided string into the appropriate form in storage or return an error.
|
||||||
|
TransformStringToStorage(string) (value string, err error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type identityTransformer struct{}
|
||||||
|
|
||||||
|
func (identityTransformer) TransformStringFromStorage(s string) (string, bool, error) {
|
||||||
|
return s, false, nil
|
||||||
|
}
|
||||||
|
func (identityTransformer) TransformStringToStorage(s string) (string, error) { return s, nil }
|
||||||
|
|
||||||
|
// IdentityTransformer performs no transformation on the provided values.
|
||||||
|
var IdentityTransformer ValueTransformer = identityTransformer{}
|
||||||
|
|
||||||
// Creates a new storage interface from the client
|
// Creates a new storage interface from the client
|
||||||
// TODO: deprecate in favor of storage.Config abstraction over time
|
// TODO: deprecate in favor of storage.Config abstraction over time
|
||||||
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int, copier runtime.ObjectCopier) storage.Interface {
|
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int, copier runtime.ObjectCopier, transformer ValueTransformer) storage.Interface {
|
||||||
return &etcdHelper{
|
return &etcdHelper{
|
||||||
etcdMembersAPI: etcd.NewMembersAPI(client),
|
etcdMembersAPI: etcd.NewMembersAPI(client),
|
||||||
etcdKeysAPI: etcd.NewKeysAPI(client),
|
etcdKeysAPI: etcd.NewKeysAPI(client),
|
||||||
codec: codec,
|
codec: codec,
|
||||||
versioner: APIObjectVersioner{},
|
versioner: APIObjectVersioner{},
|
||||||
copier: copier,
|
copier: copier,
|
||||||
|
transformer: transformer,
|
||||||
pathPrefix: path.Join("/", prefix),
|
pathPrefix: path.Join("/", prefix),
|
||||||
quorum: quorum,
|
quorum: quorum,
|
||||||
cache: utilcache.NewCache(cacheSize),
|
cache: utilcache.NewCache(cacheSize),
|
||||||
@ -60,6 +83,7 @@ type etcdHelper struct {
|
|||||||
etcdKeysAPI etcd.KeysAPI
|
etcdKeysAPI etcd.KeysAPI
|
||||||
codec runtime.Codec
|
codec runtime.Codec
|
||||||
copier runtime.ObjectCopier
|
copier runtime.ObjectCopier
|
||||||
|
transformer ValueTransformer
|
||||||
// Note that versioner is required for etcdHelper to work correctly.
|
// Note that versioner is required for etcdHelper to work correctly.
|
||||||
// The public constructors (NewStorage & NewEtcdStorage) are setting it
|
// The public constructors (NewStorage & NewEtcdStorage) are setting it
|
||||||
// correctly, so be careful when manipulating with it manually.
|
// correctly, so be careful when manipulating with it manually.
|
||||||
@ -112,7 +136,13 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob
|
|||||||
TTL: time.Duration(ttl) * time.Second,
|
TTL: time.Duration(ttl) * time.Second,
|
||||||
PrevExist: etcd.PrevNoExist,
|
PrevExist: etcd.PrevNoExist,
|
||||||
}
|
}
|
||||||
response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
|
|
||||||
|
newBody, err := h.transformer.TransformStringToStorage(string(data))
|
||||||
|
if err != nil {
|
||||||
|
return storage.NewInternalError(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
response, err := h.etcdKeysAPI.Set(ctx, key, newBody, &opts)
|
||||||
trace.Step("Object created")
|
trace.Step("Object created")
|
||||||
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
|
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -122,7 +152,7 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob
|
|||||||
if _, err := conversion.EnforcePtr(out); err != nil {
|
if _, err := conversion.EnforcePtr(out); err != nil {
|
||||||
panic("unable to convert output object to pointer")
|
panic("unable to convert output object to pointer")
|
||||||
}
|
}
|
||||||
_, _, err = h.extractObj(response, err, out, false, false)
|
_, _, _, err = h.extractObj(response, err, out, false, false)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -160,7 +190,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object,
|
|||||||
if !etcdutil.IsEtcdNotFound(err) {
|
if !etcdutil.IsEtcdNotFound(err) {
|
||||||
// if the object that existed prior to the delete is returned by etcd, update the out object.
|
// if the object that existed prior to the delete is returned by etcd, update the out object.
|
||||||
if err != nil || response.PrevNode != nil {
|
if err != nil || response.PrevNode != nil {
|
||||||
_, _, err = h.extractObj(response, err, out, false, true)
|
_, _, _, err = h.extractObj(response, err, out, false, true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return toStorageErr(err, key, 0)
|
return toStorageErr(err, key, 0)
|
||||||
@ -169,7 +199,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object,
|
|||||||
// Check the preconditions match.
|
// Check the preconditions match.
|
||||||
obj := reflect.New(v.Type()).Interface().(runtime.Object)
|
obj := reflect.New(v.Type()).Interface().(runtime.Object)
|
||||||
for {
|
for {
|
||||||
_, node, res, err := h.bodyAndExtractObj(ctx, key, obj, false)
|
_, node, res, _, err := h.bodyAndExtractObj(ctx, key, obj, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return toStorageErr(err, key, 0)
|
return toStorageErr(err, key, 0)
|
||||||
}
|
}
|
||||||
@ -186,17 +216,17 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object,
|
|||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
response, err := h.etcdKeysAPI.Delete(ctx, key, &opt)
|
response, err := h.etcdKeysAPI.Delete(ctx, key, &opt)
|
||||||
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
|
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
|
||||||
if etcdutil.IsEtcdTestFailed(err) {
|
if !etcdutil.IsEtcdTestFailed(err) {
|
||||||
glog.Infof("deletion of %s failed because of a conflict, going to retry", key)
|
|
||||||
} else {
|
|
||||||
if !etcdutil.IsEtcdNotFound(err) {
|
if !etcdutil.IsEtcdNotFound(err) {
|
||||||
// if the object that existed prior to the delete is returned by etcd, update the out object.
|
// if the object that existed prior to the delete is returned by etcd, update the out object.
|
||||||
if err != nil || response.PrevNode != nil {
|
if err != nil || response.PrevNode != nil {
|
||||||
_, _, err = h.extractObj(response, err, out, false, true)
|
_, _, _, err = h.extractObj(response, err, out, false, true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return toStorageErr(err, key, 0)
|
return toStorageErr(err, key, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
glog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -210,7 +240,7 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
key = path.Join(h.pathPrefix, key)
|
key = path.Join(h.pathPrefix, key)
|
||||||
w := newEtcdWatcher(false, h.quorum, nil, storage.SimpleFilter(pred), h.codec, h.versioner, nil, h)
|
w := newEtcdWatcher(false, h.quorum, nil, storage.SimpleFilter(pred), h.codec, h.versioner, nil, h.transformer, h)
|
||||||
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
|
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
|
||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
@ -225,7 +255,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
key = path.Join(h.pathPrefix, key)
|
key = path.Join(h.pathPrefix, key)
|
||||||
w := newEtcdWatcher(true, h.quorum, exceptKey(key), storage.SimpleFilter(pred), h.codec, h.versioner, nil, h)
|
w := newEtcdWatcher(true, h.quorum, exceptKey(key), storage.SimpleFilter(pred), h.codec, h.versioner, nil, h.transformer, h)
|
||||||
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
|
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
|
||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
@ -236,13 +266,13 @@ func (h *etcdHelper) Get(ctx context.Context, key string, resourceVersion string
|
|||||||
glog.Errorf("Context is nil")
|
glog.Errorf("Context is nil")
|
||||||
}
|
}
|
||||||
key = path.Join(h.pathPrefix, key)
|
key = path.Join(h.pathPrefix, key)
|
||||||
_, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound)
|
_, _, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information
|
// bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information
|
||||||
// about the response, like the current etcd index and the ttl.
|
// about the response, like the current etcd index and the ttl.
|
||||||
func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) {
|
func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, stale bool, err error) {
|
||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
glog.Errorf("Context is nil")
|
glog.Errorf("Context is nil")
|
||||||
}
|
}
|
||||||
@ -255,13 +285,13 @@ func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr r
|
|||||||
response, err := h.etcdKeysAPI.Get(ctx, key, opts)
|
response, err := h.etcdKeysAPI.Get(ctx, key, opts)
|
||||||
metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
|
metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
|
||||||
if err != nil && !etcdutil.IsEtcdNotFound(err) {
|
if err != nil && !etcdutil.IsEtcdNotFound(err) {
|
||||||
return "", nil, nil, toStorageErr(err, key, 0)
|
return "", nil, nil, false, toStorageErr(err, key, 0)
|
||||||
}
|
}
|
||||||
body, node, err = h.extractObj(response, err, objPtr, ignoreNotFound, false)
|
body, node, stale, err = h.extractObj(response, err, objPtr, ignoreNotFound, false)
|
||||||
return body, node, response, toStorageErr(err, key, 0)
|
return body, node, response, stale, toStorageErr(err, key, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, err error) {
|
func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, stale bool, err error) {
|
||||||
if response != nil {
|
if response != nil {
|
||||||
if prevNode {
|
if prevNode {
|
||||||
node = response.PrevNode
|
node = response.PrevNode
|
||||||
@ -273,26 +303,30 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
|
|||||||
if ignoreNotFound {
|
if ignoreNotFound {
|
||||||
v, err := conversion.EnforcePtr(objPtr)
|
v, err := conversion.EnforcePtr(objPtr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", nil, err
|
return "", nil, false, err
|
||||||
}
|
}
|
||||||
v.Set(reflect.Zero(v.Type()))
|
v.Set(reflect.Zero(v.Type()))
|
||||||
return "", nil, nil
|
return "", nil, false, nil
|
||||||
} else if inErr != nil {
|
} else if inErr != nil {
|
||||||
return "", nil, inErr
|
return "", nil, false, inErr
|
||||||
}
|
}
|
||||||
return "", nil, fmt.Errorf("unable to locate a value on the response: %#v", response)
|
return "", nil, false, fmt.Errorf("unable to locate a value on the response: %#v", response)
|
||||||
|
}
|
||||||
|
|
||||||
|
body, stale, err = h.transformer.TransformStringFromStorage(node.Value)
|
||||||
|
if err != nil {
|
||||||
|
return body, nil, stale, storage.NewInternalError(err.Error())
|
||||||
}
|
}
|
||||||
body = node.Value
|
|
||||||
out, gvk, err := h.codec.Decode([]byte(body), nil, objPtr)
|
out, gvk, err := h.codec.Decode([]byte(body), nil, objPtr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return body, nil, err
|
return body, nil, stale, err
|
||||||
}
|
}
|
||||||
if out != objPtr {
|
if out != objPtr {
|
||||||
return body, nil, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr))
|
return body, nil, stale, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr))
|
||||||
}
|
}
|
||||||
// being unable to set the version does not prevent the object from being extracted
|
// being unable to set the version does not prevent the object from being extracted
|
||||||
_ = h.versioner.UpdateObject(objPtr, node.ModifiedIndex)
|
_ = h.versioner.UpdateObject(objPtr, node.ModifiedIndex)
|
||||||
return body, node, err
|
return body, node, stale, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements storage.Interface.
|
// Implements storage.Interface.
|
||||||
@ -359,7 +393,14 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun
|
|||||||
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
obj, _, err := h.codec.Decode([]byte(node.Value), nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
|
body, _, err := h.transformer.TransformStringFromStorage(node.Value)
|
||||||
|
if err != nil {
|
||||||
|
// omit items from lists and watches that cannot be transformed, but log the error
|
||||||
|
utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", node.Key, err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
obj, _, err := h.codec.Decode([]byte(body), nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -448,7 +489,7 @@ func (h *etcdHelper) GuaranteedUpdate(
|
|||||||
key = path.Join(h.pathPrefix, key)
|
key = path.Join(h.pathPrefix, key)
|
||||||
for {
|
for {
|
||||||
obj := reflect.New(v.Type()).Interface().(runtime.Object)
|
obj := reflect.New(v.Type()).Interface().(runtime.Object)
|
||||||
origBody, node, res, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound)
|
origBody, node, res, stale, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return toStorageErr(err, key, 0)
|
return toStorageErr(err, key, 0)
|
||||||
}
|
}
|
||||||
@ -493,10 +534,15 @@ func (h *etcdHelper) GuaranteedUpdate(
|
|||||||
return errors.New("resourceVersion cannot be set on objects store in etcd")
|
return errors.New("resourceVersion cannot be set on objects store in etcd")
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err := runtime.Encode(h.codec, ret)
|
newBodyData, err := runtime.Encode(h.codec, ret)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
newBody := string(newBodyData)
|
||||||
|
data, err := h.transformer.TransformStringToStorage(newBody)
|
||||||
|
if err != nil {
|
||||||
|
return storage.NewInternalError(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
// First time this key has been used, try creating new value.
|
// First time this key has been used, try creating new value.
|
||||||
if index == 0 {
|
if index == 0 {
|
||||||
@ -505,19 +551,20 @@ func (h *etcdHelper) GuaranteedUpdate(
|
|||||||
TTL: time.Duration(ttl) * time.Second,
|
TTL: time.Duration(ttl) * time.Second,
|
||||||
PrevExist: etcd.PrevNoExist,
|
PrevExist: etcd.PrevNoExist,
|
||||||
}
|
}
|
||||||
response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
|
response, err := h.etcdKeysAPI.Set(ctx, key, data, &opts)
|
||||||
metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime)
|
metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime)
|
||||||
if etcdutil.IsEtcdNodeExist(err) {
|
if etcdutil.IsEtcdNodeExist(err) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
_, _, err = h.extractObj(response, err, ptrToType, false, false)
|
_, _, _, err = h.extractObj(response, err, ptrToType, false, false)
|
||||||
return toStorageErr(err, key, 0)
|
return toStorageErr(err, key, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
if string(data) == origBody {
|
// If we don't send an update, we simply return the currently existing
|
||||||
// If we don't send an update, we simply return the currently existing
|
// version of the object. However, the value transformer may indicate that
|
||||||
// version of the object.
|
// the on disk representation has changed and that we must commit an update.
|
||||||
_, _, err := h.extractObj(res, nil, ptrToType, ignoreNotFound, false)
|
if newBody == origBody && !stale {
|
||||||
|
_, _, _, err := h.extractObj(res, nil, ptrToType, ignoreNotFound, false)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -527,13 +574,13 @@ func (h *etcdHelper) GuaranteedUpdate(
|
|||||||
PrevIndex: index,
|
PrevIndex: index,
|
||||||
TTL: time.Duration(ttl) * time.Second,
|
TTL: time.Duration(ttl) * time.Second,
|
||||||
}
|
}
|
||||||
response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
|
response, err := h.etcdKeysAPI.Set(ctx, key, data, &opts)
|
||||||
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
|
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
|
||||||
if etcdutil.IsEtcdTestFailed(err) {
|
if etcdutil.IsEtcdTestFailed(err) {
|
||||||
// Try again.
|
// Try again.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
_, _, err = h.extractObj(response, err, ptrToType, false, false)
|
_, _, _, err = h.extractObj(response, err, ptrToType, false, false)
|
||||||
return toStorageErr(err, key, int64(index))
|
return toStorageErr(err, key, int64(index))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,8 +17,10 @@ limitations under the License.
|
|||||||
package etcd
|
package etcd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"path"
|
"path"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -33,6 +35,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||||
|
"k8s.io/apimachinery/pkg/util/diff"
|
||||||
"k8s.io/apiserver/pkg/apis/example"
|
"k8s.io/apiserver/pkg/apis/example"
|
||||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
@ -42,6 +45,34 @@ import (
|
|||||||
storagetests "k8s.io/apiserver/pkg/storage/tests"
|
storagetests "k8s.io/apiserver/pkg/storage/tests"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// prefixTransformer adds and verifies that all data has the correct prefix on its way in and out.
|
||||||
|
type prefixTransformer struct {
|
||||||
|
prefix string
|
||||||
|
stale bool
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p prefixTransformer) TransformStringFromStorage(s string) (string, bool, error) {
|
||||||
|
if !strings.HasPrefix(s, p.prefix) {
|
||||||
|
return "", false, fmt.Errorf("value does not have expected prefix: %s", s)
|
||||||
|
}
|
||||||
|
return strings.TrimPrefix(s, p.prefix), p.stale, p.err
|
||||||
|
}
|
||||||
|
func (p prefixTransformer) TransformStringToStorage(s string) (string, error) {
|
||||||
|
if len(s) > 0 {
|
||||||
|
return p.prefix + s, p.err
|
||||||
|
}
|
||||||
|
return s, p.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func defaultPrefix(s string) string {
|
||||||
|
return "test!" + s
|
||||||
|
}
|
||||||
|
|
||||||
|
func defaultPrefixValue(value []byte) string {
|
||||||
|
return defaultPrefix(string(value))
|
||||||
|
}
|
||||||
|
|
||||||
func testScheme(t *testing.T) (*runtime.Scheme, serializer.CodecFactory) {
|
func testScheme(t *testing.T) (*runtime.Scheme, serializer.CodecFactory) {
|
||||||
scheme := runtime.NewScheme()
|
scheme := runtime.NewScheme()
|
||||||
scheme.Log(t)
|
scheme.Log(t)
|
||||||
@ -66,7 +97,7 @@ func testScheme(t *testing.T) (*runtime.Scheme, serializer.CodecFactory) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newEtcdHelper(client etcd.Client, scheme *runtime.Scheme, codec runtime.Codec, prefix string) etcdHelper {
|
func newEtcdHelper(client etcd.Client, scheme *runtime.Scheme, codec runtime.Codec, prefix string) etcdHelper {
|
||||||
return *NewEtcdStorage(client, codec, prefix, false, etcdtest.DeserializationCacheSize, scheme).(*etcdHelper)
|
return *NewEtcdStorage(client, codec, prefix, false, etcdtest.DeserializationCacheSize, scheme, prefixTransformer{prefix: "test!"}).(*etcdHelper)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns an encoded version of example.Pod with the given name.
|
// Returns an encoded version of example.Pod with the given name.
|
||||||
@ -135,6 +166,61 @@ func TestList(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTransformationFailure(t *testing.T) {
|
||||||
|
scheme, codecs := testScheme(t)
|
||||||
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
|
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||||
|
defer server.Terminate(t)
|
||||||
|
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
|
||||||
|
|
||||||
|
pods := []example.Pod{
|
||||||
|
{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "bar"},
|
||||||
|
Spec: storagetests.DeepEqualSafePodSpec(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "baz"},
|
||||||
|
Spec: storagetests.DeepEqualSafePodSpec(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
createPodList(t, helper, &example.PodList{Items: pods[:1]})
|
||||||
|
|
||||||
|
// create a second resource with an invalid prefix
|
||||||
|
oldTransformer := helper.transformer
|
||||||
|
helper.transformer = prefixTransformer{prefix: "otherprefix!"}
|
||||||
|
createPodList(t, helper, &example.PodList{Items: pods[1:]})
|
||||||
|
helper.transformer = oldTransformer
|
||||||
|
|
||||||
|
// only the first item is returned, and no error
|
||||||
|
var got example.PodList
|
||||||
|
if err := helper.List(context.TODO(), "/", "", storage.Everything, &got); err != nil {
|
||||||
|
t.Errorf("Unexpected error %v", err)
|
||||||
|
}
|
||||||
|
if e, a := pods[:1], got.Items; !reflect.DeepEqual(e, a) {
|
||||||
|
t.Errorf("Unexpected: %s", diff.ObjectReflectDiff(e, a))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get should fail
|
||||||
|
if err := helper.Get(context.TODO(), "/baz", "", &example.Pod{}, false); !storage.IsInternalError(err) {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
// GuaranteedUpdate should return an error
|
||||||
|
if err := helper.GuaranteedUpdate(context.TODO(), "/baz", &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) {
|
||||||
|
return input, nil, nil
|
||||||
|
}, &pods[1]); !storage.IsInternalError(err) {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete succeeds but reports an error because we cannot access the body
|
||||||
|
if err := helper.Delete(context.TODO(), "/baz", &example.Pod{}, nil); !storage.IsInternalError(err) {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := helper.Get(context.TODO(), "/baz", "", &example.Pod{}, false); !storage.IsNotFound(err) {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestListFiltered(t *testing.T) {
|
func TestListFiltered(t *testing.T) {
|
||||||
scheme, codecs := testScheme(t)
|
scheme, codecs := testScheme(t)
|
||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
@ -364,7 +450,8 @@ func TestGuaranteedUpdateNoChange(t *testing.T) {
|
|||||||
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
|
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
|
||||||
|
|
||||||
obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
|
obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
|
||||||
err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
|
original := &storagetesting.TestResource{}
|
||||||
|
err := helper.GuaranteedUpdate(context.TODO(), key, original, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
|
||||||
return obj, nil
|
return obj, nil
|
||||||
}))
|
}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -373,7 +460,27 @@ func TestGuaranteedUpdateNoChange(t *testing.T) {
|
|||||||
|
|
||||||
// Update an existing node with the same data
|
// Update an existing node with the same data
|
||||||
callbackCalled := false
|
callbackCalled := false
|
||||||
objUpdate := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
|
objUpdate := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: original.ResourceVersion}, Value: 1}
|
||||||
|
result := &storagetesting.TestResource{}
|
||||||
|
err = helper.GuaranteedUpdate(context.TODO(), key, result, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
|
||||||
|
callbackCalled = true
|
||||||
|
return objUpdate, nil
|
||||||
|
}))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error %#v", err)
|
||||||
|
}
|
||||||
|
if !callbackCalled {
|
||||||
|
t.Errorf("tryUpdate callback should have been called.")
|
||||||
|
}
|
||||||
|
if result.ResourceVersion != original.ResourceVersion {
|
||||||
|
t.Fatalf("updated the object resource version")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update an existing node with the same data but return stale
|
||||||
|
helper.transformer = prefixTransformer{prefix: "test!", stale: true}
|
||||||
|
callbackCalled = false
|
||||||
|
result = &storagetesting.TestResource{}
|
||||||
|
objUpdate = &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
|
||||||
err = helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
|
err = helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
|
||||||
callbackCalled = true
|
callbackCalled = true
|
||||||
return objUpdate, nil
|
return objUpdate, nil
|
||||||
@ -384,6 +491,9 @@ func TestGuaranteedUpdateNoChange(t *testing.T) {
|
|||||||
if !callbackCalled {
|
if !callbackCalled {
|
||||||
t.Errorf("tryUpdate callback should have been called.")
|
t.Errorf("tryUpdate callback should have been called.")
|
||||||
}
|
}
|
||||||
|
if result.ResourceVersion == original.ResourceVersion {
|
||||||
|
t.Errorf("did not update the object resource version")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGuaranteedUpdateKeyNotFound(t *testing.T) {
|
func TestGuaranteedUpdateKeyNotFound(t *testing.T) {
|
||||||
@ -540,7 +650,7 @@ func TestDeleteWithRetry(t *testing.T) {
|
|||||||
// party has updated the object.
|
// party has updated the object.
|
||||||
fakeGet := func(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error) {
|
fakeGet := func(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error) {
|
||||||
data, _ := runtime.Encode(codec, obj)
|
data, _ := runtime.Encode(codec, obj)
|
||||||
return &etcd.Response{Node: &etcd.Node{Value: string(data), ModifiedIndex: 99}}, nil
|
return &etcd.Response{Node: &etcd.Node{Value: defaultPrefixValue(data), ModifiedIndex: 99}}, nil
|
||||||
}
|
}
|
||||||
expectedRetries := 3
|
expectedRetries := 3
|
||||||
helper := newEtcdHelper(server.Client, scheme, codec, prefix)
|
helper := newEtcdHelper(server.Client, scheme, codec, prefix)
|
||||||
|
@ -71,8 +71,9 @@ type etcdWatcher struct {
|
|||||||
// Note that versioner is required for etcdWatcher to work correctly.
|
// Note that versioner is required for etcdWatcher to work correctly.
|
||||||
// There is no public constructor of it, so be careful when manipulating
|
// There is no public constructor of it, so be careful when manipulating
|
||||||
// with it manually.
|
// with it manually.
|
||||||
versioner storage.Versioner
|
versioner storage.Versioner
|
||||||
transform TransformFunc
|
transform TransformFunc
|
||||||
|
valueTransformer ValueTransformer
|
||||||
|
|
||||||
list bool // If we're doing a recursive watch, should be true.
|
list bool // If we're doing a recursive watch, should be true.
|
||||||
quorum bool // If we enable quorum, shoule be true
|
quorum bool // If we enable quorum, shoule be true
|
||||||
@ -107,15 +108,18 @@ const watchWaitDuration = 100 * time.Millisecond
|
|||||||
func newEtcdWatcher(
|
func newEtcdWatcher(
|
||||||
list bool, quorum bool, include includeFunc, filter storage.FilterFunc,
|
list bool, quorum bool, include includeFunc, filter storage.FilterFunc,
|
||||||
encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc,
|
encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc,
|
||||||
|
valueTransformer ValueTransformer,
|
||||||
cache etcdCache) *etcdWatcher {
|
cache etcdCache) *etcdWatcher {
|
||||||
w := &etcdWatcher{
|
w := &etcdWatcher{
|
||||||
encoding: encoding,
|
encoding: encoding,
|
||||||
versioner: versioner,
|
versioner: versioner,
|
||||||
transform: transform,
|
transform: transform,
|
||||||
list: list,
|
valueTransformer: valueTransformer,
|
||||||
quorum: quorum,
|
|
||||||
include: include,
|
list: list,
|
||||||
filter: filter,
|
quorum: quorum,
|
||||||
|
include: include,
|
||||||
|
filter: filter,
|
||||||
// Buffer this channel, so that the etcd client is not forced
|
// Buffer this channel, so that the etcd client is not forced
|
||||||
// to context switch with every object it gets, and so that a
|
// to context switch with every object it gets, and so that a
|
||||||
// long time spent decoding an object won't block the *next*
|
// long time spent decoding an object won't block the *next*
|
||||||
@ -309,12 +313,18 @@ func (w *etcdWatcher) translate() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// decodeObject extracts an object from the provided etcd node or returns an error.
|
||||||
func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
|
func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
|
||||||
if obj, found := w.cache.getFromCache(node.ModifiedIndex, storage.SimpleFilter(storage.Everything)); found {
|
if obj, found := w.cache.getFromCache(node.ModifiedIndex, storage.SimpleFilter(storage.Everything)); found {
|
||||||
return obj, nil
|
return obj, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, err := runtime.Decode(w.encoding, []byte(node.Value))
|
body, _, err := w.valueTransformer.TransformStringFromStorage(node.Value)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
obj, err := runtime.Decode(w.encoding, []byte(body))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -133,7 +133,7 @@ func TestWatchInterpretations(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for name, item := range table {
|
for name, item := range table {
|
||||||
for _, action := range item.actions {
|
for _, action := range item.actions {
|
||||||
w := newEtcdWatcher(true, false, nil, firstLetterIsB, codec, versioner, nil, &fakeEtcdCache{})
|
w := newEtcdWatcher(true, false, nil, firstLetterIsB, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
|
||||||
emitCalled := false
|
emitCalled := false
|
||||||
w.emit = func(event watch.Event) {
|
w.emit = func(event watch.Event) {
|
||||||
emitCalled = true
|
emitCalled = true
|
||||||
@ -150,10 +150,10 @@ func TestWatchInterpretations(t *testing.T) {
|
|||||||
|
|
||||||
var n, pn *etcd.Node
|
var n, pn *etcd.Node
|
||||||
if item.nodeValue != "" {
|
if item.nodeValue != "" {
|
||||||
n = &etcd.Node{Value: item.nodeValue}
|
n = &etcd.Node{Value: defaultPrefix(item.nodeValue)}
|
||||||
}
|
}
|
||||||
if item.prevNodeValue != "" {
|
if item.prevNodeValue != "" {
|
||||||
pn = &etcd.Node{Value: item.prevNodeValue}
|
pn = &etcd.Node{Value: defaultPrefix(item.prevNodeValue)}
|
||||||
}
|
}
|
||||||
|
|
||||||
w.sendResult(&etcd.Response{
|
w.sendResult(&etcd.Response{
|
||||||
@ -173,7 +173,7 @@ func TestWatchInterpretations(t *testing.T) {
|
|||||||
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
|
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
|
||||||
_, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
|
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
|
||||||
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{})
|
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
|
||||||
w.emit = func(e watch.Event) {
|
w.emit = func(e watch.Event) {
|
||||||
t.Errorf("Unexpected emit: %v", e)
|
t.Errorf("Unexpected emit: %v", e)
|
||||||
}
|
}
|
||||||
@ -189,7 +189,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
|
|||||||
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
|
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
|
||||||
actions := []string{"create", "set", "compareAndSwap", "delete"}
|
actions := []string{"create", "set", "compareAndSwap", "delete"}
|
||||||
for _, action := range actions {
|
for _, action := range actions {
|
||||||
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{})
|
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
|
||||||
w.emit = func(e watch.Event) {
|
w.emit = func(e watch.Event) {
|
||||||
t.Errorf("Unexpected emit: %v", e)
|
t.Errorf("Unexpected emit: %v", e)
|
||||||
}
|
}
|
||||||
@ -205,20 +205,20 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
|
|||||||
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
|
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
|
||||||
actions := []string{"create", "set", "compareAndSwap", "delete"}
|
actions := []string{"create", "set", "compareAndSwap", "delete"}
|
||||||
for _, action := range actions {
|
for _, action := range actions {
|
||||||
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{})
|
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
|
||||||
w.emit = func(e watch.Event) {
|
w.emit = func(e watch.Event) {
|
||||||
t.Errorf("Unexpected emit: %v", e)
|
t.Errorf("Unexpected emit: %v", e)
|
||||||
}
|
}
|
||||||
w.sendResult(&etcd.Response{
|
w.sendResult(&etcd.Response{
|
||||||
Action: action,
|
Action: action,
|
||||||
Node: &etcd.Node{
|
Node: &etcd.Node{
|
||||||
Value: "foobar",
|
Value: defaultPrefix("foobar"),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
w.sendResult(&etcd.Response{
|
w.sendResult(&etcd.Response{
|
||||||
Action: action,
|
Action: action,
|
||||||
PrevNode: &etcd.Node{
|
PrevNode: &etcd.Node{
|
||||||
Value: "foobar",
|
Value: defaultPrefix("foobar"),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
w.Stop()
|
w.Stop()
|
||||||
@ -231,7 +231,7 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
|
|||||||
filter := func(obj runtime.Object) bool {
|
filter := func(obj runtime.Object) bool {
|
||||||
return obj.(*example.Pod).Name != "bar"
|
return obj.(*example.Pod).Name != "bar"
|
||||||
}
|
}
|
||||||
w := newEtcdWatcher(false, false, nil, filter, codec, versioner, nil, &fakeEtcdCache{})
|
w := newEtcdWatcher(false, false, nil, filter, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
|
||||||
|
|
||||||
eventChan := make(chan watch.Event, 1)
|
eventChan := make(chan watch.Event, 1)
|
||||||
w.emit = func(e watch.Event) {
|
w.emit = func(e watch.Event) {
|
||||||
@ -259,7 +259,7 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
|
|||||||
ModifiedIndex: 2,
|
ModifiedIndex: 2,
|
||||||
},
|
},
|
||||||
PrevNode: &etcd.Node{
|
PrevNode: &etcd.Node{
|
||||||
Value: string(fooBytes),
|
Value: defaultPrefixValue(fooBytes),
|
||||||
ModifiedIndex: 1,
|
ModifiedIndex: 1,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -268,11 +268,11 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
|
|||||||
response: &etcd.Response{
|
response: &etcd.Response{
|
||||||
Action: EtcdSet,
|
Action: EtcdSet,
|
||||||
Node: &etcd.Node{
|
Node: &etcd.Node{
|
||||||
Value: string(barBytes),
|
Value: defaultPrefixValue(barBytes),
|
||||||
ModifiedIndex: 2,
|
ModifiedIndex: 2,
|
||||||
},
|
},
|
||||||
PrevNode: &etcd.Node{
|
PrevNode: &etcd.Node{
|
||||||
Value: string(fooBytes),
|
Value: defaultPrefixValue(fooBytes),
|
||||||
ModifiedIndex: 1,
|
ModifiedIndex: 1,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -33,21 +33,42 @@ import (
|
|||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/conversion"
|
"k8s.io/apimachinery/pkg/conversion"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
utiltrace "k8s.io/apiserver/pkg/util/trace"
|
|
||||||
"k8s.io/apiserver/pkg/storage/etcd"
|
"k8s.io/apiserver/pkg/storage/etcd"
|
||||||
|
utiltrace "k8s.io/apiserver/pkg/util/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ValueTransformer allows a string value to be transformed before being read from or written to the underlying store. The methods
|
||||||
|
// must be able to undo the transformation caused by the other.
|
||||||
|
type ValueTransformer interface {
|
||||||
|
// TransformFromStorage may transform the provided data from its underlying storage representation or return an error.
|
||||||
|
// Stale is true if the object on disk is stale and a write to etcd should be issued, even if the contents of the object
|
||||||
|
// have not changed.
|
||||||
|
TransformFromStorage([]byte) (data []byte, stale bool, err error)
|
||||||
|
// TransformToStorage may transform the provided data into the appropriate form in storage or return an error.
|
||||||
|
TransformToStorage([]byte) (data []byte, err error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type identityTransformer struct{}
|
||||||
|
|
||||||
|
func (identityTransformer) TransformFromStorage(b []byte) ([]byte, bool, error) { return b, false, nil }
|
||||||
|
func (identityTransformer) TransformToStorage(b []byte) ([]byte, error) { return b, nil }
|
||||||
|
|
||||||
|
// IdentityTransformer performs no transformation on the provided values.
|
||||||
|
var IdentityTransformer ValueTransformer = identityTransformer{}
|
||||||
|
|
||||||
type store struct {
|
type store struct {
|
||||||
client *clientv3.Client
|
client *clientv3.Client
|
||||||
// getOpts contains additional options that should be passed
|
// getOpts contains additional options that should be passed
|
||||||
// to all Get() calls.
|
// to all Get() calls.
|
||||||
getOps []clientv3.OpOption
|
getOps []clientv3.OpOption
|
||||||
codec runtime.Codec
|
codec runtime.Codec
|
||||||
versioner storage.Versioner
|
versioner storage.Versioner
|
||||||
pathPrefix string
|
transformer ValueTransformer
|
||||||
watcher *watcher
|
pathPrefix string
|
||||||
|
watcher *watcher
|
||||||
}
|
}
|
||||||
|
|
||||||
type elemForDecode struct {
|
type elemForDecode struct {
|
||||||
@ -56,31 +77,33 @@ type elemForDecode struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type objState struct {
|
type objState struct {
|
||||||
obj runtime.Object
|
obj runtime.Object
|
||||||
meta *storage.ResponseMeta
|
meta *storage.ResponseMeta
|
||||||
rev int64
|
rev int64
|
||||||
data []byte
|
data []byte
|
||||||
|
stale bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns an etcd3 implementation of storage.Interface.
|
// New returns an etcd3 implementation of storage.Interface.
|
||||||
func New(c *clientv3.Client, codec runtime.Codec, prefix string) storage.Interface {
|
func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer ValueTransformer) storage.Interface {
|
||||||
return newStore(c, true, codec, prefix)
|
return newStore(c, true, codec, prefix, transformer)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWithNoQuorumRead returns etcd3 implementation of storage.Interface
|
// NewWithNoQuorumRead returns etcd3 implementation of storage.Interface
|
||||||
// where Get operations don't require quorum read.
|
// where Get operations don't require quorum read.
|
||||||
func NewWithNoQuorumRead(c *clientv3.Client, codec runtime.Codec, prefix string) storage.Interface {
|
func NewWithNoQuorumRead(c *clientv3.Client, codec runtime.Codec, prefix string, transformer ValueTransformer) storage.Interface {
|
||||||
return newStore(c, false, codec, prefix)
|
return newStore(c, false, codec, prefix, transformer)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newStore(c *clientv3.Client, quorumRead bool, codec runtime.Codec, prefix string) *store {
|
func newStore(c *clientv3.Client, quorumRead bool, codec runtime.Codec, prefix string, transformer ValueTransformer) *store {
|
||||||
versioner := etcd.APIObjectVersioner{}
|
versioner := etcd.APIObjectVersioner{}
|
||||||
result := &store{
|
result := &store{
|
||||||
client: c,
|
client: c,
|
||||||
versioner: versioner,
|
codec: codec,
|
||||||
codec: codec,
|
versioner: versioner,
|
||||||
pathPrefix: prefix,
|
transformer: transformer,
|
||||||
watcher: newWatcher(c, codec, versioner),
|
pathPrefix: prefix,
|
||||||
|
watcher: newWatcher(c, codec, versioner, transformer),
|
||||||
}
|
}
|
||||||
if !quorumRead {
|
if !quorumRead {
|
||||||
// In case of non-quorum reads, we can set WithSerializable()
|
// In case of non-quorum reads, we can set WithSerializable()
|
||||||
@ -110,7 +133,13 @@ func (s *store) Get(ctx context.Context, key string, resourceVersion string, out
|
|||||||
return storage.NewKeyNotFoundError(key, 0)
|
return storage.NewKeyNotFoundError(key, 0)
|
||||||
}
|
}
|
||||||
kv := getResp.Kvs[0]
|
kv := getResp.Kvs[0]
|
||||||
return decode(s.codec, s.versioner, kv.Value, out, kv.ModRevision)
|
|
||||||
|
data, _, err := s.transformer.TransformFromStorage(kv.Value)
|
||||||
|
if err != nil {
|
||||||
|
return storage.NewInternalError(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
return decode(s.codec, s.versioner, data, out, kv.ModRevision)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create implements storage.Interface.Create.
|
// Create implements storage.Interface.Create.
|
||||||
@ -129,10 +158,15 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
newData, err := s.transformer.TransformToStorage(data)
|
||||||
|
if err != nil {
|
||||||
|
return storage.NewInternalError(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
txnResp, err := s.client.KV.Txn(ctx).If(
|
txnResp, err := s.client.KV.Txn(ctx).If(
|
||||||
notFound(key),
|
notFound(key),
|
||||||
).Then(
|
).Then(
|
||||||
clientv3.OpPut(key, string(data), opts...),
|
clientv3.OpPut(key, string(newData), opts...),
|
||||||
).Commit()
|
).Commit()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -177,7 +211,11 @@ func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime
|
|||||||
}
|
}
|
||||||
|
|
||||||
kv := getResp.Kvs[0]
|
kv := getResp.Kvs[0]
|
||||||
return decode(s.codec, s.versioner, kv.Value, out, kv.ModRevision)
|
data, _, err := s.transformer.TransformFromStorage(kv.Value)
|
||||||
|
if err != nil {
|
||||||
|
return storage.NewInternalError(err.Error())
|
||||||
|
}
|
||||||
|
return decode(s.codec, s.versioner, data, out, kv.ModRevision)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, precondtions *storage.Preconditions) error {
|
func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, precondtions *storage.Preconditions) error {
|
||||||
@ -257,10 +295,15 @@ func (s *store) GuaranteedUpdate(
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if bytes.Equal(data, origState.data) {
|
if !origState.stale && bytes.Equal(data, origState.data) {
|
||||||
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
|
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
newData, err := s.transformer.TransformToStorage(data)
|
||||||
|
if err != nil {
|
||||||
|
return storage.NewInternalError(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
opts, err := s.ttlOpts(ctx, int64(ttl))
|
opts, err := s.ttlOpts(ctx, int64(ttl))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -270,7 +313,7 @@ func (s *store) GuaranteedUpdate(
|
|||||||
txnResp, err := s.client.KV.Txn(ctx).If(
|
txnResp, err := s.client.KV.Txn(ctx).If(
|
||||||
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
|
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
|
||||||
).Then(
|
).Then(
|
||||||
clientv3.OpPut(key, string(data), opts...),
|
clientv3.OpPut(key, string(newData), opts...),
|
||||||
).Else(
|
).Else(
|
||||||
clientv3.OpGet(key),
|
clientv3.OpGet(key),
|
||||||
).Commit()
|
).Commit()
|
||||||
@ -289,6 +332,7 @@ func (s *store) GuaranteedUpdate(
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
putResp := txnResp.Responses[0].GetResponsePut()
|
putResp := txnResp.Responses[0].GetResponsePut()
|
||||||
|
|
||||||
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
|
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -308,8 +352,12 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
|
|||||||
if len(getResp.Kvs) == 0 {
|
if len(getResp.Kvs) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
data, _, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value)
|
||||||
|
if err != nil {
|
||||||
|
return storage.NewInternalError(err.Error())
|
||||||
|
}
|
||||||
elems := []*elemForDecode{{
|
elems := []*elemForDecode{{
|
||||||
data: getResp.Kvs[0].Value,
|
data: data,
|
||||||
rev: uint64(getResp.Kvs[0].ModRevision),
|
rev: uint64(getResp.Kvs[0].ModRevision),
|
||||||
}}
|
}}
|
||||||
if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil {
|
if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil {
|
||||||
@ -337,12 +385,18 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
elems := make([]*elemForDecode, len(getResp.Kvs))
|
elems := make([]*elemForDecode, 0, len(getResp.Kvs))
|
||||||
for i, kv := range getResp.Kvs {
|
for _, kv := range getResp.Kvs {
|
||||||
elems[i] = &elemForDecode{
|
data, _, err := s.transformer.TransformFromStorage(kv.Value)
|
||||||
data: kv.Value,
|
if err != nil {
|
||||||
rev: uint64(kv.ModRevision),
|
utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", key, err))
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
elems = append(elems, &elemForDecode{
|
||||||
|
data: data,
|
||||||
|
rev: uint64(kv.ModRevision),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil {
|
if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -383,9 +437,14 @@ func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Va
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
data, stale, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value)
|
||||||
|
if err != nil {
|
||||||
|
return nil, storage.NewInternalError(err.Error())
|
||||||
|
}
|
||||||
state.rev = getResp.Kvs[0].ModRevision
|
state.rev = getResp.Kvs[0].ModRevision
|
||||||
state.meta.ResourceVersion = uint64(state.rev)
|
state.meta.ResourceVersion = uint64(state.rev)
|
||||||
state.data = getResp.Kvs[0].Value
|
state.data = data
|
||||||
|
state.stale = stale
|
||||||
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
|
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package etcd3
|
package etcd3
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
@ -28,10 +29,12 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||||
|
"k8s.io/apimachinery/pkg/util/diff"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/apiserver/pkg/apis/example"
|
"k8s.io/apiserver/pkg/apis/example"
|
||||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
|
storagetests "k8s.io/apiserver/pkg/storage/tests"
|
||||||
|
|
||||||
"github.com/coreos/etcd/integration"
|
"github.com/coreos/etcd/integration"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
@ -46,6 +49,26 @@ func init() {
|
|||||||
examplev1.AddToScheme(scheme)
|
examplev1.AddToScheme(scheme)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// prefixTransformer adds and verifies that all data has the correct prefix on its way in and out.
|
||||||
|
type prefixTransformer struct {
|
||||||
|
prefix []byte
|
||||||
|
stale bool
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p prefixTransformer) TransformFromStorage(b []byte) ([]byte, bool, error) {
|
||||||
|
if !bytes.HasPrefix(b, p.prefix) {
|
||||||
|
return nil, false, fmt.Errorf("value does not have expected prefix: %s", string(b))
|
||||||
|
}
|
||||||
|
return bytes.TrimPrefix(b, p.prefix), p.stale, p.err
|
||||||
|
}
|
||||||
|
func (p prefixTransformer) TransformToStorage(b []byte) ([]byte, error) {
|
||||||
|
if len(b) > 0 {
|
||||||
|
return append(append([]byte{}, p.prefix...), b...), p.err
|
||||||
|
}
|
||||||
|
return b, p.err
|
||||||
|
}
|
||||||
|
|
||||||
func TestCreate(t *testing.T) {
|
func TestCreate(t *testing.T) {
|
||||||
ctx, store, cluster := testSetup(t)
|
ctx, store, cluster := testSetup(t)
|
||||||
defer cluster.Terminate(t)
|
defer cluster.Terminate(t)
|
||||||
@ -294,6 +317,7 @@ func TestGuaranteedUpdate(t *testing.T) {
|
|||||||
expectNotFoundErr bool
|
expectNotFoundErr bool
|
||||||
expectInvalidObjErr bool
|
expectInvalidObjErr bool
|
||||||
expectNoUpdate bool
|
expectNoUpdate bool
|
||||||
|
transformStale bool
|
||||||
}{{ // GuaranteedUpdate on non-existing key with ignoreNotFound=false
|
}{{ // GuaranteedUpdate on non-existing key with ignoreNotFound=false
|
||||||
key: "/non-existing",
|
key: "/non-existing",
|
||||||
ignoreNotFound: false,
|
ignoreNotFound: false,
|
||||||
@ -322,6 +346,14 @@ func TestGuaranteedUpdate(t *testing.T) {
|
|||||||
expectNotFoundErr: false,
|
expectNotFoundErr: false,
|
||||||
expectInvalidObjErr: false,
|
expectInvalidObjErr: false,
|
||||||
expectNoUpdate: true,
|
expectNoUpdate: true,
|
||||||
|
}, { // GuaranteedUpdate with same data but stale
|
||||||
|
key: key,
|
||||||
|
ignoreNotFound: false,
|
||||||
|
precondition: nil,
|
||||||
|
expectNotFoundErr: false,
|
||||||
|
expectInvalidObjErr: false,
|
||||||
|
expectNoUpdate: false,
|
||||||
|
transformStale: true,
|
||||||
}, { // GuaranteedUpdate with UID match
|
}, { // GuaranteedUpdate with UID match
|
||||||
key: key,
|
key: key,
|
||||||
ignoreNotFound: false,
|
ignoreNotFound: false,
|
||||||
@ -344,6 +376,12 @@ func TestGuaranteedUpdate(t *testing.T) {
|
|||||||
if tt.expectNoUpdate {
|
if tt.expectNoUpdate {
|
||||||
name = storeObj.Name
|
name = storeObj.Name
|
||||||
}
|
}
|
||||||
|
originalTransformer := store.transformer.(prefixTransformer)
|
||||||
|
if tt.transformStale {
|
||||||
|
transformer := originalTransformer
|
||||||
|
transformer.stale = true
|
||||||
|
store.transformer = transformer
|
||||||
|
}
|
||||||
version := storeObj.ResourceVersion
|
version := storeObj.ResourceVersion
|
||||||
err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition,
|
err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition,
|
||||||
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
||||||
@ -356,6 +394,7 @@ func TestGuaranteedUpdate(t *testing.T) {
|
|||||||
pod.Name = name
|
pod.Name = name
|
||||||
return &pod, nil
|
return &pod, nil
|
||||||
}))
|
}))
|
||||||
|
store.transformer = originalTransformer
|
||||||
|
|
||||||
if tt.expectNotFoundErr {
|
if tt.expectNotFoundErr {
|
||||||
if err == nil || !storage.IsNotFound(err) {
|
if err == nil || !storage.IsNotFound(err) {
|
||||||
@ -460,11 +499,91 @@ func TestGuaranteedUpdateWithConflict(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTransformationFailure(t *testing.T) {
|
||||||
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
|
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||||
|
defer cluster.Terminate(t)
|
||||||
|
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
preset := []struct {
|
||||||
|
key string
|
||||||
|
obj *example.Pod
|
||||||
|
storedObj *example.Pod
|
||||||
|
}{{
|
||||||
|
key: "/one-level/test",
|
||||||
|
obj: &example.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "bar"},
|
||||||
|
Spec: storagetests.DeepEqualSafePodSpec(),
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
key: "/two-level/1/test",
|
||||||
|
obj: &example.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "baz"},
|
||||||
|
Spec: storagetests.DeepEqualSafePodSpec(),
|
||||||
|
},
|
||||||
|
}}
|
||||||
|
for i, ps := range preset[:1] {
|
||||||
|
preset[i].storedObj = &example.Pod{}
|
||||||
|
err := store.Create(ctx, ps.key, ps.obj, preset[:1][i].storedObj, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Set failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// create a second resource with an invalid prefix
|
||||||
|
oldTransformer := store.transformer
|
||||||
|
store.transformer = prefixTransformer{prefix: []byte("otherprefix!")}
|
||||||
|
for i, ps := range preset[1:] {
|
||||||
|
preset[1:][i].storedObj = &example.Pod{}
|
||||||
|
err := store.Create(ctx, ps.key, ps.obj, preset[1:][i].storedObj, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Set failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
store.transformer = oldTransformer
|
||||||
|
|
||||||
|
// only the first item is returned, and no error
|
||||||
|
var got example.PodList
|
||||||
|
if err := store.List(ctx, "/", "", storage.Everything, &got); err != nil {
|
||||||
|
t.Errorf("Unexpected error %v", err)
|
||||||
|
}
|
||||||
|
if e, a := []example.Pod{*preset[0].storedObj}, got.Items; !reflect.DeepEqual(e, a) {
|
||||||
|
t.Errorf("Unexpected: %s", diff.ObjectReflectDiff(e, a))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get should fail
|
||||||
|
if err := store.Get(ctx, preset[1].key, "", &example.Pod{}, false); !storage.IsInternalError(err) {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
// GuaranteedUpdate without suggestion should return an error
|
||||||
|
if err := store.GuaranteedUpdate(ctx, preset[1].key, &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) {
|
||||||
|
return input, nil, nil
|
||||||
|
}); !storage.IsInternalError(err) {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
// GuaranteedUpdate with suggestion should not return an error if we don't change the object
|
||||||
|
if err := store.GuaranteedUpdate(ctx, preset[1].key, &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) {
|
||||||
|
return input, nil, nil
|
||||||
|
}, preset[1].obj); err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete succeeds but reports an error because we cannot access the body
|
||||||
|
if err := store.Delete(ctx, preset[1].key, &example.Pod{}, nil); !storage.IsInternalError(err) {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := store.Get(ctx, preset[1].key, "", &example.Pod{}, false); !storage.IsNotFound(err) {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestList(t *testing.T) {
|
func TestList(t *testing.T) {
|
||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||||
defer cluster.Terminate(t)
|
defer cluster.Terminate(t)
|
||||||
store := newStore(cluster.RandClient(), false, codec, "")
|
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
// Setup storage with the following structure:
|
// Setup storage with the following structure:
|
||||||
@ -552,7 +671,7 @@ func TestList(t *testing.T) {
|
|||||||
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
|
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
|
||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||||
store := newStore(cluster.RandClient(), false, codec, "")
|
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
return ctx, store, cluster
|
return ctx, store, cluster
|
||||||
}
|
}
|
||||||
|
@ -40,9 +40,10 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type watcher struct {
|
type watcher struct {
|
||||||
client *clientv3.Client
|
client *clientv3.Client
|
||||||
codec runtime.Codec
|
codec runtime.Codec
|
||||||
versioner storage.Versioner
|
versioner storage.Versioner
|
||||||
|
transformer ValueTransformer
|
||||||
}
|
}
|
||||||
|
|
||||||
// watchChan implements watch.Interface.
|
// watchChan implements watch.Interface.
|
||||||
@ -59,11 +60,12 @@ type watchChan struct {
|
|||||||
errChan chan error
|
errChan chan error
|
||||||
}
|
}
|
||||||
|
|
||||||
func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner) *watcher {
|
func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner, transformer ValueTransformer) *watcher {
|
||||||
return &watcher{
|
return &watcher{
|
||||||
client: client,
|
client: client,
|
||||||
codec: codec,
|
codec: codec,
|
||||||
versioner: versioner,
|
versioner: versioner,
|
||||||
|
transformer: transformer,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -341,7 +343,11 @@ func (wc *watchChan) sendEvent(e *event) {
|
|||||||
|
|
||||||
func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) {
|
func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) {
|
||||||
if !e.isDeleted {
|
if !e.isDeleted {
|
||||||
curObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, e.value, e.rev)
|
data, _, err := wc.watcher.transformer.TransformFromStorage(e.value)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
curObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, data, e.rev)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
@ -352,9 +358,13 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim
|
|||||||
// we need the object only to compute whether it was filtered out
|
// we need the object only to compute whether it was filtered out
|
||||||
// before).
|
// before).
|
||||||
if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) {
|
if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) {
|
||||||
|
data, _, err := wc.watcher.transformer.TransformFromStorage(e.prevValue)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
// Note that this sends the *old* object with the etcd revision for the time at
|
// Note that this sends the *old* object with the etcd revision for the time at
|
||||||
// which it gets deleted.
|
// which it gets deleted.
|
||||||
oldObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, e.prevValue, e.rev)
|
oldObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, data, e.rev)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
@ -227,13 +227,13 @@ func TestWatchError(t *testing.T) {
|
|||||||
codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
|
codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
|
||||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||||
defer cluster.Terminate(t)
|
defer cluster.Terminate(t)
|
||||||
invalidStore := newStore(cluster.RandClient(), false, codec, "")
|
invalidStore := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything)
|
w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Watch failed: %v", err)
|
t.Fatalf("Watch failed: %v", err)
|
||||||
}
|
}
|
||||||
validStore := newStore(cluster.RandClient(), false, codec, "")
|
validStore := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
|
||||||
validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
|
validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
|
||||||
func(runtime.Object) (runtime.Object, error) {
|
func(runtime.Object) (runtime.Object, error) {
|
||||||
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil
|
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil
|
||||||
|
@ -39,7 +39,7 @@ func newETCD2Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize, c.Copier)
|
s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize, c.Copier, etcd.IdentityTransformer)
|
||||||
return s, tr.CloseIdleConnections, nil
|
return s, tr.CloseIdleConnections, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -56,7 +56,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
|
|||||||
client.Close()
|
client.Close()
|
||||||
}
|
}
|
||||||
if c.Quorum {
|
if c.Quorum {
|
||||||
return etcd3.New(client, c.Codec, c.Prefix), destroyFunc, nil
|
return etcd3.New(client, c.Codec, c.Prefix, etcd3.IdentityTransformer), destroyFunc, nil
|
||||||
}
|
}
|
||||||
return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix), destroyFunc, nil
|
return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix, etcd3.IdentityTransformer), destroyFunc, nil
|
||||||
}
|
}
|
||||||
|
@ -93,7 +93,7 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha
|
|||||||
|
|
||||||
func newEtcdTestStorage(t *testing.T, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) {
|
func newEtcdTestStorage(t *testing.T, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) {
|
||||||
server, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, scheme)
|
server, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, scheme)
|
||||||
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix)
|
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix, etcd3.IdentityTransformer)
|
||||||
return server, storage
|
return server, storage
|
||||||
}
|
}
|
||||||
|
|
||||||
|
4
vendor/BUILD
vendored
4
vendor/BUILD
vendored
@ -14930,6 +14930,7 @@ go_test(
|
|||||||
"//vendor:k8s.io/apimachinery/pkg/runtime",
|
"//vendor:k8s.io/apimachinery/pkg/runtime",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
|
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/runtime/serializer",
|
"//vendor:k8s.io/apimachinery/pkg/runtime/serializer",
|
||||||
|
"//vendor:k8s.io/apimachinery/pkg/util/diff",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/watch",
|
"//vendor:k8s.io/apimachinery/pkg/watch",
|
||||||
"//vendor:k8s.io/apiserver/pkg/apis/example",
|
"//vendor:k8s.io/apiserver/pkg/apis/example",
|
||||||
"//vendor:k8s.io/apiserver/pkg/apis/example/v1",
|
"//vendor:k8s.io/apiserver/pkg/apis/example/v1",
|
||||||
@ -15048,11 +15049,13 @@ go_test(
|
|||||||
"//vendor:k8s.io/apimachinery/pkg/runtime",
|
"//vendor:k8s.io/apimachinery/pkg/runtime",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
|
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/runtime/serializer",
|
"//vendor:k8s.io/apimachinery/pkg/runtime/serializer",
|
||||||
|
"//vendor:k8s.io/apimachinery/pkg/util/diff",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/util/wait",
|
"//vendor:k8s.io/apimachinery/pkg/util/wait",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/watch",
|
"//vendor:k8s.io/apimachinery/pkg/watch",
|
||||||
"//vendor:k8s.io/apiserver/pkg/apis/example",
|
"//vendor:k8s.io/apiserver/pkg/apis/example",
|
||||||
"//vendor:k8s.io/apiserver/pkg/apis/example/v1",
|
"//vendor:k8s.io/apiserver/pkg/apis/example/v1",
|
||||||
"//vendor:k8s.io/apiserver/pkg/storage",
|
"//vendor:k8s.io/apiserver/pkg/storage",
|
||||||
|
"//vendor:k8s.io/apiserver/pkg/storage/tests",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -15075,6 +15078,7 @@ go_library(
|
|||||||
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
|
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/conversion",
|
"//vendor:k8s.io/apimachinery/pkg/conversion",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/runtime",
|
"//vendor:k8s.io/apimachinery/pkg/runtime",
|
||||||
|
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/watch",
|
"//vendor:k8s.io/apimachinery/pkg/watch",
|
||||||
"//vendor:k8s.io/apiserver/pkg/storage",
|
"//vendor:k8s.io/apiserver/pkg/storage",
|
||||||
"//vendor:k8s.io/apiserver/pkg/storage/etcd",
|
"//vendor:k8s.io/apiserver/pkg/storage/etcd",
|
||||||
|
Loading…
Reference in New Issue
Block a user