From f87e4a19c88fa908eb176ee7925f211bafba9b45 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Fri, 20 Sep 2024 08:47:49 +0200 Subject: [PATCH 1/3] storage/cacher/cache_watcher: add RV to watchCacheInterval --- .../pkg/storage/cacher/watch_cache.go | 2 +- .../storage/cacher/watch_cache_interval.go | 24 ++++++++++++------- .../cacher/watch_cache_interval_test.go | 3 ++- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 04a2617d640..63599f9aab7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -746,7 +746,7 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, key string, indexerFunc := func(i int) *watchCacheEvent { return w.cache[i%w.capacity] } - ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, w.RWMutex.RLocker()) + ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, resourceVersion, w.RWMutex.RLocker()) return ci, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go index babd74e0c82..2522854d5c2 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go @@ -91,6 +91,10 @@ type watchCacheInterval struct { // lock on each invocation of Next(). buffer *watchCacheIntervalBuffer + // resourceVersion is the resourceVersion from which + // the interval was constructed. + resourceVersion uint64 + // lock effectively protects access to the underlying source // of events through - indexer and indexValidator. 
// @@ -103,14 +107,15 @@ type attrFunc func(runtime.Object) (labels.Set, fields.Set, error) type indexerFunc func(int) *watchCacheEvent type indexValidator func(int) bool -func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValidator indexValidator, locker sync.Locker) *watchCacheInterval { +func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValidator indexValidator, resourceVersion uint64, locker sync.Locker) *watchCacheInterval { return &watchCacheInterval{ - startIndex: startIndex, - endIndex: endIndex, - indexer: indexer, - indexValidator: indexValidator, - buffer: &watchCacheIntervalBuffer{buffer: make([]*watchCacheEvent, bufferSize)}, - lock: locker, + startIndex: startIndex, + endIndex: endIndex, + indexer: indexer, + indexValidator: indexValidator, + buffer: &watchCacheIntervalBuffer{buffer: make([]*watchCacheEvent, bufferSize)}, + resourceVersion: resourceVersion, + lock: locker, } } @@ -172,8 +177,9 @@ func newCacheIntervalFromStore(resourceVersion uint64, store storeIndexer, getAt ci := &watchCacheInterval{ startIndex: 0, // Simulate that we already have all the events we're looking for. 
- endIndex: 0, - buffer: buffer, + endIndex: 0, + buffer: buffer, + resourceVersion: resourceVersion, } return ci, nil diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go index 65dbf033fdf..487a5ac1e8d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go @@ -41,7 +41,7 @@ func intervalFromEvents(events []*watchCacheEvent) *watchCacheInterval { } indexValidator := func(_ int) bool { return true } - return newCacheInterval(startIndex, endIndex, indexer, indexValidator, locker) + return newCacheInterval(startIndex, endIndex, indexer, indexValidator, 0, locker) } func bufferFromEvents(events []*watchCacheEvent) *watchCacheIntervalBuffer { @@ -300,6 +300,7 @@ func TestCacheIntervalNextFromWatchCache(t *testing.T) { wc.endIndex, indexerFunc, wc.isIndexValidLocked, + wc.resourceVersion, &wc.RWMutex, ) From de735be512767dd2eced78530693d2e3ae997e6e Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Fri, 20 Sep 2024 08:48:31 +0200 Subject: [PATCH 2/3] storage/cacher/cache_watcher: processInterval sets RV from the snapshot --- .../k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go index 595fd5036d0..e07d00d3104 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go @@ -454,6 +454,13 @@ func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watch const initProcessThreshold = 500 * time.Millisecond startTime := time.Now() + // cacheInterval may be created from a version fresher than the requested one + // (e.g. for NotOlderThan semantic). 
In such a case, we need to prevent watch events + with a lower resourceVersion from being delivered to ensure the watch contract. + if cacheInterval.resourceVersion > resourceVersion { + resourceVersion = cacheInterval.resourceVersion + } + initEventCount := 0 for { event, err := cacheInterval.Next() From e7e2123feb2f1957be490a21aa606cb2dc718432 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Fri, 20 Sep 2024 09:37:53 +0200 Subject: [PATCH 3/3] pkg/storage/testing/watcher_tests: RunWatchSemantics checks if the storage has been primed with init data --- .../pkg/storage/testing/watcher_tests.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go index 89672d80624..952da81728e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go @@ -1460,6 +1460,21 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac createdPods = append(createdPods, out) } + if len(createdPods) > 0 { + // this list call ensures that the cache has seen the created pods. + // this makes the watch request below deterministic. + listObject := &example.PodList{} + opts := storage.ListOptions{ + Predicate: storage.Everything, + Recursive: true, + ResourceVersion: createdPods[len(createdPods)-1].ResourceVersion, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + } + err := store.GetList(ctx, fmt.Sprintf("/pods/%s", ns), opts, listObject) + require.NoError(t, err) + require.Len(t, listObject.Items, len(createdPods)) + } + if scenario.useCurrentRV { currentStorageRV, err := storage.GetCurrentResourceVersionFromStorage(ctx, store, func() runtime.Object { return &example.PodList{} }, "/pods", "") require.NoError(t, err)