From ecaf2093f51fed5f544520b0ac00fb33a474b7f5 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 26 Feb 2024 12:22:05 +0100 Subject: [PATCH] storage/watch_cache: rework getAllEventsSinceLocked --- .../pkg/storage/cacher/watch_cache.go | 26 +++++++++++----- .../pkg/storage/cacher/watch_cache_test.go | 30 +++++++++---------- 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index c27ca053b78..cc797621b7a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -691,7 +691,11 @@ func (w *watchCache) isIndexValidLocked(index int) bool { // getAllEventsSinceLocked returns a watchCacheInterval that can be used to // retrieve events since a certain resourceVersion. This function assumes to // be called under the watchCache lock. -func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCacheInterval, error) { +func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, opts storage.ListOptions) (*watchCacheInterval, error) { + if opts.SendInitialEvents != nil && *opts.SendInitialEvents { + return w.getIntervalFromStoreLocked() + } + size := w.endIndex - w.startIndex var oldest uint64 switch { @@ -711,13 +715,19 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCach } if resourceVersion == 0 { - // resourceVersion = 0 means that we don't require any specific starting point - // and we would like to start watching from ~now. - // However, to keep backward compatibility, we additionally need to return the - // current state and only then start watching from that point. - // - // TODO: In v2 api, we should stop returning the current state - #13969. - return w.getIntervalFromStoreLocked() + if opts.SendInitialEvents == nil { + // resourceVersion = 0 means that we don't require any specific starting point + // and we would like to start watching from ~now. + // However, to keep backward compatibility, we additionally need to return the + // current state and only then start watching from that point. + // + // TODO: In v2 api, we should stop returning the current state - #13969. + return w.getIntervalFromStoreLocked() + } + // SendInitialEvents = false and resourceVersion = 0 + // means that the request would like to start watching + // from Any resourceVersion + resourceVersion = w.resourceVersion } if resourceVersion < oldest-1 { return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1)) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index 6809225d76d..8e37e0cf83e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -77,8 +77,8 @@ type testWatchCache struct { stopCh chan struct{} } -func (w *testWatchCache) getAllEventsSince(resourceVersion uint64) ([]*watchCacheEvent, error) { - cacheInterval, err := w.getCacheIntervalForEvents(resourceVersion) +func (w *testWatchCache) getAllEventsSince(resourceVersion uint64, opts storage.ListOptions) ([]*watchCacheEvent, error) { + cacheInterval, err := w.getCacheIntervalForEvents(resourceVersion, opts) if err != nil { return nil, err } @@ -98,11 +98,11 @@ func (w *testWatchCache) getAllEventsSince(resourceVersion uint64) ([]*watchCach return result, nil } -func (w *testWatchCache) getCacheIntervalForEvents(resourceVersion uint64) (*watchCacheInterval, error) { +func (w *testWatchCache) getCacheIntervalForEvents(resourceVersion uint64, opts storage.ListOptions) (*watchCacheInterval, error) { w.RLock() defer w.RUnlock() - return w.getAllEventsSinceLocked(resourceVersion) + return w.getAllEventsSinceLocked(resourceVersion, opts) } // newTestWatchCache just adds a fake clock. @@ -269,7 +269,7 @@ func TestEvents(t *testing.T) { // Test for Added event. { - _, err := store.getAllEventsSince(1) + _, err := store.getAllEventsSince(1, storage.ListOptions{}) if err == nil { t.Errorf("expected error too old") } @@ -278,7 +278,7 @@ func TestEvents(t *testing.T) { } } { - result, err := store.getAllEventsSince(2) + result, err := store.getAllEventsSince(2, storage.ListOptions{}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -302,13 +302,13 @@ func TestEvents(t *testing.T) { // Test with not full cache. { - _, err := store.getAllEventsSince(1) + _, err := store.getAllEventsSince(1, storage.ListOptions{}) if err == nil { t.Errorf("expected error too old") } } { - result, err := store.getAllEventsSince(3) + result, err := store.getAllEventsSince(3, storage.ListOptions{}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -336,13 +336,13 @@ func TestEvents(t *testing.T) { // Test with full cache - there should be elements from 5 to 9. { - _, err := store.getAllEventsSince(3) + _, err := store.getAllEventsSince(3, storage.ListOptions{}) if err == nil { t.Errorf("expected error too old") } } { - result, err := store.getAllEventsSince(4) + result, err := store.getAllEventsSince(4, storage.ListOptions{}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -361,7 +361,7 @@ func TestEvents(t *testing.T) { store.Delete(makeTestPod("pod", uint64(10))) { - result, err := store.getAllEventsSince(9) + result, err := store.getAllEventsSince(9, storage.ListOptions{}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -392,13 +392,13 @@ func TestMarker(t *testing.T) { makeTestPod("pod2", 9), }, "9") - _, err := store.getAllEventsSince(8) + _, err := store.getAllEventsSince(8, storage.ListOptions{}) if err == nil || !strings.Contains(err.Error(), "too old resource version") { t.Errorf("unexpected error: %v", err) } // Getting events from 8 should return no events, // even though there is a marker there. - result, err := store.getAllEventsSince(9) + result, err := store.getAllEventsSince(9, storage.ListOptions{}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -409,7 +409,7 @@ func TestMarker(t *testing.T) { pod := makeTestPod("pods", 12) store.Add(pod) // Getting events from 8 should still work and return one event. - result, err = store.getAllEventsSince(9) + result, err = store.getAllEventsSince(9, storage.ListOptions{}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -975,7 +975,7 @@ func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) { // Force cache resize. addEvent("key4", 50, later.Add(time.Second)) - _, err := store.getAllEventsSince(15) + _, err := store.getAllEventsSince(15, storage.ListOptions{}) if err == nil || !strings.Contains(err.Error(), "too old resource version") { t.Errorf("unexpected error: %v", err) }