Merge pull request #104981 from MikeSpreitzer/plumb-group-resource-to-etcd3

Plumb the schema.GroupResource into etcd3 Store struct
This commit is contained in:
Kubernetes Prow Robot 2021-09-13 23:59:08 -07:00 committed by GitHub
commit 623f9f36e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 26 additions and 21 deletions

View File

@ -33,6 +33,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/conversion"
@ -69,6 +70,7 @@ type store struct {
versioner storage.Versioner versioner storage.Versioner
transformer value.Transformer transformer value.Transformer
pathPrefix string pathPrefix string
groupResource schema.GroupResource
watcher *watcher watcher *watcher
pagingEnabled bool pagingEnabled bool
leaseManager *leaseManager leaseManager *leaseManager
@ -83,11 +85,11 @@ type objState struct {
} }
// New returns an etcd3 implementation of storage.Interface. // New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface { func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface {
return newStore(c, codec, newFunc, prefix, transformer, pagingEnabled, leaseManagerConfig) return newStore(c, codec, newFunc, prefix, groupResource, transformer, pagingEnabled, leaseManagerConfig)
} }
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store { func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {
versioner := APIObjectVersioner{} versioner := APIObjectVersioner{}
result := &store{ result := &store{
client: c, client: c,
@ -98,9 +100,10 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Ob
// for compatibility with etcd2 impl. // for compatibility with etcd2 impl.
// no-op for default prefix of '/registry'. // no-op for default prefix of '/registry'.
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/' // keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
pathPrefix: path.Join("/", prefix), pathPrefix: path.Join("/", prefix),
watcher: newWatcher(c, codec, newFunc, versioner, transformer), groupResource: groupResource,
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig), watcher: newWatcher(c, codec, newFunc, versioner, transformer),
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
} }
return result return result
} }

View File

@ -42,6 +42,7 @@ import (
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
@ -1000,7 +1001,7 @@ func TestGuaranteedUpdateWithSuggestionAndConflict(t *testing.T) {
func TestTransformationFailure(t *testing.T) { func TestTransformationFailure(t *testing.T) {
client := testserver.RunEtcd(t, nil) client := testserver.RunEtcd(t, nil)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig()) store := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
ctx := context.Background() ctx := context.Background()
preset := []struct { preset := []struct {
@ -1076,8 +1077,8 @@ func TestList(t *testing.T) {
client := testserver.RunEtcd(t, nil) client := testserver.RunEtcd(t, nil)
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RemainingItemCount, true)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RemainingItemCount, true)()
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig()) store := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig())
disablePagingStore := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig()) disablePagingStore := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
ctx := context.Background() ctx := context.Background()
// Setup storage with the following structure: // Setup storage with the following structure:
@ -1573,7 +1574,7 @@ func TestListContinuation(t *testing.T) {
transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)} transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)}
recorder := &clientRecorder{KV: etcdClient.KV} recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder etcdClient.KV = recorder
store := newStore(etcdClient, codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig()) store := newStore(etcdClient, codec, newPod, "", schema.GroupResource{Resource: "pods"}, transformer, true, NewDefaultLeaseManagerConfig())
ctx := context.Background() ctx := context.Background()
// Setup storage with the following structure: // Setup storage with the following structure:
@ -1733,7 +1734,7 @@ func TestListContinuationWithFilter(t *testing.T) {
transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)} transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)}
recorder := &clientRecorder{KV: etcdClient.KV} recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder etcdClient.KV = recorder
store := newStore(etcdClient, codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig()) store := newStore(etcdClient, codec, newPod, "", schema.GroupResource{Resource: "pods"}, transformer, true, NewDefaultLeaseManagerConfig())
ctx := context.Background() ctx := context.Background()
preset := []struct { preset := []struct {
@ -1835,7 +1836,7 @@ func TestListContinuationWithFilter(t *testing.T) {
func TestListInconsistentContinuation(t *testing.T) { func TestListInconsistentContinuation(t *testing.T) {
client := testserver.RunEtcd(t, nil) client := testserver.RunEtcd(t, nil)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig()) store := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig())
ctx := context.Background() ctx := context.Background()
// Setup storage with the following structure: // Setup storage with the following structure:
@ -1983,7 +1984,7 @@ func testSetup(t *testing.T) (context.Context, *store, *clientv3.Client) {
// As 30s is the default timeout for testing in glboal configuration, // As 30s is the default timeout for testing in glboal configuration,
// we cannot wait longer than that in a single time: change it to 10 // we cannot wait longer than that in a single time: change it to 10
// for testing purposes. See apimachinery/pkg/util/wait/wait.go // for testing purposes. See apimachinery/pkg/util/wait/wait.go
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{ store := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{
ReuseDurationSeconds: 1, ReuseDurationSeconds: 1,
MaxObjectCount: defaultLeaseMaxObjectCount, MaxObjectCount: defaultLeaseMaxObjectCount,
}) })
@ -2027,7 +2028,7 @@ func TestPrefix(t *testing.T) {
"/registry": "/registry", "/registry": "/registry",
} }
for configuredPrefix, effectivePrefix := range testcases { for configuredPrefix, effectivePrefix := range testcases {
store := newStore(client, codec, nil, configuredPrefix, transformer, true, NewDefaultLeaseManagerConfig()) store := newStore(client, codec, nil, configuredPrefix, schema.GroupResource{Resource: "widgets"}, transformer, true, NewDefaultLeaseManagerConfig())
if store.pathPrefix != effectivePrefix { if store.pathPrefix != effectivePrefix {
t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix) t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix)
} }
@ -2192,7 +2193,7 @@ func TestConsistentList(t *testing.T) {
transformer := &fancyTransformer{ transformer := &fancyTransformer{
transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)}, transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)},
} }
store := newStore(client, codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig()) store := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, transformer, true, NewDefaultLeaseManagerConfig())
transformer.store = store transformer.store = store
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
@ -2296,7 +2297,7 @@ func TestCount(t *testing.T) {
func TestLeaseMaxObjectCount(t *testing.T) { func TestLeaseMaxObjectCount(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
client := testserver.RunEtcd(t, nil) client := testserver.RunEtcd(t, nil)
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{ store := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{
ReuseDurationSeconds: defaultLeaseReuseDurationSeconds, ReuseDurationSeconds: defaultLeaseReuseDurationSeconds,
MaxObjectCount: 2, MaxObjectCount: 2,
}) })

View File

@ -221,13 +221,13 @@ func TestWatchFromNoneZero(t *testing.T) {
func TestWatchError(t *testing.T) { func TestWatchError(t *testing.T) {
codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)} codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
client := testserver.RunEtcd(t, nil) client := testserver.RunEtcd(t, nil)
invalidStore := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig()) invalidStore := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig())
ctx := context.Background() ctx := context.Background()
w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }
validStore := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig()) validStore := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig())
validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate( validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) { func(runtime.Object) (runtime.Object, error) {
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil
@ -327,7 +327,7 @@ func TestProgressNotify(t *testing.T) {
clusterConfig := testserver.NewTestConfig(t) clusterConfig := testserver.NewTestConfig(t)
clusterConfig.ExperimentalWatchProgressNotifyInterval = time.Second clusterConfig.ExperimentalWatchProgressNotifyInterval = time.Second
client := testserver.RunEtcd(t, clusterConfig) client := testserver.RunEtcd(t, clusterConfig)
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig()) store := newStore(client, codec, newPod, "", schema.GroupResource{Resource: "pods"}, &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
ctx := context.Background() ctx := context.Background()
key := "/somekey" key := "/somekey"

View File

@ -276,7 +276,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.
if transformer == nil { if transformer == nil {
transformer = value.IdentityTransformer transformer = value.IdentityTransformer
} }
return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil return etcd3.New(client, c.Codec, newFunc, c.Prefix, c.GroupResource, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
} }
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the // startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -105,7 +106,7 @@ func newPodList() runtime.Object { return &example.PodList{} }
func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, value.IdentityTransformer, true, etcd3.NewDefaultLeaseManagerConfig()) storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, schema.GroupResource{Resource: "pods"}, value.IdentityTransformer, true, etcd3.NewDefaultLeaseManagerConfig())
return server, storage return server, storage
} }