storage: move the APIObjectVersioner definition to storage

The means by which we extract and parse the version of an API object is
not specific to etcd3. In order to allow for a generic suite of tests
against any storage.Interface imlpementation, we need this logic to live
outside of the etcd3 package, or import cycles will exist.

Signed-off-by: Steve Kuznetsov <skuznets@redhat.com>
This commit is contained in:
Steve Kuznetsov 2022-05-11 07:44:21 -07:00
parent c50579afb1
commit 3939f3003e
No known key found for this signature in database
GPG Key ID: 8821C29EC988D9B4
13 changed files with 31 additions and 33 deletions

View File

@ -23,7 +23,7 @@ import (
"time" "time"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
storageetcd3 "k8s.io/apiserver/pkg/storage/etcd3" "k8s.io/apiserver/pkg/storage"
"k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/kubelet/util"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -87,8 +87,8 @@ func isObjectOlder(newObject, oldObject runtime.Object) bool {
if newObject == nil || oldObject == nil { if newObject == nil || oldObject == nil {
return false return false
} }
newVersion, _ := storageetcd3.Versioner.ObjectResourceVersion(newObject) newVersion, _ := storage.APIObjectVersioner{}.ObjectResourceVersion(newObject)
oldVersion, _ := storageetcd3.Versioner.ObjectResourceVersion(oldObject) oldVersion, _ := storage.APIObjectVersioner{}.ObjectResourceVersion(oldObject)
return newVersion < oldVersion return newVersion < oldVersion
} }

View File

@ -30,7 +30,7 @@ import (
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage/etcd3" "k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1" storageinformers "k8s.io/client-go/informers/storage/v1"
@ -531,7 +531,7 @@ func (b *volumeBinder) bindAPIUpdate(pod *v1.Pod, bindings []*BindingInfo, claim
} }
var ( var (
versioner = etcd3.APIObjectVersioner{} versioner = storage.APIObjectVersioner{}
) )
// checkBindings runs through all the PVCs in the Pod and checks: // checkBindings runs through all the PVCs in the Pod and checks:

View File

@ -24,7 +24,7 @@ import (
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apiserver/pkg/storage/etcd3" "k8s.io/apiserver/pkg/storage"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
corev1listers "k8s.io/client-go/listers/core/v1" corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/utils/lru" "k8s.io/utils/lru"
@ -82,7 +82,7 @@ func (e *quotaAccessor) UpdateQuotaStatus(newQuota *corev1.ResourceQuota) error
return nil return nil
} }
var etcdVersioner = etcd3.APIObjectVersioner{} var etcdVersioner = storage.APIObjectVersioner{}
// checkCache compares the passed quota against the value in the look-aside cache and returns the newer // checkCache compares the passed quota against the value in the look-aside cache and returns the newer
// if the cache is out of date, it deletes the stale entry. This only works because of etcd resourceVersions // if the cache is out of date, it deletes the stale entry. This only works because of etcd resourceVersions

View File

@ -27,7 +27,6 @@ import (
"k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
cacherstorage "k8s.io/apiserver/pkg/storage/cacher" cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
"k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/storagebackend/factory" "k8s.io/apiserver/pkg/storage/storagebackend/factory"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
@ -56,7 +55,7 @@ func StorageWithCacher() generic.StorageDecorator {
cacherConfig := cacherstorage.Config{ cacherConfig := cacherstorage.Config{
Storage: s, Storage: s,
Versioner: etcd3.APIObjectVersioner{}, Versioner: storage.APIObjectVersioner{},
ResourcePrefix: resourcePrefix, ResourcePrefix: resourcePrefix,
KeyFunc: keyFunc, KeyFunc: keyFunc,
NewFunc: newFunc, NewFunc: newFunc,

View File

@ -29,7 +29,7 @@ import (
"time" "time"
fuzz "github.com/google/gofuzz" fuzz "github.com/google/gofuzz"
apitesting "k8s.io/apimachinery/pkg/api/apitesting" "k8s.io/apimachinery/pkg/api/apitesting"
apiequality "k8s.io/apimachinery/pkg/api/equality" apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
@ -51,7 +51,6 @@ import (
"k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
cacherstorage "k8s.io/apiserver/pkg/storage/cacher" cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
"k8s.io/apiserver/pkg/storage/etcd3"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/storage/names" "k8s.io/apiserver/pkg/storage/names"
"k8s.io/apiserver/pkg/storage/storagebackend/factory" "k8s.io/apiserver/pkg/storage/storagebackend/factory"
@ -267,7 +266,7 @@ func TestStoreListResourceVersion(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
versioner := etcd3.APIObjectVersioner{} versioner := storage.APIObjectVersioner{}
rev, err := versioner.ObjectResourceVersion(obj) rev, err := versioner.ObjectResourceVersion(obj)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -2318,7 +2317,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
if hasCacheEnabled { if hasCacheEnabled {
config := cacherstorage.Config{ config := cacherstorage.Config{
Storage: s, Storage: s,
Versioner: etcd3.APIObjectVersioner{}, Versioner: storage.APIObjectVersioner{},
ResourcePrefix: podPrefix, ResourcePrefix: podPrefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) }, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
GetAttrsFunc: getPodAttrs, GetAttrsFunc: getPodAttrs,

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package etcd3 package storage
import ( import (
"fmt" "fmt"
@ -23,7 +23,6 @@ import (
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apiserver/pkg/storage"
) )
// APIObjectVersioner implements versioning and extracting etcd node information // APIObjectVersioner implements versioning and extracting etcd node information
@ -94,7 +93,7 @@ func (a APIObjectVersioner) ParseResourceVersion(resourceVersion string) (uint64
} }
version, err := strconv.ParseUint(resourceVersion, 10, 64) version, err := strconv.ParseUint(resourceVersion, 10, 64)
if err != nil { if err != nil {
return 0, storage.NewInvalidError(field.ErrorList{ return 0, NewInvalidError(field.ErrorList{
// Validation errors are supposed to return version-specific field // Validation errors are supposed to return version-specific field
// paths, but this is probably close enough. // paths, but this is probably close enough.
field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()), field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()),
@ -104,17 +103,17 @@ func (a APIObjectVersioner) ParseResourceVersion(resourceVersion string) (uint64
} }
// Versioner implements Versioner // Versioner implements Versioner
var Versioner storage.Versioner = APIObjectVersioner{} var _ Versioner = APIObjectVersioner{}
// CompareResourceVersion compares etcd resource versions. Outside this API they are all strings, // CompareResourceVersion compares etcd resource versions. Outside this API they are all strings,
// but etcd resource versions are special, they're actually ints, so we can easily compare them. // but etcd resource versions are special, they're actually ints, so we can easily compare them.
func (a APIObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int { func (a APIObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int {
lhsVersion, err := Versioner.ObjectResourceVersion(lhs) lhsVersion, err := a.ObjectResourceVersion(lhs)
if err != nil { if err != nil {
// coder error // coder error
panic(err) panic(err)
} }
rhsVersion, err := Versioner.ObjectResourceVersion(rhs) rhsVersion, err := a.ObjectResourceVersion(rhs)
if err != nil { if err != nil {
// coder error // coder error
panic(err) panic(err)

View File

@ -14,13 +14,12 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package etcd3 package storage
import ( import (
"testing" "testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/storage"
storagetesting "k8s.io/apiserver/pkg/storage/testing" storagetesting "k8s.io/apiserver/pkg/storage/testing"
) )
@ -65,7 +64,7 @@ func TestEtcdParseResourceVersion(t *testing.T) {
switch { switch {
case testCase.Err && err == nil: case testCase.Err && err == nil:
t.Errorf("%s[%v]: unexpected non-error", testCase.Version, i) t.Errorf("%s[%v]: unexpected non-error", testCase.Version, i)
case testCase.Err && !storage.IsInvalidError(err): case testCase.Err && !IsInvalidError(err):
t.Errorf("%s[%v]: unexpected error: %v", testCase.Version, i, err) t.Errorf("%s[%v]: unexpected error: %v", testCase.Version, i, err)
case !testCase.Err && err != nil: case !testCase.Err && err != nil:
t.Errorf("%s[%v]: unexpected error: %v", testCase.Version, i, err) t.Errorf("%s[%v]: unexpected error: %v", testCase.Version, i, err)

View File

@ -35,7 +35,6 @@ import (
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example" "k8s.io/apiserver/pkg/apis/example"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
testingclock "k8s.io/utils/clock/testing" testingclock "k8s.io/utils/clock/testing"
) )
@ -111,7 +110,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
} }
return labels.Set(pod.Labels), fields.Set{"spec.nodeName": pod.Spec.NodeName}, nil return labels.Set(pod.Labels), fields.Set{"spec.nodeName": pod.Spec.NodeName}, nil
} }
versioner := etcd3.APIObjectVersioner{} versioner := storage.APIObjectVersioner{}
mockHandler := func(*watchCacheEvent) {} mockHandler := func(*watchCacheEvent) {}
wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), reflect.TypeOf(&example.Pod{})) wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), reflect.TypeOf(&example.Pod{}))
// To preserve behavior of tests that assume a given capacity, // To preserve behavior of tests that assume a given capacity,

View File

@ -98,7 +98,7 @@ func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object,
} }
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store { func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {
versioner := APIObjectVersioner{} versioner := storage.APIObjectVersioner{}
result := &store{ result := &store{
client: c, client: c,
codec: codec, codec: codec,

View File

@ -1463,7 +1463,7 @@ func TestListInconsistentContinuation(t *testing.T) {
} }
// compact to latest revision. // compact to latest revision.
versioner := APIObjectVersioner{} versioner := storage.APIObjectVersioner{}
lastRVString := preset[2].storedObj.ResourceVersion lastRVString := preset[2].storedObj.ResourceVersion
lastRV, err := versioner.ParseResourceVersion(lastRVString) lastRV, err := versioner.ParseResourceVersion(lastRVString)
if err != nil { if err != nil {

View File

@ -170,7 +170,7 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }
rv, err := APIObjectVersioner{}.ObjectResourceVersion(storedObj) rv, err := storage.APIObjectVersioner{}.ObjectResourceVersion(storedObj)
if err != nil { if err != nil {
t.Fatalf("failed to parse resourceVersion on stored object: %v", err) t.Fatalf("failed to parse resourceVersion on stored object: %v", err)
} }
@ -282,7 +282,7 @@ func (c *testCodec) Decode(data []byte, defaults *schema.GroupVersionKind, into
// new as the sentinel or newer generate no error. // new as the sentinel or newer generate no error.
func resourceVersionNotOlderThan(sentinel string) func(string) error { func resourceVersionNotOlderThan(sentinel string) func(string) error {
return func(resourceVersion string) error { return func(resourceVersion string) error {
objectVersioner := APIObjectVersioner{} objectVersioner := storage.APIObjectVersioner{}
actualRV, err := objectVersioner.ParseResourceVersion(resourceVersion) actualRV, err := objectVersioner.ParseResourceVersion(resourceVersion)
if err != nil { if err != nil {
return err return err

View File

@ -26,7 +26,7 @@ import (
"testing" "testing"
"time" "time"
apitesting "k8s.io/apimachinery/pkg/api/apitesting" "k8s.io/apimachinery/pkg/api/apitesting"
apiequality "k8s.io/apimachinery/pkg/api/equality" apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
@ -117,7 +117,7 @@ func newTestCacher(s storage.Interface) (*cacherstorage.Cacher, storage.Versione
func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstorage.Cacher, storage.Versioner, error) { func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstorage.Cacher, storage.Versioner, error) {
prefix := "pods" prefix := "pods"
v := etcd3.APIObjectVersioner{} v := storage.APIObjectVersioner{}
config := cacherstorage.Config{ config := cacherstorage.Config{
Storage: s, Storage: s,
Versioner: v, Versioner: v,

View File

@ -29,7 +29,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage/etcd3" "k8s.io/apiserver/pkg/storage"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options" "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
) )
@ -119,11 +119,14 @@ func TestCrossGroupStorage(t *testing.T) {
} }
} }
versioner := etcd3.APIObjectVersioner{} versioner := storage.APIObjectVersioner{}
for _, resource := range resources { for _, resource := range resources {
// clear out the things cleared in etcd // clear out the things cleared in etcd
versioned := versionedData[resource.Mapping.Resource] versioned := versionedData[resource.Mapping.Resource]
versioner.PrepareObjectForStorage(versioned) if err := versioner.PrepareObjectForStorage(versioned); err != nil {
t.Error(err)
continue
}
versionedJSON, err := versioned.MarshalJSON() versionedJSON, err := versioned.MarshalJSON()
if err != nil { if err != nil {
t.Error(err) t.Error(err)