From 3939f3003e9605c06f65e64d1fc6f94b294f9d97 Mon Sep 17 00:00:00 2001 From: Steve Kuznetsov Date: Wed, 11 May 2022 07:44:21 -0700 Subject: [PATCH] storage: move the APIObjectVersioner definition to storage The means by which we extract and parse the version of an API object is not specific to etcd3. In order to allow for a generic suite of tests against any storage.Interface imlpementation, we need this logic to live outside of the etcd3 package, or import cycles will exist. Signed-off-by: Steve Kuznetsov --- pkg/kubelet/util/manager/cache_based_manager.go | 6 +++--- .../framework/plugins/volumebinding/binder.go | 4 ++-- .../admission/plugin/resourcequota/resource_access.go | 4 ++-- .../pkg/registry/generic/registry/storage_factory.go | 3 +-- .../pkg/registry/generic/registry/store_test.go | 7 +++---- .../pkg/storage/{etcd3 => }/api_object_versioner.go | 11 +++++------ .../storage/{etcd3 => }/api_object_versioner_test.go | 5 ++--- .../apiserver/pkg/storage/cacher/watch_cache_test.go | 3 +-- .../src/k8s.io/apiserver/pkg/storage/etcd3/store.go | 2 +- .../k8s.io/apiserver/pkg/storage/etcd3/store_test.go | 2 +- .../apiserver/pkg/storage/etcd3/watcher_test.go | 4 ++-- .../k8s.io/apiserver/pkg/storage/tests/cacher_test.go | 4 ++-- test/integration/etcd/etcd_cross_group_test.go | 9 ++++++--- 13 files changed, 31 insertions(+), 33 deletions(-) rename staging/src/k8s.io/apiserver/pkg/storage/{etcd3 => }/api_object_versioner.go (93%) rename staging/src/k8s.io/apiserver/pkg/storage/{etcd3 => }/api_object_versioner_test.go (96%) diff --git a/pkg/kubelet/util/manager/cache_based_manager.go b/pkg/kubelet/util/manager/cache_based_manager.go index c291cfe91d9..a255f364442 100644 --- a/pkg/kubelet/util/manager/cache_based_manager.go +++ b/pkg/kubelet/util/manager/cache_based_manager.go @@ -23,7 +23,7 @@ import ( "time" "k8s.io/api/core/v1" - storageetcd3 "k8s.io/apiserver/pkg/storage/etcd3" + "k8s.io/apiserver/pkg/storage" "k8s.io/kubernetes/pkg/kubelet/util" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -87,8 +87,8 @@ func isObjectOlder(newObject, oldObject runtime.Object) bool { if newObject == nil || oldObject == nil { return false } - newVersion, _ := storageetcd3.Versioner.ObjectResourceVersion(newObject) - oldVersion, _ := storageetcd3.Versioner.ObjectResourceVersion(oldObject) + newVersion, _ := storage.APIObjectVersioner{}.ObjectResourceVersion(newObject) + oldVersion, _ := storage.APIObjectVersioner{}.ObjectResourceVersion(oldObject) return newVersion < oldVersion } diff --git a/pkg/scheduler/framework/plugins/volumebinding/binder.go b/pkg/scheduler/framework/plugins/volumebinding/binder.go index 14d6a85e8d3..ac1f02e640d 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/binder.go +++ b/pkg/scheduler/framework/plugins/volumebinding/binder.go @@ -30,7 +30,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apiserver/pkg/storage/etcd3" + "k8s.io/apiserver/pkg/storage" utilfeature "k8s.io/apiserver/pkg/util/feature" coreinformers "k8s.io/client-go/informers/core/v1" storageinformers "k8s.io/client-go/informers/storage/v1" @@ -531,7 +531,7 @@ func (b *volumeBinder) bindAPIUpdate(pod *v1.Pod, bindings []*BindingInfo, claim } var ( - versioner = etcd3.APIObjectVersioner{} + versioner = storage.APIObjectVersioner{} ) // checkBindings runs through all the PVCs in the Pod and checks: diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/resourcequota/resource_access.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/resourcequota/resource_access.go index 735ba8f2cf1..f09b46268c8 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/resourcequota/resource_access.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/resourcequota/resource_access.go @@ -24,7 +24,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apiserver/pkg/storage/etcd3" + "k8s.io/apiserver/pkg/storage" "k8s.io/client-go/kubernetes" corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/utils/lru" @@ -82,7 +82,7 @@ func (e *quotaAccessor) UpdateQuotaStatus(newQuota *corev1.ResourceQuota) error return nil } -var etcdVersioner = etcd3.APIObjectVersioner{} +var etcdVersioner = storage.APIObjectVersioner{} // checkCache compares the passed quota against the value in the look-aside cache and returns the newer // if the cache is out of date, it deletes the stale entry. This only works because of etcd resourceVersions diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go index 075170f14e4..e3d25fb98aa 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go @@ -27,7 +27,6 @@ import ( "k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/storage" cacherstorage "k8s.io/apiserver/pkg/storage/cacher" - "k8s.io/apiserver/pkg/storage/etcd3" "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storage/storagebackend/factory" "k8s.io/client-go/tools/cache" @@ -56,7 +55,7 @@ func StorageWithCacher() generic.StorageDecorator { cacherConfig := cacherstorage.Config{ Storage: s, - Versioner: etcd3.APIObjectVersioner{}, + Versioner: storage.APIObjectVersioner{}, ResourcePrefix: resourcePrefix, KeyFunc: keyFunc, NewFunc: newFunc, diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go index c7c6bef3c16..de1fdb6373e 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go @@ -29,7 +29,7 @@ import ( "time" fuzz "github.com/google/gofuzz" - apitesting "k8s.io/apimachinery/pkg/api/apitesting" + "k8s.io/apimachinery/pkg/api/apitesting" apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -51,7 +51,6 @@ import ( "k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/storage" cacherstorage "k8s.io/apiserver/pkg/storage/cacher" - "k8s.io/apiserver/pkg/storage/etcd3" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" "k8s.io/apiserver/pkg/storage/names" "k8s.io/apiserver/pkg/storage/storagebackend/factory" @@ -267,7 +266,7 @@ func TestStoreListResourceVersion(t *testing.T) { t.Fatal(err) } - versioner := etcd3.APIObjectVersioner{} + versioner := storage.APIObjectVersioner{} rev, err := versioner.ObjectResourceVersion(obj) if err != nil { t.Fatal(err) @@ -2318,7 +2317,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE if hasCacheEnabled { config := cacherstorage.Config{ Storage: s, - Versioner: etcd3.APIObjectVersioner{}, + Versioner: storage.APIObjectVersioner{}, ResourcePrefix: podPrefix, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) }, GetAttrsFunc: getPodAttrs, diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/api_object_versioner.go b/staging/src/k8s.io/apiserver/pkg/storage/api_object_versioner.go similarity index 93% rename from staging/src/k8s.io/apiserver/pkg/storage/etcd3/api_object_versioner.go rename to staging/src/k8s.io/apiserver/pkg/storage/api_object_versioner.go index a5e88fd01c5..9c77b09022d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/api_object_versioner.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/api_object_versioner.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package etcd3 +package storage import ( "fmt" @@ -23,7 +23,6 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/validation/field" - "k8s.io/apiserver/pkg/storage" ) // APIObjectVersioner implements versioning and extracting etcd node information @@ -94,7 +93,7 @@ func (a APIObjectVersioner) ParseResourceVersion(resourceVersion string) (uint64 } version, err := strconv.ParseUint(resourceVersion, 10, 64) if err != nil { - return 0, storage.NewInvalidError(field.ErrorList{ + return 0, NewInvalidError(field.ErrorList{ // Validation errors are supposed to return version-specific field // paths, but this is probably close enough. field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()), @@ -104,17 +103,17 @@ func (a APIObjectVersioner) ParseResourceVersion(resourceVersion string) (uint64 } // Versioner implements Versioner -var Versioner storage.Versioner = APIObjectVersioner{} +var _ Versioner = APIObjectVersioner{} // CompareResourceVersion compares etcd resource versions. Outside this API they are all strings, // but etcd resource versions are special, they're actually ints, so we can easily compare them. func (a APIObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int { - lhsVersion, err := Versioner.ObjectResourceVersion(lhs) + lhsVersion, err := a.ObjectResourceVersion(lhs) if err != nil { // coder error panic(err) } - rhsVersion, err := Versioner.ObjectResourceVersion(rhs) + rhsVersion, err := a.ObjectResourceVersion(rhs) if err != nil { // coder error panic(err) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/api_object_versioner_test.go b/staging/src/k8s.io/apiserver/pkg/storage/api_object_versioner_test.go similarity index 96% rename from staging/src/k8s.io/apiserver/pkg/storage/etcd3/api_object_versioner_test.go rename to staging/src/k8s.io/apiserver/pkg/storage/api_object_versioner_test.go index 9f8acce24e1..a3586aed67b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/api_object_versioner_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/api_object_versioner_test.go @@ -14,13 +14,12 @@ See the License for the specific language governing permissions and limitations under the License. */ -package etcd3 +package storage import ( "testing" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apiserver/pkg/storage" storagetesting "k8s.io/apiserver/pkg/storage/testing" ) @@ -65,7 +64,7 @@ func TestEtcdParseResourceVersion(t *testing.T) { switch { case testCase.Err && err == nil: t.Errorf("%s[%v]: unexpected non-error", testCase.Version, i) - case testCase.Err && !storage.IsInvalidError(err): + case testCase.Err && !IsInvalidError(err): t.Errorf("%s[%v]: unexpected error: %v", testCase.Version, i, err) case !testCase.Err && err != nil: t.Errorf("%s[%v]: unexpected error: %v", testCase.Version, i, err) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index 29e99f6891c..6f31c6c0abf 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -35,7 +35,6 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/apis/example" "k8s.io/apiserver/pkg/storage" - "k8s.io/apiserver/pkg/storage/etcd3" "k8s.io/client-go/tools/cache" testingclock "k8s.io/utils/clock/testing" ) @@ -111,7 +110,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache { } return labels.Set(pod.Labels), fields.Set{"spec.nodeName": pod.Spec.NodeName}, nil } - versioner := etcd3.APIObjectVersioner{} + versioner := storage.APIObjectVersioner{} mockHandler := func(*watchCacheEvent) {} wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), reflect.TypeOf(&example.Pod{})) // To preserve behavior of tests that assume a given capacity, diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 629502448a4..bb9397cb4b6 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -98,7 +98,7 @@ func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, } func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store { - versioner := APIObjectVersioner{} + versioner := storage.APIObjectVersioner{} result := &store{ client: c, codec: codec, diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index f371558784d..9ffe59bf2a0 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -1463,7 +1463,7 @@ func TestListInconsistentContinuation(t *testing.T) { } // compact to latest revision. - versioner := APIObjectVersioner{} + versioner := storage.APIObjectVersioner{} lastRVString := preset[2].storedObj.ResourceVersion lastRV, err := versioner.ParseResourceVersion(lastRVString) if err != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index f009055698d..1971b28e9ee 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -170,7 +170,7 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { if err != nil { t.Fatalf("Watch failed: %v", err) } - rv, err := APIObjectVersioner{}.ObjectResourceVersion(storedObj) + rv, err := storage.APIObjectVersioner{}.ObjectResourceVersion(storedObj) if err != nil { t.Fatalf("failed to parse resourceVersion on stored object: %v", err) } @@ -282,7 +282,7 @@ func (c *testCodec) Decode(data []byte, defaults *schema.GroupVersionKind, into // new as the sentinel or newer generate no error. func resourceVersionNotOlderThan(sentinel string) func(string) error { return func(resourceVersion string) error { - objectVersioner := APIObjectVersioner{} + objectVersioner := storage.APIObjectVersioner{} actualRV, err := objectVersioner.ParseResourceVersion(resourceVersion) if err != nil { return err diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 93bea21fc1a..0b036c36809 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -26,7 +26,7 @@ import ( "testing" "time" - apitesting "k8s.io/apimachinery/pkg/api/apitesting" + "k8s.io/apimachinery/pkg/api/apitesting" apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -117,7 +117,7 @@ func newTestCacher(s storage.Interface) (*cacherstorage.Cacher, storage.Versione func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstorage.Cacher, storage.Versioner, error) { prefix := "pods" - v := etcd3.APIObjectVersioner{} + v := storage.APIObjectVersioner{} config := cacherstorage.Config{ Storage: s, Versioner: v, diff --git a/test/integration/etcd/etcd_cross_group_test.go b/test/integration/etcd/etcd_cross_group_test.go index 4f5651cad34..d247edeaa5e 100644 --- a/test/integration/etcd/etcd_cross_group_test.go +++ b/test/integration/etcd/etcd_cross_group_test.go @@ -29,7 +29,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/watch" - "k8s.io/apiserver/pkg/storage/etcd3" + "k8s.io/apiserver/pkg/storage" "k8s.io/client-go/dynamic" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" ) @@ -119,11 +119,14 @@ func TestCrossGroupStorage(t *testing.T) { } } - versioner := etcd3.APIObjectVersioner{} + versioner := storage.APIObjectVersioner{} for _, resource := range resources { // clear out the things cleared in etcd versioned := versionedData[resource.Mapping.Resource] - versioner.PrepareObjectForStorage(versioned) + if err := versioner.PrepareObjectForStorage(versioned); err != nil { + t.Error(err) + continue + } versionedJSON, err := versioned.MarshalJSON() if err != nil { t.Error(err)