From d883e606da1b0c14623e7bc1fb82dcf944745560 Mon Sep 17 00:00:00 2001 From: Michael Aspinwall Date: Wed, 28 Jan 2026 22:29:39 +0000 Subject: [PATCH] Add rv query to store Kubernetes-commit: 9c6ac550efa6e721c85b0f1074f26f734286fba9 --- tools/cache/controller_test.go | 199 +++++++++++++++++++++++++- tools/cache/expiration_cache.go | 12 +- tools/cache/fifo_test.go | 9 ++ tools/cache/store.go | 19 ++- tools/cache/thread_safe_store.go | 62 +++++++- tools/cache/thread_safe_store_test.go | 119 +++++++++++++++ 6 files changed, 413 insertions(+), 7 deletions(-) diff --git a/tools/cache/controller_test.go b/tools/cache/controller_test.go index 9528dc6ca..1751bc4cb 100644 --- a/tools/cache/controller_test.go +++ b/tools/cache/controller_test.go @@ -921,7 +921,7 @@ func TestReplaceEvents(t *testing.T) { fifoOptions.KnownObjects = store } fifo := NewRealFIFOWithOptions(fifoOptions) - recorder := newEventRecorder(store) + recorder := newEventRecorder(store, DeletionHandlingMetaNamespaceKeyFunc) cfg := &Config{ Queue: fifo, @@ -1058,7 +1058,7 @@ func TestResetWatch(t *testing.T) { fifoOptions.KnownObjects = store } fifo := NewRealFIFOWithOptions(fifoOptions) - recorder := newEventRecorder(store) + recorder := newEventRecorder(store, DeletionHandlingMetaNamespaceKeyFunc) cfg := &Config{ Queue: fifo, @@ -1134,6 +1134,195 @@ func testResetWatch(t *testing.T, ctx context.Context, source *fcache.FakeContro assert.False(t, addEvent.IsInInitialList) } +func TestStoreResourceVersion(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "default", + }, + }) + t.Cleanup(func() { + source.Shutdown() + }) + + store := NewStore(DeletionHandlingMetaNamespaceKeyFunc) + fifo := NewRealFIFOWithOptions(RealFIFOOptions{ + AtomicEvents: true, + }) + recorder := newEventRecorder(store, DeletionHandlingMetaNamespaceKeyFunc) + logger, _ := ktesting.NewTestContext(t) + + cfg := &Config{ + Queue: fifo, + ListerWatcher: source, + ObjectType: &v1.Pod{}, + FullResyncPeriod: 0, + Process: func(obj interface{}, isInInitialList bool) error { + if deltas, ok := obj.(Deltas); ok { + return processDeltas(logger, recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + } + return errors.New("object given as Process argument is not Deltas") + }, + ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { + return processDeltasInBatch(logger, recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + }, + } + c := New(cfg) + go c.RunWithContext(ctx) + if !WaitForCacheSync(ctx.Done(), c.HasSynced) { + t.Fatal("Timed out waiting for cache sync") + } + testResourceVersion(t, ctx, source, recorder, store) +} + +func testResourceVersion(t *testing.T, ctx context.Context, source *fcache.FakeControllerSource, recorder *eventRecorder, store Store) { + // The first event should be the initial add. + require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive initial setup all events, got: %v", recorder.getHistory()) + history := recorder.getHistory() + assert.Len(t, history, 1) + assert.Equal(t, "add", history[0].Action) + assert.Equal(t, "default/pod-1", history[0].Key) + assert.True(t, history[0].IsInInitialList) + + recorder.clearHistory() + // Update the pod labels to increment the resource version. + source.Modify(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "default", + Labels: map[string]string{ + "foo": "bar", + }, + }, + }) + require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive update event, got: %v", recorder.getHistory()) + history = recorder.getHistory() + // The stored resource version should have changed. + assert.Equal(t, history[0].EventRV, store.LastStoreSyncResourceVersion()) + + recorder.clearHistory() + // Delete the pod. + source.Delete(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "default", + }, + }) + require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive delete event, got: %v", recorder.getHistory()) + history = recorder.getHistory() + // The stored resource version should have changed. + assert.Equal(t, history[0].EventRV, store.LastStoreSyncResourceVersion()) +} + +func TestStoreResourceVersionWithNonMetaTransform(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "default", + }, + }) + t.Cleanup(func() { + source.Shutdown() + }) + + // Create a keyfunc that does not rely on object metadata. + keyFunc := func(obj interface{}) (string, error) { + return "pod-1", nil + } + + store := NewStore(keyFunc) + fifo := NewRealFIFOWithOptions(RealFIFOOptions{ + Transformer: func(obj interface{}) (interface{}, error) { + pod, ok := obj.(*v1.Pod) + if !ok { + return nil, fmt.Errorf("expected *v1.Pod, got %T", obj) + } + return pod.Name, nil + }, + KeyFunction: keyFunc, + AtomicEvents: true, + }) + recorder := newEventRecorder(store, keyFunc) + logger, _ := ktesting.NewTestContext(t) + + cfg := &Config{ + Queue: fifo, + ListerWatcher: source, + ObjectType: &v1.Pod{}, + FullResyncPeriod: 0, + + Process: func(obj interface{}, isInInitialList bool) error { + if deltas, ok := obj.(Deltas); ok { + return processDeltas(logger, recorder, store, deltas, isInInitialList, keyFunc) + } + return errors.New("object given as Process argument is not Deltas") + }, + ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { + return processDeltasInBatch(logger, recorder, store, deltaList, isInInitialList, keyFunc) + }, + } + c := New(cfg) + go c.RunWithContext(ctx) + if !WaitForCacheSync(ctx.Done(), c.HasSynced) { + t.Fatal("Timed out waiting for cache sync") + } + testResourceVersionWithNonMetaTransform(t, ctx, source, recorder, store) +} + +func testResourceVersionWithNonMetaTransform(t *testing.T, ctx context.Context, source *fcache.FakeControllerSource, recorder *eventRecorder, store Store) { + // The first event should be the initial add. + require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive initial setup all events, got: %v", recorder.getHistory()) + history := recorder.getHistory() + assert.Len(t, history, 1) + assert.Equal(t, "add", history[0].Action) + assert.Equal(t, "pod-1", history[0].Key) + assert.True(t, history[0].IsInInitialList) + + // The stored resource version should stay at the initial value given by the replace event since + // the update event does not have metadata. + replaceRV := store.LastStoreSyncResourceVersion() + recorder.clearHistory() + // Update the pod labels to increment the resource version. + source.Modify(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "default", + Labels: map[string]string{ + "foo": "bar", + }, + }, + }) + require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive update event, got: %v", recorder.getHistory()) + // The stored resource version should not be changed due to the transform. + assert.Equal(t, replaceRV, store.LastStoreSyncResourceVersion()) + + recorder.clearHistory() + // Modify the pod but do not send the event. + source.ModifyDropWatch(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "default", + Labels: map[string]string{ + "foo": "bar2", + }, + }, + }) + // Trigger a replace by resetting the watch + source.ResetWatch() + + require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive replace event, got: %v", recorder.getHistory()) + // The stored resource version should be updated to the replace events RV. + assert.Equal(t, "3", store.LastStoreSyncResourceVersion()) +} + type eventRecord struct { Action string Key string @@ -1146,14 +1335,16 @@ type eventRecorder struct { historyLock sync.Mutex history []eventRecord store Store + keyFunc KeyFunc updateCh chan bool } -func newEventRecorder(store Store) *eventRecorder { +func newEventRecorder(store Store, keyFunc KeyFunc) *eventRecorder { return &eventRecorder{ store: store, updateCh: make(chan bool, 1), + keyFunc: keyFunc, } } @@ -1170,7 +1361,7 @@ func (m *eventRecorder) OnDelete(obj interface{}) { func (m *eventRecorder) record(action string, obj interface{}, isInInitialList bool) { m.historyLock.Lock() defer m.historyLock.Unlock() - key, _ := DeletionHandlingMetaNamespaceKeyFunc(obj) + key, _ := m.keyFunc(obj) if deleted, ok := obj.(DeletedFinalStateUnknown); ok { obj = deleted.Obj diff --git a/tools/cache/expiration_cache.go b/tools/cache/expiration_cache.go index 813916ebf..19c2226b9 100644 --- a/tools/cache/expiration_cache.go +++ b/tools/cache/expiration_cache.go @@ -137,6 +137,16 @@ func (c *ExpirationCache) List() []interface{} { return list } +// LastStoreSyncResourceVersion returns the latest resource version that the cache has seen. +func (c *ExpirationCache) LastStoreSyncResourceVersion() string { + return c.cacheStorage.LastStoreSyncResourceVersion() +} + +// Bookmark observes a new resource version in the cache. +func (c *ExpirationCache) Bookmark(rv string) { + c.cacheStorage.Bookmark(rv) +} + // ListKeys returns a list of all keys in the expiration cache. func (c *ExpirationCache) ListKeys() []string { return c.cacheStorage.ListKeys() @@ -170,7 +180,7 @@ func (c *ExpirationCache) Delete(obj interface{}) error { } c.expirationLock.Lock() defer c.expirationLock.Unlock() - c.cacheStorage.Delete(key) + c.cacheStorage.DeleteWithObject(key, obj) return nil } diff --git a/tools/cache/fifo_test.go b/tools/cache/fifo_test.go index 41bdee727..6803566ec 100644 --- a/tools/cache/fifo_test.go +++ b/tools/cache/fifo_test.go @@ -35,6 +35,15 @@ func (f *FIFO) List() []interface{} { return list } +// LastStoreSyncResourceVersion is unimplemented for FIFO, only used in unit testing. +func (f *FIFO) LastStoreSyncResourceVersion() string { + return "" +} + +// Bookmark is unimplemented for FIFO, only used in unit testing. +func (f *FIFO) Bookmark(rv string) { +} + // ListKeys returns a list of all the keys of the objects currently // in the FIFO. // This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. diff --git a/tools/cache/store.go b/tools/cache/store.go index a412fd701..805d42cee 100644 --- a/tools/cache/store.go +++ b/tools/cache/store.go @@ -55,6 +55,15 @@ type Store interface { // ListKeys returns a list of all the keys currently associated with non-empty accumulators ListKeys() []string + // LastStoreSyncResourceVersion returns the latest resource version that the store has seen. + // This is used to determine the latest resource version the store has seen from objects + // observed being written to the store. + LastStoreSyncResourceVersion() string + + // Bookmark observes a new resource version passed into it and + // will be used to get the latest resource version of the store. + Bookmark(rv string) + // Get returns the accumulator associated with the given object's key Get(obj interface{}) (item interface{}, exists bool, err error) @@ -276,7 +285,7 @@ func (c *cache) Delete(obj interface{}) error { if err != nil { return KeyError{obj, err} } - c.cacheStorage.Delete(key) + c.cacheStorage.DeleteWithObject(key, obj) return nil } @@ -292,6 +301,14 @@ func (c *cache) ListKeys() []string { return c.cacheStorage.ListKeys() } +func (c *cache) LastStoreSyncResourceVersion() string { + return c.cacheStorage.LastStoreSyncResourceVersion() +} + +func (c *cache) Bookmark(rv string) { + c.cacheStorage.Bookmark(rv) +} + // GetIndexers returns the indexers of cache func (c *cache) GetIndexers() Indexers { return c.cacheStorage.GetIndexers() diff --git a/tools/cache/thread_safe_store.go b/tools/cache/thread_safe_store.go index ef3a599a9..a306468ec 100644 --- a/tools/cache/thread_safe_store.go +++ b/tools/cache/thread_safe_store.go @@ -21,7 +21,9 @@ import ( "sync" "time" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/util/sets" + clientgofeaturegate "k8s.io/client-go/features" utiltrace "k8s.io/utils/trace" ) @@ -43,13 +45,19 @@ import ( type ThreadSafeStore interface { Add(key string, obj interface{}) Update(key string, obj interface{}) + // Delete is equivalent to calling DeleteWithObject(key, nil) however it is + // not recommended to use this function as it will not update the resource + // version of the store, possibly causing it to be out of date. Delete(key string) + DeleteWithObject(key string, obj interface{}) Get(key string) (item interface{}, exists bool) List() []interface{} ListKeys() []string Replace(map[string]interface{}, string) Index(indexName string, obj interface{}) ([]interface{}, error) IndexKeys(indexName, indexedValue string) ([]string, error) + Bookmark(rv string) + LastStoreSyncResourceVersion() string ListIndexFuncValues(name string) []string ByIndex(indexName, indexedValue string) ([]interface{}, error) GetIndexers() Indexers @@ -242,9 +250,15 @@ type threadSafeMap struct { // index implements the indexing functionality index *storeIndex + rv string } func (c *threadSafeMap) Transaction(txns ...ThreadSafeStoreTransaction) { + if len(txns) == 0 { + return + } + finalObj := txns[len(txns)-1].Object + rv, rvErr := rvFromObject(finalObj) c.lock.Lock() defer c.lock.Unlock() trace := utiltrace.New("ThreadSafeMap Transaction Process", @@ -262,6 +276,9 @@ func (c *threadSafeMap) Transaction(txns ...ThreadSafeStoreTransaction) { c.deleteLocked(txn.Key) } } + if rvErr == nil { + c.rv = rv + } } func (c *threadSafeMap) Add(key string, obj interface{}) { @@ -273,9 +290,13 @@ func (c *threadSafeMap) addLocked(key string, obj interface{}) { } func (c *threadSafeMap) Update(key string, obj interface{}) { + rv, rvErr := rvFromObject(obj) c.lock.Lock() defer c.lock.Unlock() c.updateLocked(key, obj) + if rvErr == nil { + c.rv = rv + } } func (c *threadSafeMap) updateLocked(key string, obj interface{}) { @@ -285,9 +306,21 @@ func (c *threadSafeMap) updateLocked(key string, obj interface{}) { } func (c *threadSafeMap) Delete(key string) { + c.DeleteWithObject(key, nil) +} + +func (c *threadSafeMap) DeleteWithObject(key string, obj interface{}) { + var rv string + var rvErr error + if obj != nil { + rv, rvErr = rvFromObject(obj) + } c.lock.Lock() defer c.lock.Unlock() c.deleteLocked(key) + if obj != nil && rvErr == nil { + c.rv = rv + } } func (c *threadSafeMap) deleteLocked(key string) { @@ -330,7 +363,7 @@ func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion st c.lock.Lock() defer c.lock.Unlock() c.items = items - + c.rv = resourceVersion // rebuild any index c.index.reset() for key, item := range c.items { @@ -338,6 +371,15 @@ func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion st } } +func rvFromObject(obj interface{}) (rv string, err error) { + meta, err := meta.Accessor(obj) + if err != nil { + return "", err + } + rv = meta.GetResourceVersion() + return rv, nil +} + // Index returns a list of items that match the given object on the index function. // Index is thread-safe so long as you treat all items as immutable. func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) { @@ -356,6 +398,24 @@ func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, return list, nil } +// LastStoreSyncResourceVersion returns the latest resource version that the store has seen. +func (c *threadSafeMap) LastStoreSyncResourceVersion() string { + // We cannot return the resource version if the AtomicFIFO feature gate is not enabled. + if !clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.AtomicFIFO) { + return "" + } + c.lock.RLock() + defer c.lock.RUnlock() + return c.rv +} + +// Bookmark sets the latest resource version that the store has seen. +func (c *threadSafeMap) Bookmark(rv string) { + c.lock.Lock() + defer c.lock.Unlock() + c.rv = rv +} + // ByIndex returns a list of the items whose indexed values in the given index include the given indexed value func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) { c.lock.RLock() diff --git a/tools/cache/thread_safe_store_test.go b/tools/cache/thread_safe_store_test.go index 44d620e53..8a9d9edb0 100644 --- a/tools/cache/thread_safe_store_test.go +++ b/tools/cache/thread_safe_store_test.go @@ -23,6 +23,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" ) @@ -166,6 +167,124 @@ func TestThreadSafeStoreIndexingFunctionsWithMultipleValues(t *testing.T) { assert.NoError(compare("baz", []string{})) } +func TestThreadSafeStoreRV(t *testing.T) { + t.Run("Initial state", func(t *testing.T) { + store := NewThreadSafeStore(Indexers{}, Indices{}).(*threadSafeMap) + if rv := store.LastStoreSyncResourceVersion(); rv != "" { + t.Errorf("Expected initial RV to be \"\", got %q", rv) + } + }) + + t.Run("Add Update and Delete", func(t *testing.T) { + store := NewThreadSafeStore(Indexers{}, Indices{}).(*threadSafeMap) + + // Add obj with RV "10" + store.Add("key1", &metav1.ObjectMeta{ResourceVersion: "10"}) + if rv := store.LastStoreSyncResourceVersion(); rv != "10" { + t.Errorf("Expected RV to be \"10\", got %q", rv) + } + + // Add obj with same RV "10" + store.Add("key3", &metav1.ObjectMeta{ResourceVersion: "10"}) + if rv := store.LastStoreSyncResourceVersion(); rv != "10" { + t.Errorf("Expected RV to remain \"10\" after adding same RV, got %q", rv) + } + + // Add obj with higher RV "20" + store.Add("key4", &metav1.ObjectMeta{ResourceVersion: "20"}) + if rv := store.LastStoreSyncResourceVersion(); rv != "20" { + t.Errorf("Expected RV to be \"20\", got %q", rv) + } + + // Delete an earlier object with a newer RV + store.DeleteWithObject("key4", &metav1.ObjectMeta{ResourceVersion: "30"}) + if rv := store.LastStoreSyncResourceVersion(); rv != "30" { + t.Errorf("Expected RV to become \"30\" after deletion %q", rv) + } + + // Add non-meta object, should not be updated + store.Add("key5", "just a string") + if rv := store.LastStoreSyncResourceVersion(); rv != "30" { + t.Errorf("Expected RV to remain \"30\" after adding non-meta object, got %q", rv) + } + + // Add obj with RV "40" + store.Add("key6", &metav1.ObjectMeta{ResourceVersion: "40"}) + if rv := store.LastStoreSyncResourceVersion(); rv != "40" { + t.Errorf("Expected RV to be \"40\", got %q", rv) + } + + // Nil delete + store.Delete("key6") + if rv := store.LastStoreSyncResourceVersion(); rv != "40" { + t.Errorf("Expected RV to remain \"40\" after delete, got %q", rv) + } + + txns := []ThreadSafeStoreTransaction{ + { + Transaction{ + Object: &metav1.ObjectMeta{ResourceVersion: "40"}, + Type: TransactionTypeUpdate, + }, + "key9", + }, + { + Transaction{ + Object: &metav1.ObjectMeta{ResourceVersion: "30"}, + Type: TransactionTypeUpdate, + }, + "key10", + }, + { + Transaction{ + Object: &metav1.ObjectMeta{ResourceVersion: "50"}, + Type: TransactionTypeUpdate, + }, + "key11", + }, + } + store.Transaction(txns...) + if rv := store.LastStoreSyncResourceVersion(); rv != "50" { + t.Errorf("Expected RV to be \"50\" after transaction, got %q", rv) + } + }) + + t.Run("Replace", func(t *testing.T) { + store := NewThreadSafeStore(Indexers{}, Indices{}).(*threadSafeMap) + store.Add("key1", &metav1.ObjectMeta{ResourceVersion: "10"}) + + if rv := store.LastStoreSyncResourceVersion(); rv != "10" { + t.Fatalf("Setup failed, expected RV \"10\", got %q", rv) + } + + items := map[string]interface{}{ + "key3": &metav1.ObjectMeta{ResourceVersion: "40"}, + "key2": &metav1.ObjectMeta{ResourceVersion: "30"}, + } + + store.Replace(items, "50") + + if rv := store.LastStoreSyncResourceVersion(); rv != "50" { + t.Errorf("Expected RV to be \"50\" after Replace(), got %q", rv) + } + }) + + t.Run("Delete", func(t *testing.T) { + store := NewThreadSafeStore(Indexers{}, Indices{}).(*threadSafeMap) + store.Add("key1", &metav1.ObjectMeta{ResourceVersion: "10"}) + + if rv := store.LastStoreSyncResourceVersion(); rv != "10" { + t.Fatalf("Setup failed, expected RV \"10\", got %q", rv) + } + + store.DeleteWithObject("key1", &metav1.ObjectMeta{ResourceVersion: "20"}) + + if rv := store.LastStoreSyncResourceVersion(); rv != "20" { + t.Errorf("Expected RV to be \"20\" after Delete(), got %q", rv) + } + }) +} + func BenchmarkIndexer(b *testing.B) { testIndexer := "testIndexer"