mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Update etcd3 storage to leverage storage/value interfaces
Adds context argument which must be set for AES GCM authenticated data to be passed.
This commit is contained in:
parent
f418468c87
commit
a73990a33f
@ -36,27 +36,25 @@ import (
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
utiltrace "k8s.io/apiserver/pkg/util/trace"
|
||||
)
|
||||
|
||||
// ValueTransformer allows a string value to be transformed before being read from or written to the underlying store. The methods
|
||||
// must be able to undo the transformation caused by the other.
|
||||
type ValueTransformer interface {
|
||||
// TransformFromStorage may transform the provided data from its underlying storage representation or return an error.
|
||||
// Stale is true if the object on disk is stale and a write to etcd should be issued, even if the contents of the object
|
||||
// have not changed.
|
||||
TransformFromStorage([]byte) (data []byte, stale bool, err error)
|
||||
// TransformToStorage may transform the provided data into the appropriate form in storage or return an error.
|
||||
TransformToStorage([]byte) (data []byte, err error)
|
||||
// authenticatedDataString satisfies the value.Context interface. It uses the key to
|
||||
// authenticate the stored data. This does not defend against reuse of previously
|
||||
// encrypted values under the same key, but will prevent an attacker from using an
|
||||
// encrypted value from a different key. A stronger authenticated data segment would
|
||||
// include the etcd3 Version field (which is incremented on each write to a key and
|
||||
// reset when the key is deleted), but an attacker with write access to etcd can
|
||||
// force deletion and recreation of keys to weaken that angle.
|
||||
type authenticatedDataString string
|
||||
|
||||
// AuthenticatedData implements the value.Context interface.
|
||||
func (d authenticatedDataString) AuthenticatedData() []byte {
|
||||
return []byte(string(d))
|
||||
}
|
||||
|
||||
type identityTransformer struct{}
|
||||
|
||||
func (identityTransformer) TransformFromStorage(b []byte) ([]byte, bool, error) { return b, false, nil }
|
||||
func (identityTransformer) TransformToStorage(b []byte) ([]byte, error) { return b, nil }
|
||||
|
||||
// IdentityTransformer performs no transformation on the provided values.
|
||||
var IdentityTransformer ValueTransformer = identityTransformer{}
|
||||
var _ value.Context = authenticatedDataString("")
|
||||
|
||||
type store struct {
|
||||
client *clientv3.Client
|
||||
@ -65,7 +63,7 @@ type store struct {
|
||||
getOps []clientv3.OpOption
|
||||
codec runtime.Codec
|
||||
versioner storage.Versioner
|
||||
transformer ValueTransformer
|
||||
transformer value.Transformer
|
||||
pathPrefix string
|
||||
watcher *watcher
|
||||
}
|
||||
@ -84,17 +82,17 @@ type objState struct {
|
||||
}
|
||||
|
||||
// New returns an etcd3 implementation of storage.Interface.
|
||||
func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer ValueTransformer) storage.Interface {
|
||||
func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer) storage.Interface {
|
||||
return newStore(c, true, codec, prefix, transformer)
|
||||
}
|
||||
|
||||
// NewWithNoQuorumRead returns etcd3 implementation of storage.Interface
|
||||
// where Get operations don't require quorum read.
|
||||
func NewWithNoQuorumRead(c *clientv3.Client, codec runtime.Codec, prefix string, transformer ValueTransformer) storage.Interface {
|
||||
func NewWithNoQuorumRead(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer) storage.Interface {
|
||||
return newStore(c, false, codec, prefix, transformer)
|
||||
}
|
||||
|
||||
func newStore(c *clientv3.Client, quorumRead bool, codec runtime.Codec, prefix string, transformer ValueTransformer) *store {
|
||||
func newStore(c *clientv3.Client, quorumRead bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
|
||||
versioner := etcd.APIObjectVersioner{}
|
||||
result := &store{
|
||||
client: c,
|
||||
@ -136,7 +134,7 @@ func (s *store) Get(ctx context.Context, key string, resourceVersion string, out
|
||||
}
|
||||
kv := getResp.Kvs[0]
|
||||
|
||||
data, _, err := s.transformer.TransformFromStorage(kv.Value)
|
||||
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
|
||||
if err != nil {
|
||||
return storage.NewInternalError(err.Error())
|
||||
}
|
||||
@ -160,7 +158,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
|
||||
return err
|
||||
}
|
||||
|
||||
newData, err := s.transformer.TransformToStorage(data)
|
||||
newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
|
||||
if err != nil {
|
||||
return storage.NewInternalError(err.Error())
|
||||
}
|
||||
@ -185,16 +183,16 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
|
||||
}
|
||||
|
||||
// Delete implements storage.Interface.Delete.
|
||||
func (s *store) Delete(ctx context.Context, key string, out runtime.Object, precondtions *storage.Preconditions) error {
|
||||
func (s *store) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions) error {
|
||||
v, err := conversion.EnforcePtr(out)
|
||||
if err != nil {
|
||||
panic("unable to convert output object to pointer")
|
||||
}
|
||||
key = path.Join(s.pathPrefix, key)
|
||||
if precondtions == nil {
|
||||
if preconditions == nil {
|
||||
return s.unconditionalDelete(ctx, key, out)
|
||||
}
|
||||
return s.conditionalDelete(ctx, key, out, v, precondtions)
|
||||
return s.conditionalDelete(ctx, key, out, v, preconditions)
|
||||
}
|
||||
|
||||
func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime.Object) error {
|
||||
@ -213,14 +211,14 @@ func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime
|
||||
}
|
||||
|
||||
kv := getResp.Kvs[0]
|
||||
data, _, err := s.transformer.TransformFromStorage(kv.Value)
|
||||
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
|
||||
if err != nil {
|
||||
return storage.NewInternalError(err.Error())
|
||||
}
|
||||
return decode(s.codec, s.versioner, data, out, kv.ModRevision)
|
||||
}
|
||||
|
||||
func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, precondtions *storage.Preconditions) error {
|
||||
func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions) error {
|
||||
getResp, err := s.client.KV.Get(ctx, key)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -230,7 +228,7 @@ func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.O
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := checkPreconditions(key, precondtions, origState.obj); err != nil {
|
||||
if err := checkPreconditions(key, preconditions, origState.obj); err != nil {
|
||||
return err
|
||||
}
|
||||
txnResp, err := s.client.KV.Txn(ctx).If(
|
||||
@ -255,7 +253,7 @@ func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.O
|
||||
// GuaranteedUpdate implements storage.Interface.GuaranteedUpdate.
|
||||
func (s *store) GuaranteedUpdate(
|
||||
ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,
|
||||
precondtions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {
|
||||
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {
|
||||
trace := utiltrace.New(fmt.Sprintf("GuaranteedUpdate etcd3: %s", reflect.TypeOf(out).String()))
|
||||
defer trace.LogIfLong(500 * time.Millisecond)
|
||||
|
||||
@ -283,8 +281,9 @@ func (s *store) GuaranteedUpdate(
|
||||
}
|
||||
trace.Step("initial value restored")
|
||||
|
||||
transformContext := authenticatedDataString(key)
|
||||
for {
|
||||
if err := checkPreconditions(key, precondtions, origState.obj); err != nil {
|
||||
if err := checkPreconditions(key, preconditions, origState.obj); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -301,7 +300,7 @@ func (s *store) GuaranteedUpdate(
|
||||
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
|
||||
}
|
||||
|
||||
newData, err := s.transformer.TransformToStorage(data)
|
||||
newData, err := s.transformer.TransformToStorage(data, transformContext)
|
||||
if err != nil {
|
||||
return storage.NewInternalError(err.Error())
|
||||
}
|
||||
@ -354,7 +353,7 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
|
||||
if len(getResp.Kvs) == 0 {
|
||||
return nil
|
||||
}
|
||||
data, _, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value)
|
||||
data, _, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value, authenticatedDataString(key))
|
||||
if err != nil {
|
||||
return storage.NewInternalError(err.Error())
|
||||
}
|
||||
@ -389,7 +388,7 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
|
||||
|
||||
elems := make([]*elemForDecode, 0, len(getResp.Kvs))
|
||||
for _, kv := range getResp.Kvs {
|
||||
data, _, err := s.transformer.TransformFromStorage(kv.Value)
|
||||
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", key, err))
|
||||
continue
|
||||
@ -439,7 +438,7 @@ func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Va
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
data, stale, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value)
|
||||
data, stale, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value, authenticatedDataString(key))
|
||||
if err != nil {
|
||||
return nil, storage.NewInternalError(err.Error())
|
||||
}
|
||||
|
@ -35,6 +35,7 @@ import (
|
||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
storagetests "k8s.io/apiserver/pkg/storage/tests"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
|
||||
"github.com/coreos/etcd/integration"
|
||||
"golang.org/x/net/context"
|
||||
@ -56,13 +57,19 @@ type prefixTransformer struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (p prefixTransformer) TransformFromStorage(b []byte) ([]byte, bool, error) {
|
||||
func (p prefixTransformer) TransformFromStorage(b []byte, ctx value.Context) ([]byte, bool, error) {
|
||||
if ctx == nil {
|
||||
panic("no context provided")
|
||||
}
|
||||
if !bytes.HasPrefix(b, p.prefix) {
|
||||
return nil, false, fmt.Errorf("value does not have expected prefix: %s", string(b))
|
||||
}
|
||||
return bytes.TrimPrefix(b, p.prefix), p.stale, p.err
|
||||
}
|
||||
func (p prefixTransformer) TransformToStorage(b []byte) ([]byte, error) {
|
||||
func (p prefixTransformer) TransformToStorage(b []byte, ctx value.Context) ([]byte, error) {
|
||||
if ctx == nil {
|
||||
panic("no context provided")
|
||||
}
|
||||
if len(b) > 0 {
|
||||
return append(append([]byte{}, p.prefix...), b...), p.err
|
||||
}
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
@ -43,7 +44,7 @@ type watcher struct {
|
||||
client *clientv3.Client
|
||||
codec runtime.Codec
|
||||
versioner storage.Versioner
|
||||
transformer ValueTransformer
|
||||
transformer value.Transformer
|
||||
}
|
||||
|
||||
// watchChan implements watch.Interface.
|
||||
@ -60,7 +61,7 @@ type watchChan struct {
|
||||
errChan chan error
|
||||
}
|
||||
|
||||
func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner, transformer ValueTransformer) *watcher {
|
||||
func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner, transformer value.Transformer) *watcher {
|
||||
return &watcher{
|
||||
client: client,
|
||||
codec: codec,
|
||||
@ -343,7 +344,7 @@ func (wc *watchChan) sendEvent(e *event) {
|
||||
|
||||
func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) {
|
||||
if !e.isDeleted {
|
||||
data, _, err := wc.watcher.transformer.TransformFromStorage(e.value)
|
||||
data, _, err := wc.watcher.transformer.TransformFromStorage(e.value, authenticatedDataString(e.key))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@ -358,7 +359,7 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim
|
||||
// we need the object only to compute whether it was filtered out
|
||||
// before).
|
||||
if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) {
|
||||
data, _, err := wc.watcher.transformer.TransformFromStorage(e.prevValue)
|
||||
data, _, err := wc.watcher.transformer.TransformFromStorage(e.prevValue, authenticatedDataString(e.key))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
@ -56,7 +57,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
|
||||
client.Close()
|
||||
}
|
||||
if c.Quorum {
|
||||
return etcd3.New(client, c.Codec, c.Prefix, etcd3.IdentityTransformer), destroyFunc, nil
|
||||
return etcd3.New(client, c.Codec, c.Prefix, value.IdentityTransformer), destroyFunc, nil
|
||||
}
|
||||
return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix, etcd3.IdentityTransformer), destroyFunc, nil
|
||||
return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix, value.IdentityTransformer), destroyFunc, nil
|
||||
}
|
||||
|
@ -42,6 +42,7 @@ import (
|
||||
"k8s.io/apiserver/pkg/storage/etcd/etcdtest"
|
||||
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
@ -92,7 +93,7 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha
|
||||
|
||||
func newEtcdTestStorage(t *testing.T, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) {
|
||||
server, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, scheme)
|
||||
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix, etcd3.IdentityTransformer)
|
||||
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix, value.IdentityTransformer)
|
||||
return server, storage
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user