diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 1f5e71ad593..1e43aaef388 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -1248,7 +1248,7 @@ func (c *Cacher) getWatchCacheResourceVersion(ctx context.Context, parsedWatchRe if !utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" { return 0, nil } - rv, err := storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String()) + rv, err := c.storage.GetCurrentResourceVersion(ctx) return rv, err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 27c36cc2b62..fd81aa682f6 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -113,6 +113,7 @@ type dummyStorage struct { sync.RWMutex err error getListFn func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error + getRVFn func(_ context.Context) (uint64, error) watchFn func(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) // use getRequestWatchProgressCounter when reading @@ -199,6 +200,13 @@ func (d *dummyStorage) injectError(err error) { d.err = err } +func (d *dummyStorage) GetCurrentResourceVersion(ctx context.Context) (uint64, error) { + if d.getRVFn != nil { + return d.getRVFn(ctx) + } + return 100, nil +} + func TestGetListCacheBypass(t *testing.T) { type opts struct { ResourceVersion string @@ -463,6 +471,14 @@ apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", su podList.ResourceVersion = tc.storageRV return nil } + backingStorage.getRVFn = func(_ context.Context) (uint64, error) { + requestToStorageCount += 1 + rv, err := strconv.Atoi(tc.storageRV) + if err != nil { + t.Fatalf("failed to parse RV: %s", err) + } + return uint64(rv), nil + } result := &example.PodList{} ctx, clockStepCancelFn := context.WithTimeout(context.TODO(), time.Minute) @@ -2111,6 +2127,15 @@ func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) { listAccessor.SetResourceVersion("105") return nil } + s.getRVFn = func(_ context.Context) (uint64, error) { + // the first call to this function + // primes the cacher + if !hasBeenPrimed { + hasBeenPrimed = true + return 100, nil + } + return 105, nil + } return s }(), verifyBackingStore: func(t *testing.T, s *dummyStorage) { @@ -2146,6 +2171,15 @@ func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) { listAccessor.SetResourceVersion("105") return nil } + s.getRVFn = func(_ context.Context) (uint64, error) { + // the first call to this function + // primes the cacher + if !hasBeenPrimed { + hasBeenPrimed = true + return 100, nil + } + return 105, nil + } return s }(), verifyBackingStore: func(t *testing.T, s *dummyStorage) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go index 79824f6d67a..b1a5044598d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go @@ -57,6 +57,10 @@ func (c *CacheDelegator) Create(ctx context.Context, key string, obj, out runtim return c.storage.Create(ctx, key, obj, out, ttl) } +func (c *CacheDelegator) GetCurrentResourceVersion(ctx context.Context) (uint64, error) { + return c.storage.GetCurrentResourceVersion(ctx) +} + func (c *CacheDelegator) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object, opts storage.DeleteOptions) error { // Ignore the suggestion and try to pass down the current version of the object // read from cache. @@ -160,7 +164,7 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) consistentRead := opts.ResourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported if consistentRead { - listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.cacher.newListFunc, c.cacher.resourcePrefix, c.cacher.objectType.String()) + listRV, err = c.storage.GetCurrentResourceVersion(ctx) if err != nil { return err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index f38b160aa15..4adf797d8ca 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -22,6 +22,7 @@ import ( "fmt" "path" "reflect" + "strconv" "strings" "time" @@ -35,6 +36,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" @@ -84,6 +87,9 @@ type store struct { leaseManager *leaseManager decoder Decoder listErrAggrFactory func() ListErrorAggregator + + resourcePrefix string + newListFunc func() runtime.Object } func (s *store) RequestWatchProgress(ctx context.Context) error { @@ -185,10 +191,13 @@ func newStore(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc fu leaseManager: newDefaultLeaseManager(c.Client, leaseManagerConfig), decoder: decoder, listErrAggrFactory: listErrAggrFactory, + + resourcePrefix: resourcePrefix, + newListFunc: newListFunc, } w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) { - return storage.GetCurrentResourceVersionFromStorage(ctx, s, newListFunc, resourcePrefix, w.objectType) + return s.GetCurrentResourceVersion(ctx) } if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) || utilfeature.DefaultFeatureGate.Enabled(features.WatchList) { etcdfeature.DefaultFeatureSupportChecker.CheckClient(c.Ctx(), c, storage.RequestWatchProgress) @@ -677,6 +686,37 @@ func (s *store) resolveGetListRev(continueKey string, continueRV int64, opts sto return withRev, nil } +func (s *store) GetCurrentResourceVersion(ctx context.Context) (uint64, error) { + emptyList := s.newListFunc() + pred := storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + Limit: 1, // just in case we actually hit something + } + + err := s.GetList(ctx, s.resourcePrefix, storage.ListOptions{Predicate: pred}, emptyList) + if err != nil { + return 0, err + } + emptyListAccessor, err := meta.ListAccessor(emptyList) + if err != nil { + return 0, err + } + if emptyListAccessor == nil { + return 0, fmt.Errorf("unable to extract a list accessor from %T", emptyList) + } + + currentResourceVersion, err := strconv.Atoi(emptyListAccessor.GetResourceVersion()) + if err != nil { + return 0, err + } + + if currentResourceVersion == 0 { + return 0, fmt.Errorf("the current resource version must be greater than 0") + } + return uint64(currentResourceVersion), nil +} + // GetList implements storage.Interface. func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { keyPrefix, err := s.prepareKey(key) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index b376cf22e24..c7238c17f58 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -28,6 +28,7 @@ import ( "testing" "github.com/go-logr/logr" + "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/kubernetes" "go.etcd.io/etcd/server/v3/embed" @@ -994,3 +995,49 @@ func BenchmarkStoreList(b *testing.B) { func computePodKey(obj *example.Pod) string { return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name) } + +func TestGetCurrentResourceVersion(t *testing.T) { + ctx, store, _ := testSetup(t) + + makePod := func(name string) *example.Pod { + return &example.Pod{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name}, + } + } + createPod := func(obj *example.Pod) *example.Pod { + key := "pods/" + obj.Namespace + "/" + obj.Name + out := &example.Pod{} + err := store.Create(context.TODO(), key, obj, out, 0) + require.NoError(t, err) + return out + } + getPod := func(name, ns string) *example.Pod { + key := "pods/" + ns + "/" + name + out := &example.Pod{} + err := store.Get(context.TODO(), key, storage.GetOptions{}, out) + require.NoError(t, err) + return out + } + + // create a pod and make sure its RV is equal to the one maintained by etcd + pod := createPod(makePod("pod-1")) + currentStorageRV, err := store.GetCurrentResourceVersion(context.TODO()) + require.NoError(t, err) + podRV, err := store.versioner.ParseResourceVersion(pod.ResourceVersion) + require.NoError(t, err) + require.Equal(t, currentStorageRV, podRV, "expected the global etcd RV to be equal to pod's RV") + + // now make unrelated write and make sure the target function returns global etcd RV + resp, err := store.client.KV.Put(ctx, "compact_rev_key", pod.ResourceVersion) + require.NoError(t, err) + currentStorageRV, err = store.GetCurrentResourceVersion(context.TODO()) + require.NoError(t, err) + require.NoError(t, err) + require.Equal(t, currentStorageRV, uint64(resp.Header.Revision), "expected the global etcd RV to be equal to replicaset's RV") + + // ensure that the pod's RV hasn't been changed + currentPod := getPod(pod.Name, pod.Namespace) + currentPodRV, err := store.versioner.ParseResourceVersion(currentPod.ResourceVersion) + require.NoError(t, err) + require.Equal(t, currentPodRV, podRV, "didn't expect to see the pod's RV changed") +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go index 3932f0caee2..2c756c3cb37 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go @@ -262,6 +262,10 @@ type Interface interface { // TODO: Remove when storage.Interface will be separate from etc3.store. // Deprecated: Added temporarily to simplify exposing RequestProgress for watch cache. RequestWatchProgress(ctx context.Context) error + + // GetCurrentResourceVersion gets the current resource version from etcd. + // This method issues an empty list request and reads only the ResourceVersion from the object metadata + GetCurrentResourceVersion(ctx context.Context) (uint64, error) } // GetOptions provides the options that may be provided for storage get operations. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go index e8363f0fb26..61335dea9f9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go @@ -1532,7 +1532,7 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac } if scenario.useCurrentRV { - currentStorageRV, err := storage.GetCurrentResourceVersionFromStorage(ctx, store, func() runtime.Object { return &example.PodList{} }, "/pods", "") + currentStorageRV, err := store.GetCurrentResourceVersion(ctx) require.NoError(t, err) scenario.resourceVersion = fmt.Sprintf("%d", currentStorageRV) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/util.go b/staging/src/k8s.io/apiserver/pkg/storage/util.go index 1aca351d99f..bb231df3038 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/util.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/util.go @@ -17,16 +17,12 @@ limitations under the License. package storage import ( - "context" "fmt" - "strconv" "sync/atomic" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/validation/path" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" ) @@ -81,45 +77,6 @@ func (hwm *HighWaterMark) Update(current int64) bool { } } -// GetCurrentResourceVersionFromStorage gets the current resource version from the underlying storage engine. -// This method issues an empty list request and reads only the ResourceVersion from the object metadata -func GetCurrentResourceVersionFromStorage(ctx context.Context, storage Interface, newListFunc func() runtime.Object, resourcePrefix, objectType string) (uint64, error) { - if storage == nil { - return 0, fmt.Errorf("storage wasn't provided for %s", objectType) - } - if newListFunc == nil { - return 0, fmt.Errorf("newListFunction wasn't provided for %s", objectType) - } - emptyList := newListFunc() - pred := SelectionPredicate{ - Label: labels.Everything(), - Field: fields.Everything(), - Limit: 1, // just in case we actually hit something - } - - err := storage.GetList(ctx, resourcePrefix, ListOptions{Predicate: pred}, emptyList) - if err != nil { - return 0, err - } - emptyListAccessor, err := meta.ListAccessor(emptyList) - if err != nil { - return 0, err - } - if emptyListAccessor == nil { - return 0, fmt.Errorf("unable to extract a list accessor from %T", emptyList) - } - - currentResourceVersion, err := strconv.Atoi(emptyListAccessor.GetResourceVersion()) - if err != nil { - return 0, err - } - - if currentResourceVersion == 0 { - return 0, fmt.Errorf("the current resource version must be greater than 0") - } - return uint64(currentResourceVersion), nil -} - // AnnotateInitialEventsEndBookmark adds a special annotation to the given object // which indicates that the initial events have been sent. // diff --git a/staging/src/k8s.io/apiserver/pkg/storage/util_test.go b/staging/src/k8s.io/apiserver/pkg/storage/util_test.go index fd8bca7455f..737e6f8ab2e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/util_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/util_test.go @@ -17,30 +17,22 @@ limitations under the License. package storage_test import ( - "context" "math/rand" "sync" "testing" "github.com/stretchr/testify/require" - "k8s.io/apimachinery/pkg/api/apitesting" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/runtime/serializer" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" example2v1 "k8s.io/apiserver/pkg/apis/example2/v1" "k8s.io/apiserver/pkg/storage" - "k8s.io/apiserver/pkg/storage/etcd3" - etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" - "k8s.io/apiserver/pkg/storage/value/encrypt/identity" ) var ( scheme = runtime.NewScheme() - codecs = serializer.NewCodecFactory(scheme) ) func init() { @@ -81,74 +73,6 @@ func TestHighWaterMark(t *testing.T) { } } -func TestGetCurrentResourceVersionFromStorage(t *testing.T) { - // test data - newEtcdTestStorage := func(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { - versioner := storage.APIObjectVersioner{} - codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion, example2v1.SchemeGroupVersion) - server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) - storage := etcd3.New(server.V3Client, codec, func() runtime.Object { return &example.Pod{} }, func() runtime.Object { return &example.PodList{} }, prefix, "/pods", schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), etcd3.NewDefaultLeaseManagerConfig(), etcd3.NewDefaultDecoder(codec, versioner), versioner) - return server, storage - } - server, etcdStorage := newEtcdTestStorage(t, "") - defer server.Terminate(t) - versioner := storage.APIObjectVersioner{} - - makePod := func(name string) *example.Pod { - return &example.Pod{ - ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name}, - } - } - createPod := func(obj *example.Pod) *example.Pod { - key := "pods/" + obj.Namespace + "/" + obj.Name - out := &example.Pod{} - err := etcdStorage.Create(context.TODO(), key, obj, out, 0) - require.NoError(t, err) - return out - } - getPod := func(name, ns string) *example.Pod { - key := "pods/" + ns + "/" + name - out := &example.Pod{} - err := etcdStorage.Get(context.TODO(), key, storage.GetOptions{}, out) - require.NoError(t, err) - return out - } - makeReplicaSet := func(name string) *example2v1.ReplicaSet { - return &example2v1.ReplicaSet{ - ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name}, - } - } - createReplicaSet := func(obj *example2v1.ReplicaSet) *example2v1.ReplicaSet { - key := "replicasets/" + obj.Namespace + "/" + obj.Name - out := &example2v1.ReplicaSet{} - err := etcdStorage.Create(context.TODO(), key, obj, out, 0) - require.NoError(t, err) - return out - } - - // create a pod and make sure its RV is equal to the one maintained by etcd - pod := createPod(makePod("pod-1")) - currentStorageRV, err := storage.GetCurrentResourceVersionFromStorage(context.TODO(), etcdStorage, func() runtime.Object { return &example.PodList{} }, "/pods", "Pod") - require.NoError(t, err) - podRV, err := versioner.ParseResourceVersion(pod.ResourceVersion) - require.NoError(t, err) - require.Equal(t, currentStorageRV, podRV, "expected the global etcd RV to be equal to pod's RV") - - // now create a replicaset (new resource) and make sure the target function returns global etcd RV - rs := createReplicaSet(makeReplicaSet("replicaset-1")) - currentStorageRV, err = storage.GetCurrentResourceVersionFromStorage(context.TODO(), etcdStorage, func() runtime.Object { return &example.PodList{} }, "/pods", "Pod") - require.NoError(t, err) - rsRV, err := versioner.ParseResourceVersion(rs.ResourceVersion) - require.NoError(t, err) - require.Equal(t, currentStorageRV, rsRV, "expected the global etcd RV to be equal to replicaset's RV") - - // ensure that the pod's RV hasn't been changed - currentPod := getPod(pod.Name, pod.Namespace) - currentPodRV, err := versioner.ParseResourceVersion(currentPod.ResourceVersion) - require.NoError(t, err) - require.Equal(t, currentPodRV, podRV, "didn't expect to see the pod's RV changed") -} - func TestHasInitialEventsEndBookmarkAnnotation(t *testing.T) { createPod := func(name string) *example.Pod { return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: name}}