storage/watch_cache: rework getAllEventsSinceLocked

This commit is contained in:
Lukasz Szaszkiewicz 2024-02-26 12:22:05 +01:00
parent d629d3fa35
commit ecaf2093f5
2 changed files with 33 additions and 23 deletions

View File

@ -691,7 +691,11 @@ func (w *watchCache) isIndexValidLocked(index int) bool {
// getAllEventsSinceLocked returns a watchCacheInterval that can be used to // getAllEventsSinceLocked returns a watchCacheInterval that can be used to
// retrieve events since a certain resourceVersion. This function assumes to // retrieve events since a certain resourceVersion. This function assumes to
// be called under the watchCache lock. // be called under the watchCache lock.
func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCacheInterval, error) { func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, opts storage.ListOptions) (*watchCacheInterval, error) {
if opts.SendInitialEvents != nil && *opts.SendInitialEvents {
return w.getIntervalFromStoreLocked()
}
size := w.endIndex - w.startIndex size := w.endIndex - w.startIndex
var oldest uint64 var oldest uint64
switch { switch {
@ -711,13 +715,19 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCach
} }
if resourceVersion == 0 { if resourceVersion == 0 {
// resourceVersion = 0 means that we don't require any specific starting point if opts.SendInitialEvents == nil {
// and we would like to start watching from ~now. // resourceVersion = 0 means that we don't require any specific starting point
// However, to keep backward compatibility, we additionally need to return the // and we would like to start watching from ~now.
// current state and only then start watching from that point. // However, to keep backward compatibility, we additionally need to return the
// // current state and only then start watching from that point.
// TODO: In v2 api, we should stop returning the current state - #13969. //
return w.getIntervalFromStoreLocked() // TODO: In v2 api, we should stop returning the current state - #13969.
return w.getIntervalFromStoreLocked()
}
// SendInitialEvents = false and resourceVersion = 0
// means that the request would like to start watching
// from Any resourceVersion
resourceVersion = w.resourceVersion
} }
if resourceVersion < oldest-1 { if resourceVersion < oldest-1 {
return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1)) return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))

View File

@ -77,8 +77,8 @@ type testWatchCache struct {
stopCh chan struct{} stopCh chan struct{}
} }
func (w *testWatchCache) getAllEventsSince(resourceVersion uint64) ([]*watchCacheEvent, error) { func (w *testWatchCache) getAllEventsSince(resourceVersion uint64, opts storage.ListOptions) ([]*watchCacheEvent, error) {
cacheInterval, err := w.getCacheIntervalForEvents(resourceVersion) cacheInterval, err := w.getCacheIntervalForEvents(resourceVersion, opts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -98,11 +98,11 @@ func (w *testWatchCache) getAllEventsSince(resourceVersion uint64) ([]*watchCach
return result, nil return result, nil
} }
func (w *testWatchCache) getCacheIntervalForEvents(resourceVersion uint64) (*watchCacheInterval, error) { func (w *testWatchCache) getCacheIntervalForEvents(resourceVersion uint64, opts storage.ListOptions) (*watchCacheInterval, error) {
w.RLock() w.RLock()
defer w.RUnlock() defer w.RUnlock()
return w.getAllEventsSinceLocked(resourceVersion) return w.getAllEventsSinceLocked(resourceVersion, opts)
} }
// newTestWatchCache just adds a fake clock. // newTestWatchCache just adds a fake clock.
@ -269,7 +269,7 @@ func TestEvents(t *testing.T) {
// Test for Added event. // Test for Added event.
{ {
_, err := store.getAllEventsSince(1) _, err := store.getAllEventsSince(1, storage.ListOptions{})
if err == nil { if err == nil {
t.Errorf("expected error too old") t.Errorf("expected error too old")
} }
@ -278,7 +278,7 @@ func TestEvents(t *testing.T) {
} }
} }
{ {
result, err := store.getAllEventsSince(2) result, err := store.getAllEventsSince(2, storage.ListOptions{})
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -302,13 +302,13 @@ func TestEvents(t *testing.T) {
// Test with not full cache. // Test with not full cache.
{ {
_, err := store.getAllEventsSince(1) _, err := store.getAllEventsSince(1, storage.ListOptions{})
if err == nil { if err == nil {
t.Errorf("expected error too old") t.Errorf("expected error too old")
} }
} }
{ {
result, err := store.getAllEventsSince(3) result, err := store.getAllEventsSince(3, storage.ListOptions{})
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -336,13 +336,13 @@ func TestEvents(t *testing.T) {
// Test with full cache - there should be elements from 5 to 9. // Test with full cache - there should be elements from 5 to 9.
{ {
_, err := store.getAllEventsSince(3) _, err := store.getAllEventsSince(3, storage.ListOptions{})
if err == nil { if err == nil {
t.Errorf("expected error too old") t.Errorf("expected error too old")
} }
} }
{ {
result, err := store.getAllEventsSince(4) result, err := store.getAllEventsSince(4, storage.ListOptions{})
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -361,7 +361,7 @@ func TestEvents(t *testing.T) {
store.Delete(makeTestPod("pod", uint64(10))) store.Delete(makeTestPod("pod", uint64(10)))
{ {
result, err := store.getAllEventsSince(9) result, err := store.getAllEventsSince(9, storage.ListOptions{})
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -392,13 +392,13 @@ func TestMarker(t *testing.T) {
makeTestPod("pod2", 9), makeTestPod("pod2", 9),
}, "9") }, "9")
_, err := store.getAllEventsSince(8) _, err := store.getAllEventsSince(8, storage.ListOptions{})
if err == nil || !strings.Contains(err.Error(), "too old resource version") { if err == nil || !strings.Contains(err.Error(), "too old resource version") {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
// Getting events from 8 should return no events, // Getting events from 8 should return no events,
// even though there is a marker there. // even though there is a marker there.
result, err := store.getAllEventsSince(9) result, err := store.getAllEventsSince(9, storage.ListOptions{})
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@ -409,7 +409,7 @@ func TestMarker(t *testing.T) {
pod := makeTestPod("pods", 12) pod := makeTestPod("pods", 12)
store.Add(pod) store.Add(pod)
// Getting events from 8 should still work and return one event. // Getting events from 8 should still work and return one event.
result, err = store.getAllEventsSince(9) result, err = store.getAllEventsSince(9, storage.ListOptions{})
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@ -975,7 +975,7 @@ func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) {
// Force cache resize. // Force cache resize.
addEvent("key4", 50, later.Add(time.Second)) addEvent("key4", 50, later.Add(time.Second))
_, err := store.getAllEventsSince(15) _, err := store.getAllEventsSince(15, storage.ListOptions{})
if err == nil || !strings.Contains(err.Error(), "too old resource version") { if err == nil || !strings.Contains(err.Error(), "too old resource version") {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }