From 2de2093bcef6544ef17c84116fbd751fc429820a Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 19 Feb 2025 17:03:48 +0100 Subject: [PATCH] Add snapshotting of watch cache behind a feature gate --- pkg/features/versioned_kube_features.go | 4 + .../apiserver/pkg/features/kube_features.go | 10 ++ .../apiserver/pkg/storage/cacher/store.go | 2 + .../pkg/storage/cacher/store_btree.go | 112 ++++++++++++++++++ .../pkg/storage/cacher/store_btree_test.go | 68 +++++++++++ .../pkg/storage/cacher/watch_cache.go | 27 ++++- .../pkg/storage/cacher/watch_cache_test.go | 101 ++++++++++++++++ .../test_data/versioned_feature_list.yaml | 6 + 8 files changed, 329 insertions(+), 1 deletion(-) diff --git a/pkg/features/versioned_kube_features.go b/pkg/features/versioned_kube_features.go index 163de6476fa..3f8743b3c0d 100644 --- a/pkg/features/versioned_kube_features.go +++ b/pkg/features/versioned_kube_features.go @@ -295,6 +295,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.29"), Default: false, PreRelease: featuregate.Deprecated}, }, + genericfeatures.ListFromCacheSnapshot: { + {Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Alpha}, + }, + genericfeatures.MutatingAdmissionPolicy: { {Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha}, }, diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index facbefe2fe1..d2a4cdc8b27 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -123,6 +123,12 @@ const ( // Enables KMS v1 API for encryption at rest. KMSv1 featuregate.Feature = "KMSv1" + // owner: @serathius + // kep: https://kep.k8s.io/4988 + // + // Enables generating snapshots of watch cache store and using them to serve LIST requests. 
+ ListFromCacheSnapshot featuregate.Feature = "ListFromCacheSnapshot" + // owner: @alexzielenski, @cici37, @jiahuif, @jpbetz // kep: https://kep.k8s.io/3962 // @@ -334,6 +340,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.29"), Default: false, PreRelease: featuregate.Deprecated}, }, + ListFromCacheSnapshot: { + {Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Alpha}, + }, + MutatingAdmissionPolicy: { {Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha}, }, diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/store.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store.go index f21bdf71745..3b9d9decd03 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store.go @@ -74,6 +74,8 @@ type storeIndexer interface { type orderedLister interface { ListPrefix(prefix, continueKey string, limit int) (items []interface{}, hasMore bool) + Count(prefix, continueKey string) (count int) + Clone() orderedLister } func newStoreIndexer(indexers *cache.Indexers) storeIndexer { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/store_btree.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store_btree.go index fd6da0167dd..3b5d57a9755 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/store_btree.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store_btree.go @@ -46,6 +46,18 @@ type threadedStoreIndexer struct { var _ orderedLister = (*threadedStoreIndexer)(nil) +func (si *threadedStoreIndexer) Count(prefix, continueKey string) (count int) { + si.lock.RLock() + defer si.lock.RUnlock() + return si.store.Count(prefix, continueKey) +} + +func (si *threadedStoreIndexer) Clone() orderedLister { + si.lock.RLock() + defer si.lock.RUnlock() + return si.store.Clone() +} + func (si *threadedStoreIndexer) Add(obj interface{}) error { return 
si.addOrUpdate(obj) } @@ -140,6 +152,12 @@ type btreeStore struct { tree *btree.BTreeG[*storeElement] } +func (s *btreeStore) Clone() orderedLister { + return &btreeStore{ + tree: s.tree.Clone(), + } +} + func (s *btreeStore) Add(obj interface{}) error { if obj == nil { return fmt.Errorf("obj cannot be nil") @@ -387,3 +405,97 @@ func (i *indexer) delete(key, value string, index map[string]map[string]*storeEl delete(index, value) } } + +// newStoreSnapshotter returns a storeSnapshotter that stores snapshots for +// serving read requests with exact resource versions (RV) and pagination. +// +// Snapshots are created by calling Clone method on orderedLister, which is +// expected to be fast and efficient thanks to usage of B-trees. +// B-trees can create a lazy copy of the tree structure, minimizing overhead. +// +// Assuming the watch cache observes all events and snapshots cache after each of them, +// requests for a specific resource version can be served by retrieving +// the snapshot with the greatest RV less than or equal to the requested RV. +// To make snapshot retrieval efficient we need an ordered data structure, such as tree. +// +// The initial implementation uses a B-tree to achieve the following performance characteristics (n - number of snapshots stored): +// - `Add`: Adds a new snapshot. +// Complexity: O(log n). +// Executed for each watch event observed by the cache. +// - `GetLessOrEqual`: Retrieves the snapshot with the greatest RV less than or equal to the requested RV. +// Complexity: O(log n). +// Executed for each LIST request with match=Exact or continuation. +// - `RemoveLess`: Cleans up snapshots outside the watch history window. +// Complexity: O(k log n), k - number of snapshots to remove, usually only one if watch capacity was not reduced. +// Executed per watch event observed when the cache is full. +// - `Reset`: Cleans up all snapshots. +// Complexity: O(1). +// Executed when the watch cache is reinitialized. 
+// +// Further optimization is possible by leveraging the property that adds always +// increase the maximum RV and deletes only increase the minimum RV. +// For example, a binary search on a cyclic buffer of (RV, snapshot) +// should reduce number of allocations and improve removal complexity. +// However, this solution is more complex and is deferred for future implementation. +// +// TODO: Rewrite to use a cyclic buffer +func newStoreSnapshotter() *storeSnapshotter { + s := &storeSnapshotter{ + snapshots: btree.NewG[rvSnapshot](btreeDegree, func(a, b rvSnapshot) bool { + return a.resourceVersion < b.resourceVersion + }), + } + return s +} + +type storeSnapshotter struct { + mux sync.RWMutex + snapshots *btree.BTreeG[rvSnapshot] +} + +type rvSnapshot struct { + resourceVersion uint64 + snapshot orderedLister +} + +func (s *storeSnapshotter) Reset() { + s.mux.Lock() + defer s.mux.Unlock() + s.snapshots.Clear(false) +} + +func (s *storeSnapshotter) GetLessOrEqual(rv uint64) (orderedLister, bool) { + s.mux.RLock() + defer s.mux.RUnlock() + + var result *rvSnapshot + s.snapshots.DescendLessOrEqual(rvSnapshot{resourceVersion: rv}, func(rvs rvSnapshot) bool { + result = &rvs + return false + }) + if result == nil { + return nil, false + } + return result.snapshot, true +} + +func (s *storeSnapshotter) Add(rv uint64, indexer orderedLister) { + s.mux.Lock() + defer s.mux.Unlock() + s.snapshots.ReplaceOrInsert(rvSnapshot{resourceVersion: rv, snapshot: indexer.Clone()}) +} + +func (s *storeSnapshotter) RemoveLess(rv uint64) { + s.mux.Lock() + defer s.mux.Unlock() + for s.snapshots.Len() > 0 { + oldest, ok := s.snapshots.Min() + if !ok { + break + } + if rv <= oldest.resourceVersion { + break + } + s.snapshots.DeleteMin() + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/store_btree_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store_btree_test.go index b0e05d628ec..a7e7071d7f3 100644 --- 
a/staging/src/k8s.io/apiserver/pkg/storage/cacher/store_btree_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store_btree_test.go @@ -79,3 +79,71 @@ func TestStoreListPrefix(t *testing.T) { testStorageElement("bar", "baz", 4), }, items) } + +func TestStoreSnapshotter(t *testing.T) { + cache := newStoreSnapshotter() + cache.Add(10, fakeOrderedLister{rv: 10}) + cache.Add(20, fakeOrderedLister{rv: 20}) + cache.Add(30, fakeOrderedLister{rv: 30}) + cache.Add(40, fakeOrderedLister{rv: 40}) + assert.Equal(t, 4, cache.snapshots.Len()) + + t.Log("No snapshot from before first RV") + _, found := cache.GetLessOrEqual(9) + assert.False(t, found) + + t.Log("Get snapshot from first RV") + snapshot, found := cache.GetLessOrEqual(10) + assert.True(t, found) + assert.Equal(t, 10, snapshot.(fakeOrderedLister).rv) + + t.Log("Get first snapshot by larger RV") + snapshot, found = cache.GetLessOrEqual(11) + assert.True(t, found) + assert.Equal(t, 10, snapshot.(fakeOrderedLister).rv) + + t.Log("Get second snapshot by larger RV") + snapshot, found = cache.GetLessOrEqual(22) + assert.True(t, found) + assert.Equal(t, 20, snapshot.(fakeOrderedLister).rv) + + t.Log("Get third snapshot for future revision") + snapshot, found = cache.GetLessOrEqual(100) + assert.True(t, found) + assert.Equal(t, 40, snapshot.(fakeOrderedLister).rv) + + t.Log("Remove snapshot less than 30") + cache.RemoveLess(30) + + assert.Equal(t, 2, cache.snapshots.Len()) + _, found = cache.GetLessOrEqual(10) + assert.False(t, found) + + _, found = cache.GetLessOrEqual(20) + assert.False(t, found) + + snapshot, found = cache.GetLessOrEqual(30) + assert.True(t, found) + assert.Equal(t, 30, snapshot.(fakeOrderedLister).rv) + + t.Log("Remove removing all RVs") + cache.Reset() + assert.Equal(t, 0, cache.snapshots.Len()) + _, found = cache.GetLessOrEqual(30) + assert.False(t, found) + _, found = cache.GetLessOrEqual(40) + assert.False(t, found) +} + +type fakeOrderedLister struct { + rv int +} + +func (f 
fakeOrderedLister) Add(obj interface{}) error { return nil } +func (f fakeOrderedLister) Update(obj interface{}) error { return nil } +func (f fakeOrderedLister) Delete(obj interface{}) error { return nil } +func (f fakeOrderedLister) Clone() orderedLister { return f } +func (f fakeOrderedLister) ListPrefix(prefixKey, continueKey string, limit int) ([]interface{}, bool) { + return nil, false +} +func (f fakeOrderedLister) Count(prefixKey, continueKey string) int { return 0 } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index b95ae2c17ae..a59666488dd 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -151,6 +151,9 @@ type watchCache struct { // Requests progress notification if there are requests waiting for watch // to be fresh waitingUntilFresh *conditionalProgressRequester + + // Stores previous snapshots of orderedLister to allow serving requests from previous revisions. 
+ snapshots *storeSnapshotter } func newWatchCache( @@ -182,6 +185,9 @@ func newWatchCache( groupResource: groupResource, waitingUntilFresh: progressRequester, } + if utilfeature.DefaultFeatureGate.Enabled(features.ListFromCacheSnapshot) { + wc.snapshots = newStoreSnapshotter() + } metrics.WatchCacheCapacity.WithLabelValues(groupResource.String()).Set(float64(wc.capacity)) wc.cond = sync.NewCond(wc.RLocker()) wc.indexValidator = wc.isIndexValidLocked @@ -286,7 +292,20 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd w.resourceVersion = resourceVersion defer w.cond.Broadcast() - return updateFunc(elem) + err := updateFunc(elem) + if err != nil { + return err + } + if w.snapshots != nil { + if orderedLister, ordered := w.store.(orderedLister); ordered { + if w.isCacheFullLocked() { + oldestRV := w.cache[w.startIndex%w.capacity].ResourceVersion + w.snapshots.RemoveLess(oldestRV) + } + w.snapshots.Add(w.resourceVersion, orderedLister) + } + } + return err }(); err != nil { return err } @@ -601,6 +620,12 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error { if err := w.store.Replace(toReplace, resourceVersion); err != nil { return err } + if w.snapshots != nil { + w.snapshots.Reset() + if orderedLister, ordered := w.store.(orderedLister); ordered { + w.snapshots.Add(version, orderedLister) + } + } w.listResourceVersion = version w.resourceVersion = version if w.onReplace != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index 21e21d04fc3..9766ab90dee 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -24,6 +24,9 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" 
"k8s.io/apimachinery/pkg/api/errors" @@ -1247,3 +1250,101 @@ func TestHistogramCacheReadWait(t *testing.T) { }) } } + +func TestCacheSnapshots(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, true) + + store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{}) + defer store.Stop() + store.upperBoundCapacity = 3 + store.lowerBoundCapacity = 1 + clock := store.clock.(*testingclock.FakeClock) + + _, found := store.snapshots.GetLessOrEqual(100) + assert.False(t, found, "Expected empty cache to not include any snapshots") + + t.Log("Test cache on rev 100") + require.NoError(t, store.Add(makeTestPod("foo", 100))) + require.NoError(t, store.Update(makeTestPod("foo", 200))) + clock.Step(time.Second) + require.NoError(t, store.Delete(makeTestPod("foo", 300))) + + t.Log("Test cache on rev 100") + _, found = store.snapshots.GetLessOrEqual(99) + assert.False(t, found, "Expected store to not include rev 99") + lister, found := store.snapshots.GetLessOrEqual(100) + assert.True(t, found, "Expected store to not include rev 100") + elements, _ := lister.ListPrefix("", "", 0) + assert.Len(t, elements, 1) + assert.Equal(t, makeTestPod("foo", 100), elements[0].(*storeElement).Object) + + t.Log("Overflow cache to remove rev 100") + require.NoError(t, store.Add(makeTestPod("foo", 400))) + _, found = store.snapshots.GetLessOrEqual(100) + assert.False(t, found, "Expected overfilled cache to delete oldest rev 100") + + t.Log("Test cache on rev 200") + lister, found = store.snapshots.GetLessOrEqual(200) + assert.True(t, found, "Expected store to still keep rev 200") + elements, _ = lister.ListPrefix("", "", 0) + assert.Len(t, elements, 1) + assert.Equal(t, makeTestPod("foo", 200), elements[0].(*storeElement).Object) + + t.Log("Test cache on rev 300") + lister, found = store.snapshots.GetLessOrEqual(300) + assert.True(t, found, "Expected store to still keep rev 300") + elements, _ = 
lister.ListPrefix("", "", 0) + assert.Empty(t, elements) + + t.Log("Test cache on rev 400") + lister, found = store.snapshots.GetLessOrEqual(400) + assert.True(t, found, "Expected store to still keep rev 400") + elements, _ = lister.ListPrefix("", "", 0) + assert.Len(t, elements, 1) + assert.Equal(t, makeTestPod("foo", 400), elements[0].(*storeElement).Object) + + t.Log("Add event outside the event fresh window to force cache capacity downsize") + assert.Equal(t, 3, store.capacity) + clock.Step(DefaultEventFreshDuration + 1) + require.NoError(t, store.Update(makeTestPod("foo", 500))) + assert.Equal(t, 1, store.capacity) + assert.Equal(t, 1, store.snapshots.snapshots.Len()) + _, found = store.snapshots.GetLessOrEqual(499) + assert.False(t, found, "Expected overfilled cache to delete events below 500") + + t.Log("Test cache on rev 500") + lister, found = store.snapshots.GetLessOrEqual(500) + assert.True(t, found, "Expected store to still keep rev 500") + elements, _ = lister.ListPrefix("", "", 0) + assert.Len(t, elements, 1) + assert.Equal(t, makeTestPod("foo", 500), elements[0].(*storeElement).Object) + + t.Log("Add event to force capacity upsize") + require.NoError(t, store.Update(makeTestPod("foo", 600))) + assert.Equal(t, 2, store.capacity) + assert.Equal(t, 2, store.snapshots.snapshots.Len()) + + t.Log("Test cache on rev 600") + lister, found = store.snapshots.GetLessOrEqual(600) + assert.True(t, found, "Expected replace to be snapshotted") + elements, _ = lister.ListPrefix("", "", 0) + assert.Len(t, elements, 1) + assert.Equal(t, makeTestPod("foo", 600), elements[0].(*storeElement).Object) + + t.Log("Replace cache to remove history") + _, found = store.snapshots.GetLessOrEqual(500) + assert.True(t, found, "Confirm that cache stores history before replace") + err := store.Replace([]interface{}{makeTestPod("foo", 600)}, "700") + require.NoError(t, err) + _, found = store.snapshots.GetLessOrEqual(500) + assert.False(t, found, "Expected replace to remove history") 
+ _, found = store.snapshots.GetLessOrEqual(600) + assert.False(t, found, "Expected replace to remove history") + + t.Log("Test cache on rev 700") + lister, found = store.snapshots.GetLessOrEqual(700) + assert.True(t, found, "Expected replace to be snapshotted") + elements, _ = lister.ListPrefix("", "", 0) + assert.Len(t, elements, 1) + assert.Equal(t, makeTestPod("foo", 600), elements[0].(*storeElement).Object) +} diff --git a/test/featuregates_linter/test_data/versioned_feature_list.yaml b/test/featuregates_linter/test_data/versioned_feature_list.yaml index 565290d616c..e636fab0790 100644 --- a/test/featuregates_linter/test_data/versioned_feature_list.yaml +++ b/test/featuregates_linter/test_data/versioned_feature_list.yaml @@ -728,6 +728,12 @@ lockToDefault: false preRelease: Deprecated version: "1.33" +- name: ListFromCacheSnapshot + versionedSpecs: + - default: false + lockToDefault: false + preRelease: Alpha + version: "1.33" - name: LoadBalancerIPMode versionedSpecs: - default: false