Add snapshotting of watch cache behind a feature gate

This commit is contained in:
Marek Siarkowicz 2025-02-19 17:03:48 +01:00 committed by Marek Siarkowicz
parent 9988145383
commit 2de2093bce
8 changed files with 329 additions and 1 deletion

View File

@ -295,6 +295,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.29"), Default: false, PreRelease: featuregate.Deprecated},
},
genericfeatures.ListFromCacheSnapshot: {
{Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Alpha},
},
genericfeatures.MutatingAdmissionPolicy: {
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha},
},

View File

@ -123,6 +123,12 @@ const (
// Enables KMS v1 API for encryption at rest.
KMSv1 featuregate.Feature = "KMSv1"
// owner: @serathius
// kep: https://kep.k8s.io/4988
//
// Enables generating snapshots of watch cache store and using them to serve LIST requests.
ListFromCacheSnapshot featuregate.Feature = "ListFromCacheSnapshot"
// owner: @alexzielenski, @cici37, @jiahuif, @jpbetz
// kep: https://kep.k8s.io/3962
//
@ -334,6 +340,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.29"), Default: false, PreRelease: featuregate.Deprecated},
},
ListFromCacheSnapshot: {
{Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Alpha},
},
MutatingAdmissionPolicy: {
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha},
},

View File

@ -74,6 +74,8 @@ type storeIndexer interface {
// orderedLister lists stored items in key order, supporting prefix-scoped
// pagination (continueKey + limit) and counting.
type orderedLister interface {
	ListPrefix(prefix, continueKey string, limit int) (items []interface{}, hasMore bool)
	Count(prefix, continueKey string) (count int)
	// Clone returns a point-in-time copy of the lister that is isolated
	// from subsequent writes to the original; used for snapshotting.
	Clone() orderedLister
}
func newStoreIndexer(indexers *cache.Indexers) storeIndexer {

View File

@ -46,6 +46,18 @@ type threadedStoreIndexer struct {
var _ orderedLister = (*threadedStoreIndexer)(nil)
// Count returns the number of stored items under prefix starting from
// continueKey, delegating to the underlying store under the read lock.
func (si *threadedStoreIndexer) Count(prefix, continueKey string) (count int) {
	si.lock.RLock()
	defer si.lock.RUnlock()
	count = si.store.Count(prefix, continueKey)
	return count
}
// Clone takes the read lock and returns a point-in-time copy of the
// underlying store, isolated from concurrent writers.
func (si *threadedStoreIndexer) Clone() orderedLister {
	si.lock.RLock()
	defer si.lock.RUnlock()
	clone := si.store.Clone()
	return clone
}
// Add stores obj, delegating to the shared addOrUpdate path.
func (si *threadedStoreIndexer) Add(obj interface{}) error {
	return si.addOrUpdate(obj)
}
@ -140,6 +152,12 @@ type btreeStore struct {
tree *btree.BTreeG[*storeElement]
}
// Clone returns a copy of the store backed by a lazy (copy-on-write)
// clone of the underlying B-tree, making snapshot creation cheap.
func (s *btreeStore) Clone() orderedLister {
	treeCopy := s.tree.Clone()
	return &btreeStore{tree: treeCopy}
}
func (s *btreeStore) Add(obj interface{}) error {
if obj == nil {
return fmt.Errorf("obj cannot be nil")
@ -387,3 +405,97 @@ func (i *indexer) delete(key, value string, index map[string]map[string]*storeEl
delete(index, value)
}
}
// newStoreSnapshotter returns a storeSnapshotter that stores snapshots for
// serving read requests with exact resource versions (RV) and pagination.
//
// Snapshots are created by calling the Clone method on orderedLister, which is
// expected to be fast and efficient thanks to usage of B-trees.
// B-trees can create a lazy copy of the tree structure, minimizing overhead.
//
// Assuming the watch cache observes all events and snapshots the cache after each
// of them, requests for a specific resource version can be served by retrieving
// the snapshot with the greatest RV less than or equal to the requested RV.
// To make snapshot retrieval efficient we need an ordered data structure, such as a tree.
//
// The initial implementation uses a B-tree to achieve the following performance
// characteristics (n - number of snapshots stored):
//   - `Add`: Adds a new snapshot.
//     Complexity: O(log n).
//     Executed for each watch event observed by the cache.
//   - `GetLessOrEqual`: Retrieves the snapshot with the greatest RV less than or equal to the requested RV.
//     Complexity: O(log n).
//     Executed for each LIST request with match=Exact or continuation.
//   - `RemoveLess`: Cleans up snapshots outside the watch history window.
//     Complexity: O(k log n), k - number of snapshots to remove, usually only one if watch capacity was not reduced.
//     Executed per watch event observed when the cache is full.
//   - `Reset`: Cleans up all snapshots.
//     Complexity: O(1).
//     Executed when the watch cache is reinitialized.
//
// Further optimization is possible by leveraging the property that adds always
// increase the maximum RV and deletes only increase the minimum RV.
// For example, a binary search on a cyclic buffer of (RV, snapshot)
// should reduce the number of allocations and improve removal complexity.
// However, this solution is more complex and is deferred for future implementation.
//
// TODO: Rewrite to use a cyclic buffer
func newStoreSnapshotter() *storeSnapshotter {
	// Order entries by resource version so GetLessOrEqual can descend the
	// tree directly from the requested RV.
	byRV := func(a, b rvSnapshot) bool {
		return a.resourceVersion < b.resourceVersion
	}
	return &storeSnapshotter{
		snapshots: btree.NewG[rvSnapshot](btreeDegree, byRV),
	}
}
// storeSnapshotter maintains an ordered collection of point-in-time
// snapshots of the watch cache store, keyed by resource version.
type storeSnapshotter struct {
	// mux guards snapshots: lookups take the read lock, Add/RemoveLess/Reset
	// take the write lock.
	mux sync.RWMutex
	// snapshots is ordered by resourceVersion to support efficient
	// greatest-less-or-equal retrieval.
	snapshots *btree.BTreeG[rvSnapshot]
}
// rvSnapshot pairs a snapshot of the store with the resource version at
// which it was taken; entries are ordered by resourceVersion in the btree.
type rvSnapshot struct {
	resourceVersion uint64
	snapshot        orderedLister
}
// Reset removes all stored snapshots; executed when the watch cache is
// reinitialized and old history no longer applies.
func (s *storeSnapshotter) Reset() {
	s.mux.Lock()
	defer s.mux.Unlock()
	// false: do not retain freed nodes on the btree freelist — presumably
	// the tree is about to be repopulated from scratch.
	s.snapshots.Clear(false)
}
// GetLessOrEqual returns the snapshot taken at the greatest resource
// version less than or equal to rv, or false when no such snapshot exists.
func (s *storeSnapshotter) GetLessOrEqual(rv uint64) (orderedLister, bool) {
	s.mux.RLock()
	defer s.mux.RUnlock()

	var (
		snapshot orderedLister
		found    bool
	)
	// Descending from rv, the first item visited is the greatest key <= rv.
	s.snapshots.DescendLessOrEqual(rvSnapshot{resourceVersion: rv}, func(item rvSnapshot) bool {
		snapshot = item.snapshot
		found = true
		return false // stop after the first (greatest) match
	})
	return snapshot, found
}
// Add records a snapshot of indexer at resource version rv, replacing any
// existing snapshot stored for the same rv.
func (s *storeSnapshotter) Add(rv uint64, indexer orderedLister) {
	s.mux.Lock()
	defer s.mux.Unlock()
	// Clone under the write lock so the snapshot is a consistent view.
	entry := rvSnapshot{resourceVersion: rv, snapshot: indexer.Clone()}
	s.snapshots.ReplaceOrInsert(entry)
}
// RemoveLess deletes every snapshot taken at a resource version strictly
// below rv, keeping snapshot history aligned with the watch event window.
func (s *storeSnapshotter) RemoveLess(rv uint64) {
	s.mux.Lock()
	defer s.mux.Unlock()
	for {
		oldest, ok := s.snapshots.Min()
		if !ok || oldest.resourceVersion >= rv {
			// Tree is empty, or the oldest snapshot is still in range.
			return
		}
		s.snapshots.DeleteMin()
	}
}

View File

@ -79,3 +79,71 @@ func TestStoreListPrefix(t *testing.T) {
testStorageElement("bar", "baz", 4),
}, items)
}
// TestStoreSnapshotter verifies snapshot insertion, greatest-less-or-equal
// lookup, pruning of old revisions, and full reset of storeSnapshotter.
func TestStoreSnapshotter(t *testing.T) {
	cache := newStoreSnapshotter()
	cache.Add(10, fakeOrderedLister{rv: 10})
	cache.Add(20, fakeOrderedLister{rv: 20})
	cache.Add(30, fakeOrderedLister{rv: 30})
	cache.Add(40, fakeOrderedLister{rv: 40})
	assert.Equal(t, 4, cache.snapshots.Len())

	t.Log("No snapshot from before first RV")
	_, found := cache.GetLessOrEqual(9)
	assert.False(t, found)

	t.Log("Get snapshot from first RV")
	snapshot, found := cache.GetLessOrEqual(10)
	assert.True(t, found)
	assert.Equal(t, 10, snapshot.(fakeOrderedLister).rv)

	t.Log("Get first snapshot by larger RV")
	snapshot, found = cache.GetLessOrEqual(11)
	assert.True(t, found)
	assert.Equal(t, 10, snapshot.(fakeOrderedLister).rv)

	t.Log("Get second snapshot by larger RV")
	snapshot, found = cache.GetLessOrEqual(22)
	assert.True(t, found)
	assert.Equal(t, 20, snapshot.(fakeOrderedLister).rv)

	// Fixed log message: rv=100 resolves to the last (fourth) snapshot,
	// not the third.
	t.Log("Get last snapshot for future revision")
	snapshot, found = cache.GetLessOrEqual(100)
	assert.True(t, found)
	assert.Equal(t, 40, snapshot.(fakeOrderedLister).rv)

	t.Log("Remove snapshots with RV less than 30")
	cache.RemoveLess(30)
	assert.Equal(t, 2, cache.snapshots.Len())
	_, found = cache.GetLessOrEqual(10)
	assert.False(t, found)
	_, found = cache.GetLessOrEqual(20)
	assert.False(t, found)
	snapshot, found = cache.GetLessOrEqual(30)
	assert.True(t, found)
	assert.Equal(t, 30, snapshot.(fakeOrderedLister).rv)

	// Fixed garbled log message ("Remove removing all RVs").
	t.Log("Reset removes all snapshots")
	cache.Reset()
	assert.Equal(t, 0, cache.snapshots.Len())
	_, found = cache.GetLessOrEqual(30)
	assert.False(t, found)
	_, found = cache.GetLessOrEqual(40)
	assert.False(t, found)
}
// fakeOrderedLister is a stub orderedLister used by TestStoreSnapshotter;
// only rv matters, letting the test identify which snapshot was returned.
type fakeOrderedLister struct {
	rv int
}

func (f fakeOrderedLister) Add(obj interface{}) error    { return nil }
func (f fakeOrderedLister) Update(obj interface{}) error { return nil }
func (f fakeOrderedLister) Delete(obj interface{}) error { return nil }

// Clone returns the value itself — value-type receiver, so no copying is needed.
func (f fakeOrderedLister) Clone() orderedLister { return f }
func (f fakeOrderedLister) ListPrefix(prefixKey, continueKey string, limit int) ([]interface{}, bool) {
	return nil, false
}
func (f fakeOrderedLister) Count(prefixKey, continueKey string) int { return 0 }

View File

@ -151,6 +151,9 @@ type watchCache struct {
// Requests progress notification if there are requests waiting for watch
// to be fresh
waitingUntilFresh *conditionalProgressRequester
// Stores previous snapshots of orderedLister to allow serving requests from previous revisions.
snapshots *storeSnapshotter
}
func newWatchCache(
@ -182,6 +185,9 @@ func newWatchCache(
groupResource: groupResource,
waitingUntilFresh: progressRequester,
}
if utilfeature.DefaultFeatureGate.Enabled(features.ListFromCacheSnapshot) {
wc.snapshots = newStoreSnapshotter()
}
metrics.WatchCacheCapacity.WithLabelValues(groupResource.String()).Set(float64(wc.capacity))
wc.cond = sync.NewCond(wc.RLocker())
wc.indexValidator = wc.isIndexValidLocked
@ -286,7 +292,20 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
w.resourceVersion = resourceVersion
defer w.cond.Broadcast()
return updateFunc(elem)
err := updateFunc(elem)
if err != nil {
return err
}
if w.snapshots != nil {
if orderedLister, ordered := w.store.(orderedLister); ordered {
if w.isCacheFullLocked() {
oldestRV := w.cache[w.startIndex%w.capacity].ResourceVersion
w.snapshots.RemoveLess(oldestRV)
}
w.snapshots.Add(w.resourceVersion, orderedLister)
}
}
return err
}(); err != nil {
return err
}
@ -601,6 +620,12 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
if err := w.store.Replace(toReplace, resourceVersion); err != nil {
return err
}
if w.snapshots != nil {
w.snapshots.Reset()
if orderedLister, ordered := w.store.(orderedLister); ordered {
w.snapshots.Add(version, orderedLister)
}
}
w.listResourceVersion = version
w.resourceVersion = version
if w.onReplace != nil {

View File

@ -24,6 +24,9 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
@ -1247,3 +1250,101 @@ func TestHistogramCacheReadWait(t *testing.T) {
})
}
}
// TestCacheSnapshots verifies that, with ListFromCacheSnapshot enabled,
// the watch cache snapshots the store on every event, prunes snapshots
// together with the event window (including capacity resizes), and resets
// snapshot history on Replace.
func TestCacheSnapshots(t *testing.T) {
	featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, true)
	store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{})
	defer store.Stop()
	store.upperBoundCapacity = 3
	store.lowerBoundCapacity = 1
	clock := store.clock.(*testingclock.FakeClock)
	_, found := store.snapshots.GetLessOrEqual(100)
	assert.False(t, found, "Expected empty cache to not include any snapshots")
	// Fixed misleading log: this section populates the cache; rev 100 is
	// tested below.
	t.Log("Add events to fill the cache")
	require.NoError(t, store.Add(makeTestPod("foo", 100)))
	require.NoError(t, store.Update(makeTestPod("foo", 200)))
	clock.Step(time.Second)
	require.NoError(t, store.Delete(makeTestPod("foo", 300)))
	t.Log("Test cache on rev 100")
	_, found = store.snapshots.GetLessOrEqual(99)
	assert.False(t, found, "Expected store to not include rev 99")
	lister, found := store.snapshots.GetLessOrEqual(100)
	// Fixed inverted message: this asserts the snapshot IS present.
	assert.True(t, found, "Expected store to include rev 100")
	elements, _ := lister.ListPrefix("", "", 0)
	assert.Len(t, elements, 1)
	assert.Equal(t, makeTestPod("foo", 100), elements[0].(*storeElement).Object)
	t.Log("Overflow cache to remove rev 100")
	require.NoError(t, store.Add(makeTestPod("foo", 400)))
	_, found = store.snapshots.GetLessOrEqual(100)
	assert.False(t, found, "Expected overfilled cache to delete oldest rev 100")
	t.Log("Test cache on rev 200")
	lister, found = store.snapshots.GetLessOrEqual(200)
	assert.True(t, found, "Expected store to still keep rev 200")
	elements, _ = lister.ListPrefix("", "", 0)
	assert.Len(t, elements, 1)
	assert.Equal(t, makeTestPod("foo", 200), elements[0].(*storeElement).Object)
	t.Log("Test cache on rev 300")
	lister, found = store.snapshots.GetLessOrEqual(300)
	assert.True(t, found, "Expected store to still keep rev 300")
	// Rev 300 snapshot was taken after the Delete, so the store is empty.
	elements, _ = lister.ListPrefix("", "", 0)
	assert.Empty(t, elements)
	t.Log("Test cache on rev 400")
	lister, found = store.snapshots.GetLessOrEqual(400)
	assert.True(t, found, "Expected store to still keep rev 400")
	elements, _ = lister.ListPrefix("", "", 0)
	assert.Len(t, elements, 1)
	assert.Equal(t, makeTestPod("foo", 400), elements[0].(*storeElement).Object)
	t.Log("Add event outside the event fresh window to force cache capacity downsize")
	assert.Equal(t, 3, store.capacity)
	clock.Step(DefaultEventFreshDuration + 1)
	require.NoError(t, store.Update(makeTestPod("foo", 500)))
	assert.Equal(t, 1, store.capacity)
	assert.Equal(t, 1, store.snapshots.snapshots.Len())
	_, found = store.snapshots.GetLessOrEqual(499)
	assert.False(t, found, "Expected overfilled cache to delete events below 500")
	t.Log("Test cache on rev 500")
	lister, found = store.snapshots.GetLessOrEqual(500)
	assert.True(t, found, "Expected store to still keep rev 500")
	elements, _ = lister.ListPrefix("", "", 0)
	assert.Len(t, elements, 1)
	assert.Equal(t, makeTestPod("foo", 500), elements[0].(*storeElement).Object)
	t.Log("Add event to force capacity upsize")
	require.NoError(t, store.Update(makeTestPod("foo", 600)))
	assert.Equal(t, 2, store.capacity)
	assert.Equal(t, 2, store.snapshots.snapshots.Len())
	t.Log("Test cache on rev 600")
	lister, found = store.snapshots.GetLessOrEqual(600)
	// Fixed copy-pasted message: no Replace has happened yet at this point.
	assert.True(t, found, "Expected store to keep rev 600")
	elements, _ = lister.ListPrefix("", "", 0)
	assert.Len(t, elements, 1)
	assert.Equal(t, makeTestPod("foo", 600), elements[0].(*storeElement).Object)
	t.Log("Replace cache to remove history")
	_, found = store.snapshots.GetLessOrEqual(500)
	assert.True(t, found, "Confirm that cache stores history before replace")
	err := store.Replace([]interface{}{makeTestPod("foo", 600)}, "700")
	require.NoError(t, err)
	_, found = store.snapshots.GetLessOrEqual(500)
	assert.False(t, found, "Expected replace to remove history")
	_, found = store.snapshots.GetLessOrEqual(600)
	assert.False(t, found, "Expected replace to remove history")
	t.Log("Test cache on rev 700")
	lister, found = store.snapshots.GetLessOrEqual(700)
	assert.True(t, found, "Expected replace to be snapshotted")
	elements, _ = lister.ListPrefix("", "", 0)
	assert.Len(t, elements, 1)
	assert.Equal(t, makeTestPod("foo", 600), elements[0].(*storeElement).Object)
}

View File

@ -728,6 +728,12 @@
lockToDefault: false
preRelease: Deprecated
version: "1.33"
- name: ListFromCacheSnapshot
versionedSpecs:
- default: false
lockToDefault: false
preRelease: Alpha
version: "1.33"
- name: LoadBalancerIPMode
versionedSpecs:
- default: false