From 9a253d896a096b4e1ffccf4b1f84e5cac1e1aad0 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Wed, 26 Jul 2023 15:53:13 +0200 Subject: [PATCH] storage/util: move GetCurrentResourceVersionFromStorage --- .../apiserver/pkg/storage/cacher/cacher.go | 41 +------- .../storage/cacher/cacher_whitebox_test.go | 74 -------------- .../src/k8s.io/apiserver/pkg/storage/util.go | 43 ++++++++ .../k8s.io/apiserver/pkg/storage/util_test.go | 98 ++++++++++++++++++- 4 files changed, 141 insertions(+), 115 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 0796f591d7f..6ddfab85a15 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -21,7 +21,6 @@ import ( "fmt" "net/http" "reflect" - "strconv" "sync" "time" @@ -773,7 +772,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio return c.storage.GetList(ctx, key, opts, listObj) } if listRV == 0 && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) { - listRV, err = c.getCurrentResourceVersionFromStorage(ctx) + listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String()) if err != nil { return err } @@ -1249,42 +1248,6 @@ func (c *Cacher) LastSyncResourceVersion() (uint64, error) { return c.versioner.ParseResourceVersion(resourceVersion) } -// getCurrentResourceVersionFromStorage gets the current resource version from the underlying storage engine. -// this method issues an empty list request and reads only the ResourceVersion from the object metadata -func (c *Cacher) getCurrentResourceVersionFromStorage(ctx context.Context) (uint64, error) { - if c.newListFunc == nil { - return 0, fmt.Errorf("newListFunction wasn't provided for %v", c.objectType) - } - emptyList := c.newListFunc() - pred := storage.SelectionPredicate{ - Label: labels.Everything(), - Field: fields.Everything(), - Limit: 1, // just in case we actually hit something - } - - err := c.storage.GetList(ctx, c.resourcePrefix, storage.ListOptions{Predicate: pred}, emptyList) - if err != nil { - return 0, err - } - emptyListAccessor, err := meta.ListAccessor(emptyList) - if err != nil { - return 0, err - } - if emptyListAccessor == nil { - return 0, fmt.Errorf("unable to extract a list accessor from %T", emptyList) - } - - currentResourceVersion, err := strconv.Atoi(emptyListAccessor.GetResourceVersion()) - if err != nil { - return 0, err - } - - if currentResourceVersion == 0 { - return 0, fmt.Errorf("the current resource version must be greater than 0") - } - return uint64(currentResourceVersion), nil -} - // getBookmarkAfterResourceVersionLockedFunc returns a function that // spits a ResourceVersion after which the bookmark event will be delivered. // @@ -1318,7 +1281,7 @@ func (c *Cacher) getStartResourceVersionForWatchLockedFunc(ctx context.Context, func (c *Cacher) getCommonResourceVersionLockedFunc(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) { switch { case len(opts.ResourceVersion) == 0: - rv, err := c.getCurrentResourceVersionFromStorage(ctx) + rv, err := storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String()) if err != nil { return nil, err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index ebaad698d73..eecf2ddfc98 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -30,7 +30,6 @@ import ( "github.com/stretchr/testify/require" - "k8s.io/apimachinery/pkg/api/apitesting" apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -42,12 +41,9 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" - example2v1 "k8s.io/apiserver/pkg/apis/example2/v1" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" - "k8s.io/apiserver/pkg/storage/etcd3" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" - "k8s.io/apiserver/pkg/storage/value/encrypt/identity" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/utils/clock" @@ -1814,76 +1810,6 @@ func TestCacherWatchSemantics(t *testing.T) { } } -func TestGetCurrentResourceVersionFromStorage(t *testing.T) { - // test data - newEtcdTestStorage := func(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { - server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) - storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion, example2v1.SchemeGroupVersion), func() runtime.Object { return &example.Pod{} }, prefix, schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig()) - return server, storage - } - server, etcdStorage := newEtcdTestStorage(t, "") - defer server.Terminate(t) - podCacher, versioner, err := newTestCacher(etcdStorage) - if err != nil { - t.Fatalf("Couldn't create podCacher: %v", err) - } - defer podCacher.Stop() - - makePod := func(name string) *example.Pod { - return &example.Pod{ - ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name}, - } - } - createPod := func(obj *example.Pod) *example.Pod { - key := "pods/" + obj.Namespace + "/" + obj.Name - out := &example.Pod{} - err := etcdStorage.Create(context.TODO(), key, obj, out, 0) - require.NoError(t, err) - return out - } - getPod := func(name, ns string) *example.Pod { - key := "pods/" + ns + "/" + name - out := &example.Pod{} - err := etcdStorage.Get(context.TODO(), key, storage.GetOptions{}, out) - require.NoError(t, err) - return out - } - makeReplicaSet := func(name string) *example2v1.ReplicaSet { - return &example2v1.ReplicaSet{ - ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name}, - } - } - createReplicaSet := func(obj *example2v1.ReplicaSet) *example2v1.ReplicaSet { - key := "replicasets/" + obj.Namespace + "/" + obj.Name - out := &example2v1.ReplicaSet{} - err := etcdStorage.Create(context.TODO(), key, obj, out, 0) - require.NoError(t, err) - return out - } - - // create a pod and make sure its RV is equal to the one maintained by etcd - pod := createPod(makePod("pod-1")) - currentStorageRV, err := podCacher.getCurrentResourceVersionFromStorage(context.TODO()) - require.NoError(t, err) - podRV, err := versioner.ParseResourceVersion(pod.ResourceVersion) - require.NoError(t, err) - require.Equal(t, currentStorageRV, podRV, "expected the global etcd RV to be equal to pod's RV") - - // now create a replicaset (new resource) and make sure the target function returns global etcd RV - rs := createReplicaSet(makeReplicaSet("replicaset-1")) - currentStorageRV, err = podCacher.getCurrentResourceVersionFromStorage(context.TODO()) - require.NoError(t, err) - rsRV, err := versioner.ParseResourceVersion(rs.ResourceVersion) - require.NoError(t, err) - require.Equal(t, currentStorageRV, rsRV, "expected the global etcd RV to be equal to replicaset's RV") - - // ensure that the pod's RV hasn't been changed - currentPod := getPod(pod.Name, pod.Namespace) - currentPodRV, err := versioner.ParseResourceVersion(currentPod.ResourceVersion) - require.NoError(t, err) - require.Equal(t, currentPodRV, podRV, "didn't expect to see the pod's RV changed") -} - func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)() backingStorage := &dummyStorage{} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/util.go b/staging/src/k8s.io/apiserver/pkg/storage/util.go index 9da8d9713c1..460879bff88 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/util.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/util.go @@ -17,11 +17,15 @@ limitations under the License. package storage import ( + "context" "fmt" + "strconv" "sync/atomic" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/validation/path" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" ) @@ -79,3 +83,42 @@ func (hwm *HighWaterMark) Update(current int64) bool { } } } + +// GetCurrentResourceVersionFromStorage gets the current resource version from the underlying storage engine. +// This method issues an empty list request and reads only the ResourceVersion from the object metadata +func GetCurrentResourceVersionFromStorage(ctx context.Context, storage Interface, newListFunc func() runtime.Object, resourcePrefix, objectType string) (uint64, error) { + if storage == nil { + return 0, fmt.Errorf("storage wasn't provided for %s", objectType) + } + if newListFunc == nil { + return 0, fmt.Errorf("newListFunction wasn't provided for %s", objectType) + } + emptyList := newListFunc() + pred := SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + Limit: 1, // just in case we actually hit something + } + + err := storage.GetList(ctx, resourcePrefix, ListOptions{Predicate: pred}, emptyList) + if err != nil { + return 0, err + } + emptyListAccessor, err := meta.ListAccessor(emptyList) + if err != nil { + return 0, err + } + if emptyListAccessor == nil { + return 0, fmt.Errorf("unable to extract a list accessor from %T", emptyList) + } + + currentResourceVersion, err := strconv.Atoi(emptyListAccessor.GetResourceVersion()) + if err != nil { + return 0, err + } + + if currentResourceVersion == 0 { + return 0, fmt.Errorf("the current resource version must be greater than 0") + } + return uint64(currentResourceVersion), nil +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/util_test.go b/staging/src/k8s.io/apiserver/pkg/storage/util_test.go index c8043d5cadf..a8eb2fe3384 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/util_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/util_test.go @@ -14,16 +14,44 @@ See the License for the specific language governing permissions and limitations under the License. */ -package storage +package storage_test import ( + "context" "math/rand" "sync" "testing" + + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/api/apitesting" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apiserver/pkg/apis/example" + examplev1 "k8s.io/apiserver/pkg/apis/example/v1" + example2v1 "k8s.io/apiserver/pkg/apis/example2/v1" + "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/etcd3" + etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" + "k8s.io/apiserver/pkg/storage/value/encrypt/identity" ) +var ( + scheme = runtime.NewScheme() + codecs = serializer.NewCodecFactory(scheme) +) + +func init() { + metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion) + utilruntime.Must(example.AddToScheme(scheme)) + utilruntime.Must(examplev1.AddToScheme(scheme)) + utilruntime.Must(example2v1.AddToScheme(scheme)) +} + func TestHighWaterMark(t *testing.T) { - var h HighWaterMark + var h storage.HighWaterMark for i := int64(10); i < 20; i++ { if !h.Update(i) { @@ -52,3 +80,69 @@ func TestHighWaterMark(t *testing.T) { t.Errorf("unexpected value, wanted %v, got %v", m, int64(h)) } } + +func TestGetCurrentResourceVersionFromStorage(t *testing.T) { + // test data + newEtcdTestStorage := func(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { + server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) + storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion, example2v1.SchemeGroupVersion), func() runtime.Object { return &example.Pod{} }, prefix, schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig()) + return server, storage + } + server, etcdStorage := newEtcdTestStorage(t, "") + defer server.Terminate(t) + versioner := storage.APIObjectVersioner{} + + makePod := func(name string) *example.Pod { + return &example.Pod{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name}, + } + } + createPod := func(obj *example.Pod) *example.Pod { + key := "pods/" + obj.Namespace + "/" + obj.Name + out := &example.Pod{} + err := etcdStorage.Create(context.TODO(), key, obj, out, 0) + require.NoError(t, err) + return out + } + getPod := func(name, ns string) *example.Pod { + key := "pods/" + ns + "/" + name + out := &example.Pod{} + err := etcdStorage.Get(context.TODO(), key, storage.GetOptions{}, out) + require.NoError(t, err) + return out + } + makeReplicaSet := func(name string) *example2v1.ReplicaSet { + return &example2v1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name}, + } + } + createReplicaSet := func(obj *example2v1.ReplicaSet) *example2v1.ReplicaSet { + key := "replicasets/" + obj.Namespace + "/" + obj.Name + out := &example2v1.ReplicaSet{} + err := etcdStorage.Create(context.TODO(), key, obj, out, 0) + require.NoError(t, err) + return out + } + + // create a pod and make sure its RV is equal to the one maintained by etcd + pod := createPod(makePod("pod-1")) + currentStorageRV, err := storage.GetCurrentResourceVersionFromStorage(context.TODO(), etcdStorage, func() runtime.Object { return &example.PodList{} }, "/pods", "Pod") + require.NoError(t, err) + podRV, err := versioner.ParseResourceVersion(pod.ResourceVersion) + require.NoError(t, err) + require.Equal(t, currentStorageRV, podRV, "expected the global etcd RV to be equal to pod's RV") + + // now create a replicaset (new resource) and make sure the target function returns global etcd RV + rs := createReplicaSet(makeReplicaSet("replicaset-1")) + currentStorageRV, err = storage.GetCurrentResourceVersionFromStorage(context.TODO(), etcdStorage, func() runtime.Object { return &example.PodList{} }, "/pods", "Pod") + require.NoError(t, err) + rsRV, err := versioner.ParseResourceVersion(rs.ResourceVersion) + require.NoError(t, err) + require.Equal(t, currentStorageRV, rsRV, "expected the global etcd RV to be equal to replicaset's RV") + + // ensure that the pod's RV hasn't been changed + currentPod := getPod(pod.Name, pod.Namespace) + currentPodRV, err := versioner.ParseResourceVersion(currentPod.ResourceVersion) + require.NoError(t, err) + require.Equal(t, currentPodRV, podRV, "didn't expect to see the pod's RV changed") +}