mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-06 10:43:56 +00:00
refactor: extract decode functions into an interface for etcd3 store
This commit is contained in:
parent
daf76e6ead
commit
1d1a656d8d
@ -57,16 +57,20 @@ func newPodList() runtime.Object { return &example.PodList{} }
|
|||||||
|
|
||||||
func newEtcdTestStorage(t testing.TB, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
|
func newEtcdTestStorage(t testing.TB, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
|
||||||
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
|
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
|
||||||
|
versioner := storage.APIObjectVersioner{}
|
||||||
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
storage := etcd3.New(
|
storage := etcd3.New(
|
||||||
server.V3Client,
|
server.V3Client,
|
||||||
apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion),
|
codec,
|
||||||
newPod,
|
newPod,
|
||||||
newPodList,
|
newPodList,
|
||||||
prefix,
|
prefix,
|
||||||
"/pods",
|
"/pods",
|
||||||
schema.GroupResource{Resource: "pods"},
|
schema.GroupResource{Resource: "pods"},
|
||||||
identity.NewEncryptCheckTransformer(),
|
identity.NewEncryptCheckTransformer(),
|
||||||
etcd3.NewDefaultLeaseManagerConfig())
|
etcd3.NewDefaultLeaseManagerConfig(),
|
||||||
|
etcd3.NewDefaultDecoder(codec, versioner),
|
||||||
|
versioner)
|
||||||
return server, storage
|
return server, storage
|
||||||
}
|
}
|
||||||
|
|
||||||
|
94
staging/src/k8s.io/apiserver/pkg/storage/etcd3/decoder.go
Normal file
94
staging/src/k8s.io/apiserver/pkg/storage/etcd3/decoder.go
Normal file
@ -0,0 +1,94 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2024 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package etcd3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/conversion"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
|
"k8s.io/apiserver/pkg/storage"
|
||||||
|
|
||||||
|
"k8s.io/klog/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewDefaultDecoder returns the default decoder for etcd3 store
|
||||||
|
func NewDefaultDecoder(codec runtime.Codec, versioner storage.Versioner) Decoder {
|
||||||
|
return &defaultDecoder{
|
||||||
|
codec: codec,
|
||||||
|
versioner: versioner,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decoder is used by the etcd storage implementation to decode
|
||||||
|
// transformed data from the storage into an object
|
||||||
|
type Decoder interface {
|
||||||
|
// Decode decodes value of bytes into object. It will also
|
||||||
|
// set the object resource version to rev.
|
||||||
|
// On success, objPtr would be set to the object.
|
||||||
|
Decode(value []byte, objPtr runtime.Object, rev int64) error
|
||||||
|
|
||||||
|
// DecodeListItem decodes bytes value in array into object.
|
||||||
|
DecodeListItem(ctx context.Context, data []byte, rev uint64, newItemFunc func() runtime.Object) (runtime.Object, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ Decoder = &defaultDecoder{}
|
||||||
|
|
||||||
|
type defaultDecoder struct {
|
||||||
|
codec runtime.Codec
|
||||||
|
versioner storage.Versioner
|
||||||
|
}
|
||||||
|
|
||||||
|
// decode decodes value of bytes into object. It will also set the object resource version to rev.
|
||||||
|
// On success, objPtr would be set to the object.
|
||||||
|
func (d *defaultDecoder) Decode(value []byte, objPtr runtime.Object, rev int64) error {
|
||||||
|
if _, err := conversion.EnforcePtr(objPtr); err != nil {
|
||||||
|
// nolint:errorlint // this code was moved from store.go as is
|
||||||
|
return fmt.Errorf("unable to convert output object to pointer: %v", err)
|
||||||
|
}
|
||||||
|
_, _, err := d.codec.Decode(value, nil, objPtr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// being unable to set the version does not prevent the object from being extracted
|
||||||
|
if err := d.versioner.UpdateObject(objPtr, uint64(rev)); err != nil {
|
||||||
|
klog.Errorf("failed to update object version: %v", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// decodeListItem decodes bytes value in array into object.
|
||||||
|
func (d *defaultDecoder) DecodeListItem(ctx context.Context, data []byte, rev uint64, newItemFunc func() runtime.Object) (runtime.Object, error) {
|
||||||
|
startedAt := time.Now()
|
||||||
|
defer func() {
|
||||||
|
endpointsrequest.TrackDecodeLatency(ctx, time.Since(startedAt))
|
||||||
|
}()
|
||||||
|
|
||||||
|
obj, _, err := d.codec.Decode(data, nil, newItemFunc())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := d.versioner.UpdateObject(obj, rev); err != nil {
|
||||||
|
klog.Errorf("failed to update object version: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return obj, nil
|
||||||
|
}
|
@ -38,7 +38,6 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/apiserver/pkg/audit"
|
"k8s.io/apiserver/pkg/audit"
|
||||||
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
|
|
||||||
"k8s.io/apiserver/pkg/features"
|
"k8s.io/apiserver/pkg/features"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
|
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
|
||||||
@ -82,6 +81,7 @@ type store struct {
|
|||||||
groupResourceString string
|
groupResourceString string
|
||||||
watcher *watcher
|
watcher *watcher
|
||||||
leaseManager *leaseManager
|
leaseManager *leaseManager
|
||||||
|
decoder Decoder
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) RequestWatchProgress(ctx context.Context) error {
|
func (s *store) RequestWatchProgress(ctx context.Context) error {
|
||||||
@ -99,12 +99,11 @@ type objState struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// New returns an etcd3 implementation of storage.Interface.
|
// New returns an etcd3 implementation of storage.Interface.
|
||||||
func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig) storage.Interface {
|
func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) storage.Interface {
|
||||||
return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, leaseManagerConfig)
|
return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, leaseManagerConfig, decoder, versioner)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig) *store {
|
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) *store {
|
||||||
versioner := storage.APIObjectVersioner{}
|
|
||||||
// for compatibility with etcd2 impl.
|
// for compatibility with etcd2 impl.
|
||||||
// no-op for default prefix of '/registry'.
|
// no-op for default prefix of '/registry'.
|
||||||
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
|
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
|
||||||
@ -137,6 +136,7 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
|
|||||||
groupResourceString: groupResource.String(),
|
groupResourceString: groupResource.String(),
|
||||||
watcher: w,
|
watcher: w,
|
||||||
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
|
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
|
||||||
|
decoder: decoder,
|
||||||
}
|
}
|
||||||
|
|
||||||
w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) {
|
w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) {
|
||||||
@ -182,7 +182,7 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ou
|
|||||||
return storage.NewInternalError(err.Error())
|
return storage.NewInternalError(err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
err = decode(s.codec, s.versioner, data, out, kv.ModRevision)
|
err = s.decoder.Decode(data, out, kv.ModRevision)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordDecodeError(s.groupResourceString, preparedKey)
|
recordDecodeError(s.groupResourceString, preparedKey)
|
||||||
return err
|
return err
|
||||||
@ -248,7 +248,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
|
|||||||
|
|
||||||
if out != nil {
|
if out != nil {
|
||||||
putResp := txnResp.Responses[0].GetResponsePut()
|
putResp := txnResp.Responses[0].GetResponsePut()
|
||||||
err = decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
|
err = s.decoder.Decode(data, out, putResp.Header.Revision)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
|
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
|
||||||
recordDecodeError(s.groupResourceString, preparedKey)
|
recordDecodeError(s.groupResourceString, preparedKey)
|
||||||
@ -379,7 +379,7 @@ func (s *store) conditionalDelete(
|
|||||||
return errors.New("invalid DeleteRange response - nil header")
|
return errors.New("invalid DeleteRange response - nil header")
|
||||||
}
|
}
|
||||||
if !skipTransformDecode {
|
if !skipTransformDecode {
|
||||||
err = decode(s.codec, s.versioner, origState.data, out, deleteResp.Header.Revision)
|
err = s.decoder.Decode(origState.data, out, deleteResp.Header.Revision)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordDecodeError(s.groupResourceString, key)
|
recordDecodeError(s.groupResourceString, key)
|
||||||
return err
|
return err
|
||||||
@ -496,7 +496,7 @@ func (s *store) GuaranteedUpdate(
|
|||||||
}
|
}
|
||||||
// recheck that the data from etcd is not stale before short-circuiting a write
|
// recheck that the data from etcd is not stale before short-circuiting a write
|
||||||
if !origState.stale {
|
if !origState.stale {
|
||||||
err = decode(s.codec, s.versioner, origState.data, destination, origState.rev)
|
err = s.decoder.Decode(origState.data, destination, origState.rev)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordDecodeError(s.groupResourceString, preparedKey)
|
recordDecodeError(s.groupResourceString, preparedKey)
|
||||||
return err
|
return err
|
||||||
@ -547,7 +547,7 @@ func (s *store) GuaranteedUpdate(
|
|||||||
}
|
}
|
||||||
putResp := txnResp.Responses[0].GetResponsePut()
|
putResp := txnResp.Responses[0].GetResponsePut()
|
||||||
|
|
||||||
err = decode(s.codec, s.versioner, data, destination, putResp.Header.Revision)
|
err = s.decoder.Decode(data, destination, putResp.Header.Revision)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
|
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
|
||||||
recordDecodeError(s.groupResourceString, preparedKey)
|
recordDecodeError(s.groupResourceString, preparedKey)
|
||||||
@ -779,7 +779,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
|
|||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, err := decodeListItem(ctx, data, uint64(kv.ModRevision), s.codec, s.versioner, newItemFunc)
|
obj, err := s.decoder.DecodeListItem(ctx, data, uint64(kv.ModRevision), newItemFunc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordDecodeError(s.groupResourceString, string(kv.Key))
|
recordDecodeError(s.groupResourceString, string(kv.Key))
|
||||||
return err
|
return err
|
||||||
@ -939,7 +939,7 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key
|
|||||||
state.data = data
|
state.data = data
|
||||||
state.stale = stale
|
state.stale = stale
|
||||||
|
|
||||||
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
|
if err := s.decoder.Decode(state.data, state.obj, state.rev); err != nil {
|
||||||
recordDecodeError(s.groupResourceString, key)
|
recordDecodeError(s.groupResourceString, key)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -1046,42 +1046,6 @@ func (s *store) prepareKey(key string) (string, error) {
|
|||||||
return s.pathPrefix + key[startIndex:], nil
|
return s.pathPrefix + key[startIndex:], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// decode decodes value of bytes into object. It will also set the object resource version to rev.
|
|
||||||
// On success, objPtr would be set to the object.
|
|
||||||
func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64) error {
|
|
||||||
if _, err := conversion.EnforcePtr(objPtr); err != nil {
|
|
||||||
return fmt.Errorf("unable to convert output object to pointer: %v", err)
|
|
||||||
}
|
|
||||||
_, _, err := codec.Decode(value, nil, objPtr)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// being unable to set the version does not prevent the object from being extracted
|
|
||||||
if err := versioner.UpdateObject(objPtr, uint64(rev)); err != nil {
|
|
||||||
klog.Errorf("failed to update object version: %v", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// decodeListItem decodes bytes value in array into object.
|
|
||||||
func decodeListItem(ctx context.Context, data []byte, rev uint64, codec runtime.Codec, versioner storage.Versioner, newItemFunc func() runtime.Object) (runtime.Object, error) {
|
|
||||||
startedAt := time.Now()
|
|
||||||
defer func() {
|
|
||||||
endpointsrequest.TrackDecodeLatency(ctx, time.Since(startedAt))
|
|
||||||
}()
|
|
||||||
|
|
||||||
obj, _, err := codec.Decode(data, nil, newItemFunc())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := versioner.UpdateObject(obj, rev); err != nil {
|
|
||||||
klog.Errorf("failed to update object version: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return obj, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// recordDecodeError record decode error split by object type.
|
// recordDecodeError record decode error split by object type.
|
||||||
func recordDecodeError(resource string, key string) {
|
func recordDecodeError(resource string, key string) {
|
||||||
metrics.RecordDecodeError(resource)
|
metrics.RecordDecodeError(resource)
|
||||||
|
@ -566,6 +566,7 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *cli
|
|||||||
if setupOpts.recorderEnabled {
|
if setupOpts.recorderEnabled {
|
||||||
client.KV = &clientRecorder{KV: client.KV}
|
client.KV = &clientRecorder{KV: client.KV}
|
||||||
}
|
}
|
||||||
|
versioner := storage.APIObjectVersioner{}
|
||||||
store := newStore(
|
store := newStore(
|
||||||
client,
|
client,
|
||||||
setupOpts.codec,
|
setupOpts.codec,
|
||||||
@ -576,6 +577,8 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *cli
|
|||||||
setupOpts.groupResource,
|
setupOpts.groupResource,
|
||||||
setupOpts.transformer,
|
setupOpts.transformer,
|
||||||
setupOpts.leaseConfig,
|
setupOpts.leaseConfig,
|
||||||
|
NewDefaultDecoder(setupOpts.codec, versioner),
|
||||||
|
versioner,
|
||||||
)
|
)
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
return ctx, store, client
|
return ctx, store, client
|
||||||
|
@ -459,7 +459,10 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc fu
|
|||||||
if transformer == nil {
|
if transformer == nil {
|
||||||
transformer = identity.NewEncryptCheckTransformer()
|
transformer = identity.NewEncryptCheckTransformer()
|
||||||
}
|
}
|
||||||
return etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig), destroyFunc, nil
|
|
||||||
|
versioner := storage.APIObjectVersioner{}
|
||||||
|
decoder := etcd3.NewDefaultDecoder(c.Codec, versioner)
|
||||||
|
return etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig, decoder, versioner), destroyFunc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the
|
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the
|
||||||
|
@ -84,8 +84,10 @@ func TestHighWaterMark(t *testing.T) {
|
|||||||
func TestGetCurrentResourceVersionFromStorage(t *testing.T) {
|
func TestGetCurrentResourceVersionFromStorage(t *testing.T) {
|
||||||
// test data
|
// test data
|
||||||
newEtcdTestStorage := func(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
|
newEtcdTestStorage := func(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
|
||||||
|
versioner := storage.APIObjectVersioner{}
|
||||||
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion, example2v1.SchemeGroupVersion)
|
||||||
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
|
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
|
||||||
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion, example2v1.SchemeGroupVersion), func() runtime.Object { return &example.Pod{} }, func() runtime.Object { return &example.PodList{} }, prefix, "/pods", schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), etcd3.NewDefaultLeaseManagerConfig())
|
storage := etcd3.New(server.V3Client, codec, func() runtime.Object { return &example.Pod{} }, func() runtime.Object { return &example.PodList{} }, prefix, "/pods", schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), etcd3.NewDefaultLeaseManagerConfig(), etcd3.NewDefaultDecoder(codec, versioner), versioner)
|
||||||
return server, storage
|
return server, storage
|
||||||
}
|
}
|
||||||
server, etcdStorage := newEtcdTestStorage(t, "")
|
server, etcdStorage := newEtcdTestStorage(t, "")
|
||||||
|
Loading…
Reference in New Issue
Block a user