storage/factory: extend the Create method by newList and resourcePrefix params

This commit is contained in:
Lukasz Szaszkiewicz 2023-07-28 09:53:01 +02:00
parent bcbceea117
commit ccabc01093
19 changed files with 58 additions and 34 deletions

View File

@ -132,7 +132,9 @@ func (s *storageLeases) Destroy() {
// NewLeases creates a new etcd-based Leases implementation. // NewLeases creates a new etcd-based Leases implementation.
func NewLeases(config *storagebackend.ConfigForResource, baseKey string, leaseTime time.Duration) (Leases, error) { func NewLeases(config *storagebackend.ConfigForResource, baseKey string, leaseTime time.Duration) (Leases, error) {
leaseStorage, destroyFn, err := storagefactory.Create(*config, nil) // note that newFunc, newListFunc and resourcePrefix
// can be left blank unless the storage.Watch method is used
leaseStorage, destroyFn, err := storagefactory.Create(*config, nil, nil, "")
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating storage factory: %v", err) return nil, fmt.Errorf("error creating storage factory: %v", err)
} }

View File

@ -93,9 +93,10 @@ func TestLeaseEndpointReconciler(t *testing.T) {
t.Cleanup(func() { server.Terminate(t) }) t.Cleanup(func() { server.Terminate(t) })
newFunc := func() runtime.Object { return &corev1.Endpoints{} } newFunc := func() runtime.Object { return &corev1.Endpoints{} }
newListFunc := func() runtime.Object { return &corev1.EndpointsList{} }
sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion) sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion)
s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "endpoints"}), newFunc) s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "endpoints"}), newFunc, newListFunc, "")
if err != nil { if err != nil {
t.Fatalf("Error creating storage: %v", err) t.Fatalf("Error creating storage: %v", err)
} }
@ -456,9 +457,10 @@ func TestLeaseRemoveEndpoints(t *testing.T) {
t.Cleanup(func() { server.Terminate(t) }) t.Cleanup(func() { server.Terminate(t) })
newFunc := func() runtime.Object { return &corev1.Endpoints{} } newFunc := func() runtime.Object { return &corev1.Endpoints{} }
newListFunc := func() runtime.Object { return &corev1.EndpointsList{} }
sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion) sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion)
s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "pods"}), newFunc) s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "pods"}), newFunc, newListFunc, "")
if err != nil { if err != nil {
t.Fatalf("Error creating storage: %v", err) t.Fatalf("Error creating storage: %v", err)
} }
@ -582,9 +584,10 @@ func TestApiserverShutdown(t *testing.T) {
t.Cleanup(func() { server.Terminate(t) }) t.Cleanup(func() { server.Terminate(t) })
newFunc := func() runtime.Object { return &corev1.Endpoints{} } newFunc := func() runtime.Object { return &corev1.Endpoints{} }
newListFunc := func() runtime.Object { return &corev1.EndpointsList{} }
sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion) sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion)
s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "endpoints"}), newFunc) s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "endpoints"}), newFunc, newListFunc, "")
if err != nil { if err != nil {
t.Fatalf("Error creating storage: %v", err) t.Fatalf("Error creating storage: %v", err)
} }

View File

@ -30,7 +30,7 @@ import (
func TestPodLogValidates(t *testing.T) { func TestPodLogValidates(t *testing.T) {
config, server := registrytest.NewEtcdStorage(t, "") config, server := registrytest.NewEtcdStorage(t, "")
defer server.Terminate(t) defer server.Terminate(t)
s, destroyFunc, err := generic.NewRawStorage(config, nil) s, destroyFunc, err := generic.NewRawStorage(config, nil, nil, "")
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }

View File

@ -63,7 +63,9 @@ var _ rangeallocation.RangeRegistry = &Etcd{}
// NewEtcd returns an allocator that is backed by Etcd and can manage // NewEtcd returns an allocator that is backed by Etcd and can manage
// persisting the snapshot state of allocation after each allocation is made. // persisting the snapshot state of allocation after each allocation is made.
func NewEtcd(alloc allocator.Snapshottable, baseKey string, config *storagebackend.ConfigForResource) (*Etcd, error) { func NewEtcd(alloc allocator.Snapshottable, baseKey string, config *storagebackend.ConfigForResource) (*Etcd, error) {
storage, d, err := generic.NewRawStorage(config, nil) // note that newFunc, newListFunc and resourcePrefix
// can be left blank unless the storage.Watch method is used
storage, d, err := generic.NewRawStorage(config, nil, nil, "")
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -56,7 +56,7 @@ func newStorage(t *testing.T) (*etcd3testing.EtcdTestServer, ipallocator.Interfa
if err != nil { if err != nil {
t.Fatalf("unexpected error creating etcd: %v", err) t.Fatalf("unexpected error creating etcd: %v", err)
} }
s, d, err := generic.NewRawStorage(configForAllocations, nil) s, d, err := generic.NewRawStorage(configForAllocations, nil, nil, "")
if err != nil { if err != nil {
t.Fatalf("Couldn't create storage: %v", err) t.Fatalf("Couldn't create storage: %v", err)
} }

View File

@ -61,7 +61,7 @@ func newStorage(t *testing.T) (*etcd3testing.EtcdTestServer, portallocator.Inter
if err != nil { if err != nil {
t.Fatalf("unexpected error creating etcd: %v", err) t.Fatalf("unexpected error creating etcd: %v", err)
} }
s, d, err := generic.NewRawStorage(configForAllocations, nil) s, d, err := generic.NewRawStorage(configForAllocations, nil, nil, "")
if err != nil { if err != nil {
t.Fatalf("Couldn't create storage: %v", err) t.Fatalf("Couldn't create storage: %v", err)
} }

View File

@ -78,7 +78,9 @@ type peerEndpointLeaseReconciler struct {
// NewPeerEndpointLeaseReconciler creates a new peer endpoint lease reconciler // NewPeerEndpointLeaseReconciler creates a new peer endpoint lease reconciler
func NewPeerEndpointLeaseReconciler(config *storagebackend.ConfigForResource, baseKey string, leaseTime time.Duration) (PeerEndpointLeaseReconciler, error) { func NewPeerEndpointLeaseReconciler(config *storagebackend.ConfigForResource, baseKey string, leaseTime time.Duration) (PeerEndpointLeaseReconciler, error) {
leaseStorage, destroyFn, err := storagefactory.Create(*config, nil) // note that newFunc, newListFunc and resourcePrefix
// can be left blank unless the storage.Watch method is used
leaseStorage, destroyFn, err := storagefactory.Create(*config, nil, nil, "")
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating storage factory: %v", err) return nil, fmt.Errorf("error creating storage factory: %v", err)
} }

View File

@ -89,9 +89,10 @@ func TestPeerEndpointLeaseReconciler(t *testing.T) {
t.Cleanup(func() { server.Terminate(t) }) t.Cleanup(func() { server.Terminate(t) })
newFunc := func() runtime.Object { return &corev1.Endpoints{} } newFunc := func() runtime.Object { return &corev1.Endpoints{} }
newListFunc := func() runtime.Object { return &corev1.EndpointsList{} }
sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion) sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion)
s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "endpoints"}), newFunc) s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "endpoints"}), newFunc, newListFunc, "")
if err != nil { if err != nil {
t.Fatalf("Error creating storage: %v", err) t.Fatalf("Error creating storage: %v", err)
} }
@ -195,9 +196,10 @@ func TestPeerLeaseRemoveEndpoints(t *testing.T) {
t.Cleanup(func() { server.Terminate(t) }) t.Cleanup(func() { server.Terminate(t) })
newFunc := func() runtime.Object { return &corev1.Endpoints{} } newFunc := func() runtime.Object { return &corev1.Endpoints{} }
newListFunc := func() runtime.Object { return &corev1.EndpointsList{} }
sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion) sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion)
s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "pods"}), newFunc) s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "pods"}), newFunc, newListFunc, "")
if err != nil { if err != nil {
t.Fatalf("Error creating storage: %v", err) t.Fatalf("Error creating storage: %v", err)
} }

View File

@ -39,7 +39,7 @@ import (
func NewDryRunnableTestStorage(t *testing.T) (DryRunnableStorage, func()) { func NewDryRunnableTestStorage(t *testing.T) (DryRunnableStorage, func()) {
server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion) sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion)
s, destroy, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "pods"}), nil) s, destroy, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "pods"}), nil, nil, "")
if err != nil { if err != nil {
t.Fatalf("Error creating storage: %v", err) t.Fatalf("Error creating storage: %v", err)
} }

View File

@ -44,7 +44,7 @@ func StorageWithCacher() generic.StorageDecorator {
triggerFuncs storage.IndexerFuncs, triggerFuncs storage.IndexerFuncs,
indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) { indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
s, d, err := generic.NewRawStorage(storageConfig, newFunc) s, d, err := generic.NewRawStorage(storageConfig, newFunc, newListFunc, resourcePrefix)
if err != nil { if err != nil {
return s, d, err return s, d, err
} }

View File

@ -2325,7 +2325,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
newListFunc := func() runtime.Object { return &example.PodList{} } newListFunc := func() runtime.Object { return &example.PodList{} }
sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion) sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion)
s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "pods"}), newFunc) s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "pods"}), newFunc, newListFunc, "/pods")
if err != nil { if err != nil {
t.Fatalf("Error creating storage: %v", err) t.Fatalf("Error creating storage: %v", err)
} }

View File

@ -47,12 +47,12 @@ func UndecoratedStorage(
getAttrsFunc storage.AttrFunc, getAttrsFunc storage.AttrFunc,
trigger storage.IndexerFuncs, trigger storage.IndexerFuncs,
indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) { indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
return NewRawStorage(config, newFunc) return NewRawStorage(config, newFunc, newListFunc, resourcePrefix)
} }
// NewRawStorage creates the low level kv storage. This is a work-around for current // NewRawStorage creates the low level kv storage. This is a work-around for current
// two layer of same storage interface. // two layer of same storage interface.
// TODO: Once cacher is enabled on all registries (event registry is special), we will remove this method. // TODO: Once cacher is enabled on all registries (event registry is special), we will remove this method.
func NewRawStorage(config *storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, factory.DestroyFunc, error) { func NewRawStorage(config *storagebackend.ConfigForResource, newFunc, newListFunc func() runtime.Object, resourcePrefix string) (storage.Interface, factory.DestroyFunc, error) {
return factory.Create(*config, newFunc) return factory.Create(*config, newFunc, newListFunc, resourcePrefix)
} }

View File

@ -61,7 +61,9 @@ func newEtcdTestStorage(t *testing.T, prefix string, pagingEnabled bool) (*etcd3
server.V3Client, server.V3Client,
apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion),
newPod, newPod,
newPodList,
prefix, prefix,
"/pods",
schema.GroupResource{Resource: "pods"}, schema.GroupResource{Resource: "pods"},
identity.NewEncryptCheckTransformer(), identity.NewEncryptCheckTransformer(),
pagingEnabled, pagingEnabled,

View File

@ -99,11 +99,11 @@ type objState struct {
} }
// New returns an etcd3 implementation of storage.Interface. // New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface { func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface {
return newStore(c, codec, newFunc, prefix, groupResource, transformer, pagingEnabled, leaseManagerConfig) return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, pagingEnabled, leaseManagerConfig)
} }
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store { func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {
versioner := storage.APIObjectVersioner{} versioner := storage.APIObjectVersioner{}
// for compatibility with etcd2 impl. // for compatibility with etcd2 impl.
// no-op for default prefix of '/registry'. // no-op for default prefix of '/registry'.
@ -114,6 +114,7 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Ob
pathPrefix += "/" pathPrefix += "/"
} }
// TODO(p0lyn0mial): pass newListFunc and resourcePrefix to the watcher
w := &watcher{ w := &watcher{
client: c, client: c,
codec: codec, codec: codec,

View File

@ -64,6 +64,10 @@ func newPod() runtime.Object {
return &example.Pod{} return &example.Pod{}
} }
func newPodList() runtime.Object {
return &example.PodList{}
}
func checkStorageInvariants(etcdClient *clientv3.Client, codec runtime.Codec) storagetesting.KeyValidation { func checkStorageInvariants(etcdClient *clientv3.Client, codec runtime.Codec) storagetesting.KeyValidation {
return func(ctx context.Context, t *testing.T, key string) { return func(ctx context.Context, t *testing.T, key string) {
getResp, err := etcdClient.KV.Get(ctx, key) getResp, err := etcdClient.KV.Get(ctx, key)
@ -468,14 +472,16 @@ func (r *clientRecorder) GetReadsAndReset() uint64 {
} }
type setupOptions struct { type setupOptions struct {
client func(testing.TB) *clientv3.Client client func(testing.TB) *clientv3.Client
codec runtime.Codec codec runtime.Codec
newFunc func() runtime.Object newFunc func() runtime.Object
prefix string newListFunc func() runtime.Object
groupResource schema.GroupResource prefix string
transformer value.Transformer resourcePrefix string
pagingEnabled bool groupResource schema.GroupResource
leaseConfig LeaseManagerConfig transformer value.Transformer
pagingEnabled bool
leaseConfig LeaseManagerConfig
recorderEnabled bool recorderEnabled bool
} }
@ -520,7 +526,9 @@ func withDefaults(options *setupOptions) {
} }
options.codec = apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) options.codec = apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
options.newFunc = newPod options.newFunc = newPod
options.newListFunc = newPodList
options.prefix = "" options.prefix = ""
options.resourcePrefix = "/pods"
options.groupResource = schema.GroupResource{Resource: "pods"} options.groupResource = schema.GroupResource{Resource: "pods"}
options.transformer = newTestTransformer() options.transformer = newTestTransformer()
options.pagingEnabled = true options.pagingEnabled = true
@ -543,7 +551,9 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *cli
client, client,
setupOpts.codec, setupOpts.codec,
setupOpts.newFunc, setupOpts.newFunc,
setupOpts.newListFunc,
setupOpts.prefix, setupOpts.prefix,
setupOpts.resourcePrefix,
setupOpts.groupResource, setupOpts.groupResource,
setupOpts.transformer, setupOpts.transformer,
setupOpts.pagingEnabled, setupOpts.pagingEnabled,

View File

@ -419,7 +419,7 @@ func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration
}, nil }, nil
} }
func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) { func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc func() runtime.Object, resourcePrefix string) (storage.Interface, DestroyFunc, error) {
stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval) stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
@ -454,7 +454,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.
if transformer == nil { if transformer == nil {
transformer = identity.NewEncryptCheckTransformer() transformer = identity.NewEncryptCheckTransformer()
} }
return etcd3.New(client, c.Codec, newFunc, c.Prefix, c.GroupResource, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil return etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
} }
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the // startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the

View File

@ -30,12 +30,12 @@ import (
type DestroyFunc func() type DestroyFunc func()
// Create creates a storage backend based on given config. // Create creates a storage backend based on given config.
func Create(c storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) { func Create(c storagebackend.ConfigForResource, newFunc, newListFunc func() runtime.Object, resourcePrefix string) (storage.Interface, DestroyFunc, error) {
switch c.Type { switch c.Type {
case storagebackend.StorageTypeETCD2: case storagebackend.StorageTypeETCD2:
return nil, nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type) return nil, nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3: case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3Storage(c, newFunc) return newETCD3Storage(c, newFunc, newListFunc, resourcePrefix)
default: default:
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type) return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
} }

View File

@ -81,7 +81,7 @@ func TestTLSConnection(t *testing.T) {
}, },
Codec: codec, Codec: codec,
} }
storage, destroyFunc, err := newETCD3Storage(*cfg.ForResource(schema.GroupResource{Resource: "pods"}), nil) storage, destroyFunc, err := newETCD3Storage(*cfg.ForResource(schema.GroupResource{Resource: "pods"}), nil, nil, "")
defer destroyFunc() defer destroyFunc()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -85,7 +85,7 @@ func TestGetCurrentResourceVersionFromStorage(t *testing.T) {
// test data // test data
newEtcdTestStorage := func(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { newEtcdTestStorage := func(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion, example2v1.SchemeGroupVersion), func() runtime.Object { return &example.Pod{} }, prefix, schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig()) storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion, example2v1.SchemeGroupVersion), func() runtime.Object { return &example.Pod{} }, func() runtime.Object { return &example.PodList{} }, prefix, "/pods", schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig())
return server, storage return server, storage
} }
server, etcdStorage := newEtcdTestStorage(t, "") server, etcdStorage := newEtcdTestStorage(t, "")