diff --git a/tools/cache/controller.go b/tools/cache/controller.go index 3a4c4cb5c..55ab62c76 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -653,6 +653,12 @@ func processDeltas( return err } handler.OnDelete(obj) + case Bookmark: + info, ok := obj.(BookmarkInfo) + if !ok { + return fmt.Errorf("bookmark delta did not contain BookmarkInfo: %T", obj) + } + clientState.Bookmark(info.ResourceVersion) } } return nil @@ -853,6 +859,7 @@ func newQueueFIFO(logger klog.Logger, objectType any, clientState Store, transfo if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.AtomicFIFO) { options.AtomicEvents = true options.UnlockWhileProcessing = clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.UnlockWhileProcessingFIFO) + options.EmitDeltaTypeBookmark = true } else { options.KnownObjects = clientState } diff --git a/tools/cache/controller_test.go b/tools/cache/controller_test.go index 9528dc6ca..1751bc4cb 100644 --- a/tools/cache/controller_test.go +++ b/tools/cache/controller_test.go @@ -921,7 +921,7 @@ func TestReplaceEvents(t *testing.T) { fifoOptions.KnownObjects = store } fifo := NewRealFIFOWithOptions(fifoOptions) - recorder := newEventRecorder(store) + recorder := newEventRecorder(store, DeletionHandlingMetaNamespaceKeyFunc) cfg := &Config{ Queue: fifo, @@ -1058,7 +1058,7 @@ func TestResetWatch(t *testing.T) { fifoOptions.KnownObjects = store } fifo := NewRealFIFOWithOptions(fifoOptions) - recorder := newEventRecorder(store) + recorder := newEventRecorder(store, DeletionHandlingMetaNamespaceKeyFunc) cfg := &Config{ Queue: fifo, @@ -1134,6 +1134,195 @@ func testResetWatch(t *testing.T, ctx context.Context, source *fcache.FakeContro assert.False(t, addEvent.IsInInitialList) } +func TestStoreResourceVersion(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "default", + }, + }) + t.Cleanup(func() { + source.Shutdown() + }) + + store := NewStore(DeletionHandlingMetaNamespaceKeyFunc) + fifo := NewRealFIFOWithOptions(RealFIFOOptions{ + AtomicEvents: true, + }) + recorder := newEventRecorder(store, DeletionHandlingMetaNamespaceKeyFunc) + logger, _ := ktesting.NewTestContext(t) + + cfg := &Config{ + Queue: fifo, + ListerWatcher: source, + ObjectType: &v1.Pod{}, + FullResyncPeriod: 0, + Process: func(obj interface{}, isInInitialList bool) error { + if deltas, ok := obj.(Deltas); ok { + return processDeltas(logger, recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + } + return errors.New("object given as Process argument is not Deltas") + }, + ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { + return processDeltasInBatch(logger, recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + }, + } + c := New(cfg) + go c.RunWithContext(ctx) + if !WaitForCacheSync(ctx.Done(), c.HasSynced) { + t.Fatal("Timed out waiting for cache sync") + } + testResourceVersion(t, ctx, source, recorder, store) +} + +func testResourceVersion(t *testing.T, ctx context.Context, source *fcache.FakeControllerSource, recorder *eventRecorder, store Store) { + // The first event should be the initial add. + require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive initial setup all events, got: %v", recorder.getHistory()) + history := recorder.getHistory() + assert.Len(t, history, 1) + assert.Equal(t, "add", history[0].Action) + assert.Equal(t, "default/pod-1", history[0].Key) + assert.True(t, history[0].IsInInitialList) + + recorder.clearHistory() + // Update the pod labels to increment the resource version. + source.Modify(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "default", + Labels: map[string]string{ + "foo": "bar", + }, + }, + }) + require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive update event, got: %v", recorder.getHistory()) + history = recorder.getHistory() + // The stored resource version should have changed. + assert.Equal(t, history[0].EventRV, store.LastStoreSyncResourceVersion()) + + recorder.clearHistory() + // Delete the pod. + source.Delete(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "default", + }, + }) + require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive delete event, got: %v", recorder.getHistory()) + history = recorder.getHistory() + // The stored resource version should have changed. + assert.Equal(t, history[0].EventRV, store.LastStoreSyncResourceVersion()) +} + +func TestStoreResourceVersionWithNonMetaTransform(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + source := fcache.NewFakeControllerSource() + source.Add(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "default", + }, + }) + t.Cleanup(func() { + source.Shutdown() + }) + + // Create a keyfunc that does not rely on object metadata. + keyFunc := func(obj interface{}) (string, error) { + return "pod-1", nil + } + + store := NewStore(keyFunc) + fifo := NewRealFIFOWithOptions(RealFIFOOptions{ + Transformer: func(obj interface{}) (interface{}, error) { + pod, ok := obj.(*v1.Pod) + if !ok { + return nil, fmt.Errorf("expected *v1.Pod, got %T", obj) + } + return pod.Name, nil + }, + KeyFunction: keyFunc, + AtomicEvents: true, + }) + recorder := newEventRecorder(store, keyFunc) + logger, _ := ktesting.NewTestContext(t) + + cfg := &Config{ + Queue: fifo, + ListerWatcher: source, + ObjectType: &v1.Pod{}, + FullResyncPeriod: 0, + + Process: func(obj interface{}, isInInitialList bool) error { + if deltas, ok := obj.(Deltas); ok { + return processDeltas(logger, recorder, store, deltas, isInInitialList, keyFunc) + } + return errors.New("object given as Process argument is not Deltas") + }, + ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { + return processDeltasInBatch(logger, recorder, store, deltaList, isInInitialList, keyFunc) + }, + } + c := New(cfg) + go c.RunWithContext(ctx) + if !WaitForCacheSync(ctx.Done(), c.HasSynced) { + t.Fatal("Timed out waiting for cache sync") + } + testResourceVersionWithNonMetaTransform(t, ctx, source, recorder, store) +} + +func testResourceVersionWithNonMetaTransform(t *testing.T, ctx context.Context, source *fcache.FakeControllerSource, recorder *eventRecorder, store Store) { + // The first event should be the initial add. + require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive initial setup all events, got: %v", recorder.getHistory()) + history := recorder.getHistory() + assert.Len(t, history, 1) + assert.Equal(t, "add", history[0].Action) + assert.Equal(t, "pod-1", history[0].Key) + assert.True(t, history[0].IsInInitialList) + + // The stored resource version should stay at the initial value given by the replace event since + // the update event does not have metadata. + replaceRV := store.LastStoreSyncResourceVersion() + recorder.clearHistory() + // Update the pod labels to increment the resource version. + source.Modify(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "default", + Labels: map[string]string{ + "foo": "bar", + }, + }, + }) + require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive update event, got: %v", recorder.getHistory()) + // The stored resource version should not be changed due to the transform. + assert.Equal(t, replaceRV, store.LastStoreSyncResourceVersion()) + + recorder.clearHistory() + // Modify the pod but do not send the event. + source.ModifyDropWatch(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "default", + Labels: map[string]string{ + "foo": "bar2", + }, + }, + }) + // Trigger a replace by resetting the watch + source.ResetWatch() + + require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive replace event, got: %v", recorder.getHistory()) + // The stored resource version should be updated to the replace events RV. + assert.Equal(t, "3", store.LastStoreSyncResourceVersion()) +} + type eventRecord struct { Action string Key string @@ -1146,14 +1335,16 @@ type eventRecorder struct { historyLock sync.Mutex history []eventRecord store Store + keyFunc KeyFunc updateCh chan bool } -func newEventRecorder(store Store) *eventRecorder { +func newEventRecorder(store Store, keyFunc KeyFunc) *eventRecorder { return &eventRecorder{ store: store, updateCh: make(chan bool, 1), + keyFunc: keyFunc, } } @@ -1170,7 +1361,7 @@ func (m *eventRecorder) OnDelete(obj interface{}) { func (m *eventRecorder) record(action string, obj interface{}, isInInitialList bool) { m.historyLock.Lock() defer m.historyLock.Unlock() - key, _ := DeletionHandlingMetaNamespaceKeyFunc(obj) + key, _ := m.keyFunc(obj) if deleted, ok := obj.(DeletedFinalStateUnknown); ok { obj = deleted.Obj diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index b57cc4728..b2a191f78 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -202,6 +202,9 @@ const ( // SyncAll indicates all known objects should be reprocessed. // This event contains an object of type SyncAllInfo. SyncAll DeltaType = "SyncAll" + // Bookmark is emitted on Bookmark calls and Replace calls to pass resource + // version information to the consumer. + Bookmark DeltaType = "Bookmark" ) // Delta is a member of Deltas (a list of Delta objects) which diff --git a/tools/cache/expiration_cache.go b/tools/cache/expiration_cache.go index 813916ebf..19c2226b9 100644 --- a/tools/cache/expiration_cache.go +++ b/tools/cache/expiration_cache.go @@ -137,6 +137,16 @@ func (c *ExpirationCache) List() []interface{} { return list } +// LastStoreSyncResourceVersion returns the latest resource version that the cache has seen. +func (c *ExpirationCache) LastStoreSyncResourceVersion() string { + return c.cacheStorage.LastStoreSyncResourceVersion() +} + +// Bookmark observes a new resource version in the cache. +func (c *ExpirationCache) Bookmark(rv string) { + c.cacheStorage.Bookmark(rv) +} + // ListKeys returns a list of all keys in the expiration cache. func (c *ExpirationCache) ListKeys() []string { return c.cacheStorage.ListKeys() @@ -170,7 +180,7 @@ func (c *ExpirationCache) Delete(obj interface{}) error { } c.expirationLock.Lock() defer c.expirationLock.Unlock() - c.cacheStorage.Delete(key) + c.cacheStorage.DeleteWithObject(key, obj) return nil } diff --git a/tools/cache/fifo_test.go b/tools/cache/fifo_test.go index 41bdee727..6803566ec 100644 --- a/tools/cache/fifo_test.go +++ b/tools/cache/fifo_test.go @@ -35,6 +35,15 @@ func (f *FIFO) List() []interface{} { return list } +// LastStoreSyncResourceVersion is unimplemented for FIFO, only used in unit testing. +func (f *FIFO) LastStoreSyncResourceVersion() string { + return "" +} + +// Bookmark is unimplemented for FIFO, only used in unit testing. +func (f *FIFO) Bookmark(rv string) { +} + // ListKeys returns a list of all the keys of the objects currently // in the FIFO. // This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 62ea68c72..66af9febc 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -76,6 +76,12 @@ type ReflectorStore interface { Resync() error } +// ReflectorBookmarkStore is an optional interface that allows a store +// to be informed of bookmark events received by the reflector. +type ReflectorBookmarkStore interface { + Bookmark(resourceVersion string) error +} + // TransformingStore is an optional interface that can be implemented by the provided store. // If implemented on the provided store reflector will use the same transformer in its internal stores. type TransformingStore interface { @@ -1006,6 +1012,13 @@ loop: if meta.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" { watchListBookmarkReceived = true } + // Propagate the resource version from the bookmark event to stores which indicate they want it + if bookmarkStore, ok := store.(ReflectorBookmarkStore); ok { + err := bookmarkStore.Bookmark(resourceVersion) + if err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "Unable to send bookmark event to store", "reflector", name, "object", event.Object) + } + } default: utilruntime.HandleErrorWithContext(ctx, err, "Unknown watch event", "reflector", name, "event", event) } diff --git a/tools/cache/store.go b/tools/cache/store.go index a412fd701..805d42cee 100644 --- a/tools/cache/store.go +++ b/tools/cache/store.go @@ -55,6 +55,15 @@ type Store interface { // ListKeys returns a list of all the keys currently associated with non-empty accumulators ListKeys() []string + // LastStoreSyncResourceVersion returns the latest resource version that the store has seen. + // This is used to determine the latest resource version the store has seen from objects + // observed being written to the store. + LastStoreSyncResourceVersion() string + + // Bookmark observes a new resource version passed into it and + // will be used to get the latest resource version of the store. + Bookmark(rv string) + // Get returns the accumulator associated with the given object's key Get(obj interface{}) (item interface{}, exists bool, err error) @@ -276,7 +285,7 @@ func (c *cache) Delete(obj interface{}) error { if err != nil { return KeyError{obj, err} } - c.cacheStorage.Delete(key) + c.cacheStorage.DeleteWithObject(key, obj) return nil } @@ -292,6 +301,14 @@ func (c *cache) ListKeys() []string { return c.cacheStorage.ListKeys() } +func (c *cache) LastStoreSyncResourceVersion() string { + return c.cacheStorage.LastStoreSyncResourceVersion() +} + +func (c *cache) Bookmark(rv string) { + c.cacheStorage.Bookmark(rv) +} + // GetIndexers returns the indexers of cache func (c *cache) GetIndexers() Indexers { return c.cacheStorage.GetIndexers() diff --git a/tools/cache/the_real_fifo.go b/tools/cache/the_real_fifo.go index 4856a9baf..c07c4eef8 100644 --- a/tools/cache/the_real_fifo.go +++ b/tools/cache/the_real_fifo.go @@ -73,6 +73,10 @@ type RealFIFOOptions struct { // MetricsProvider is used to create metrics for the FIFO. MetricsProvider FIFOMetricsProvider + + // EmitDeltaTypeBookmark is used to specify whether the RealFIFO will emit + // bookmark deltas or not. This can only be set if AtomicEvents is true. + EmitDeltaTypeBookmark bool } const ( @@ -148,6 +152,11 @@ type RealFIFO struct { // metrics holds all metrics for this FIFO. metrics *fifoMetrics + + // emitDeltaTypeBookmark defines whether bookmark deltas should be emitted. + // This may only be set if emitAtomicEvents is true, which avoids events + // propagating out of RV order during Replace and Resync. + emitDeltaTypeBookmark bool } // ReplacedAllInfo is the object associated with a Delta of type=ReplacedAll @@ -159,6 +168,12 @@ type ReplacedAllInfo struct { Objects []interface{} } +// BookmarkInfo is the object associated with a Delta of type=Bookmark +type BookmarkInfo struct { + // ResourceVersion is the resource version passed to the Bookmark() call that created this Delta + ResourceVersion string +} + // SyncAllInfo is the object associated with a Delta of type=SyncAll // It is used to trigger a resync of the entire queue. type SyncAllInfo struct{} @@ -564,6 +579,21 @@ func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProc }) } +func (f *RealFIFO) Bookmark(resourceVersion string) error { + if !f.emitDeltaTypeBookmark { + return nil + } + f.lock.Lock() + defer f.lock.Unlock() + + f.items = append(f.items, Delta{ + Type: Bookmark, + Object: BookmarkInfo{ResourceVersion: resourceVersion}, + }) + f.cond.Broadcast() + return nil +} + // Replace // 1. finds those items in f.items that are not in newItems and creates synthetic deletes for them // 2. finds items in knownObjects that are not in newItems and creates synthetic deletes for them @@ -790,6 +820,10 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO { if opts.KnownObjects == nil { panic("coding error: knownObjects must be provided when AtomicEvents is false") } + // If we are not emitting atomic events, we must not emit bookmark deltas. + if opts.EmitDeltaTypeBookmark { + panic("coding error: EmitDeltaTypeBookmark must be false when AtomicEvents is false") + } } f := &RealFIFO{ @@ -802,6 +836,7 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO { transformer: opts.Transformer, batchSize: defaultBatchSize, emitAtomicEvents: opts.AtomicEvents, + emitDeltaTypeBookmark: opts.EmitDeltaTypeBookmark, unlockWhileProcessing: opts.UnlockWhileProcessing, identifier: opts.Identifier, metrics: newFIFOMetrics(opts.Identifier, opts.MetricsProvider), diff --git a/tools/cache/the_real_fifo_test.go b/tools/cache/the_real_fifo_test.go index 18363f1a0..6b9efbd6e 100644 --- a/tools/cache/the_real_fifo_test.go +++ b/tools/cache/the_real_fifo_test.go @@ -41,6 +41,7 @@ func (f *RealFIFO) getItems() []Delta { const closedFIFOName = "FIFO WAS CLOSED" const isAtomic = "ATOMIC REPLACED OBJ" +const isBookmark = "BOOKMARK OBJ" func popN(queue Queue, count int) []interface{} { result := []interface{}{} @@ -66,6 +67,9 @@ func testRealFIFOPop(f *RealFIFO) testFifoObject { } return testFifoObject{name: isAtomic, val: objs} } + if val.(Deltas).Newest().Type == Bookmark { + return testFifoObject{name: isBookmark} + } return val.(Deltas).Newest().Object.(testFifoObject) } @@ -231,6 +235,15 @@ func TestRealFIFOW_ReplaceMakesDeletionsForObjectsOnlyInQueue(t *testing.T) { {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: objV2}}, }, }, + { + name: "Bookmark object should not be added without atomic", + operations: func(f *RealFIFO) { + f.bookmarkTest(t, "123") + f.replaceTest(t, []interface{}{}, "0") + f.replaceTest(t, []interface{}{}, "1") + }, + expectedDeltas: Deltas{}, + }, } for _, tt := range table { tt := tt @@ -1131,7 +1144,7 @@ func TestRealFIFO_PopMultipleDeltaInBatch(t *testing.T) { for i, item := range tc.initialItems { initialItems[i] = item } - _ = f.Replace(initialItems, "") + _ = f.Replace(initialItems, "123") for _, action := range tc.actions { action(f) } @@ -1365,6 +1378,20 @@ func TestRealFIFO_ReplaceAtomic(t *testing.T) { }}, }, }, + { + name: "Bookmark object should not be included in Replace", + operations: func(f *RealFIFO) { + f.bookmarkTest(t, "123") + f.replaceTest(t, []interface{}{}, "1234") + }, + expectedDeltas: Deltas{ + {Type: Bookmark, Object: BookmarkInfo{ResourceVersion: "123"}}, + {Type: ReplacedAll, Object: ReplacedAllInfo{ + ResourceVersion: "1234", + Objects: []interface{}{}, + }}, + }, + }, } for _, tt := range table { tt := tt @@ -1379,6 +1406,7 @@ func TestRealFIFO_ReplaceAtomic(t *testing.T) { nil, ) f.emitAtomicEvents = true + f.emitDeltaTypeBookmark = true tt.operations(f) actualDeltasWithKnownObjects := popN(f, len(f.getItems())) actualAsDeltas := collapseDeltas(actualDeltasWithKnownObjects) @@ -1512,3 +1540,10 @@ func (f *RealFIFO) resyncTest(t *testing.T) { t.Fatalf("Test error on RealFIFO resync: %s", err) } } + +func (f *RealFIFO) bookmarkTest(t *testing.T, bookmark string) { + err := f.Bookmark(bookmark) + if err != nil { + t.Fatalf("Test error on RealFIFO bookmark: %s", err) + } +} diff --git a/tools/cache/thread_safe_store.go b/tools/cache/thread_safe_store.go index ef3a599a9..a306468ec 100644 --- a/tools/cache/thread_safe_store.go +++ b/tools/cache/thread_safe_store.go @@ -21,7 +21,9 @@ import ( "sync" "time" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/util/sets" + clientgofeaturegate "k8s.io/client-go/features" utiltrace "k8s.io/utils/trace" ) @@ -43,13 +45,19 @@ import ( type ThreadSafeStore interface { Add(key string, obj interface{}) Update(key string, obj interface{}) + // Delete is equivalent to calling DeleteWithObject(key, nil) however it is + // not recommended to use this function as it will not update the resource + // version of the store, possibly causing it to be out of date. Delete(key string) + DeleteWithObject(key string, obj interface{}) Get(key string) (item interface{}, exists bool) List() []interface{} ListKeys() []string Replace(map[string]interface{}, string) Index(indexName string, obj interface{}) ([]interface{}, error) IndexKeys(indexName, indexedValue string) ([]string, error) + Bookmark(rv string) + LastStoreSyncResourceVersion() string ListIndexFuncValues(name string) []string ByIndex(indexName, indexedValue string) ([]interface{}, error) GetIndexers() Indexers @@ -242,9 +250,15 @@ type threadSafeMap struct { // index implements the indexing functionality index *storeIndex + rv string } func (c *threadSafeMap) Transaction(txns ...ThreadSafeStoreTransaction) { + if len(txns) == 0 { + return + } + finalObj := txns[len(txns)-1].Object + rv, rvErr := rvFromObject(finalObj) c.lock.Lock() defer c.lock.Unlock() trace := utiltrace.New("ThreadSafeMap Transaction Process", @@ -262,6 +276,9 @@ func (c *threadSafeMap) Transaction(txns ...ThreadSafeStoreTransaction) { c.deleteLocked(txn.Key) } } + if rvErr == nil { + c.rv = rv + } } func (c *threadSafeMap) Add(key string, obj interface{}) { @@ -273,9 +290,13 @@ func (c *threadSafeMap) addLocked(key string, obj interface{}) { } func (c *threadSafeMap) Update(key string, obj interface{}) { + rv, rvErr := rvFromObject(obj) c.lock.Lock() defer c.lock.Unlock() c.updateLocked(key, obj) + if rvErr == nil { + c.rv = rv + } } func (c *threadSafeMap) updateLocked(key string, obj interface{}) { @@ -285,9 +306,21 @@ func (c *threadSafeMap) updateLocked(key string, obj interface{}) { } func (c *threadSafeMap) Delete(key string) { + c.DeleteWithObject(key, nil) +} + +func (c *threadSafeMap) DeleteWithObject(key string, obj interface{}) { + var rv string + var rvErr error + if obj != nil { + rv, rvErr = rvFromObject(obj) + } c.lock.Lock() defer c.lock.Unlock() c.deleteLocked(key) + if obj != nil && rvErr == nil { + c.rv = rv + } } func (c *threadSafeMap) deleteLocked(key string) { @@ -330,7 +363,7 @@ func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion st c.lock.Lock() defer c.lock.Unlock() c.items = items - + c.rv = resourceVersion // rebuild any index c.index.reset() for key, item := range c.items { @@ -338,6 +371,15 @@ func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion st } } +func rvFromObject(obj interface{}) (rv string, err error) { + meta, err := meta.Accessor(obj) + if err != nil { + return "", err + } + rv = meta.GetResourceVersion() + return rv, nil +} + // Index returns a list of items that match the given object on the index function. // Index is thread-safe so long as you treat all items as immutable. func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) { @@ -356,6 +398,24 @@ func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, return list, nil } +// LastStoreSyncResourceVersion returns the latest resource version that the store has seen. +func (c *threadSafeMap) LastStoreSyncResourceVersion() string { + // We cannot return the resource version if the AtomicFIFO feature gate is not enabled. + if !clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.AtomicFIFO) { + return "" + } + c.lock.RLock() + defer c.lock.RUnlock() + return c.rv +} + +// Bookmark sets the latest resource version that the store has seen. +func (c *threadSafeMap) Bookmark(rv string) { + c.lock.Lock() + defer c.lock.Unlock() + c.rv = rv +} + // ByIndex returns a list of the items whose indexed values in the given index include the given indexed value func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) { c.lock.RLock() diff --git a/tools/cache/thread_safe_store_test.go b/tools/cache/thread_safe_store_test.go index 44d620e53..8a9d9edb0 100644 --- a/tools/cache/thread_safe_store_test.go +++ b/tools/cache/thread_safe_store_test.go @@ -23,6 +23,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" ) @@ -166,6 +167,124 @@ func TestThreadSafeStoreIndexingFunctionsWithMultipleValues(t *testing.T) { assert.NoError(compare("baz", []string{})) } +func TestThreadSafeStoreRV(t *testing.T) { + t.Run("Initial state", func(t *testing.T) { + store := NewThreadSafeStore(Indexers{}, Indices{}).(*threadSafeMap) + if rv := store.LastStoreSyncResourceVersion(); rv != "" { + t.Errorf("Expected initial RV to be \"\", got %q", rv) + } + }) + + t.Run("Add Update and Delete", func(t *testing.T) { + store := NewThreadSafeStore(Indexers{}, Indices{}).(*threadSafeMap) + + // Add obj with RV "10" + store.Add("key1", &metav1.ObjectMeta{ResourceVersion: "10"}) + if rv := store.LastStoreSyncResourceVersion(); rv != "10" { + t.Errorf("Expected RV to be \"10\", got %q", rv) + } + + // Add obj with same RV "10" + store.Add("key3", &metav1.ObjectMeta{ResourceVersion: "10"}) + if rv := store.LastStoreSyncResourceVersion(); rv != "10" { + t.Errorf("Expected RV to remain \"10\" after adding same RV, got %q", rv) + } + + // Add obj with higher RV "20" + store.Add("key4", &metav1.ObjectMeta{ResourceVersion: "20"}) + if rv := store.LastStoreSyncResourceVersion(); rv != "20" { + t.Errorf("Expected RV to be \"20\", got %q", rv) + } + + // Delete an earlier object with a newer RV + store.DeleteWithObject("key4", &metav1.ObjectMeta{ResourceVersion: "30"}) + if rv := store.LastStoreSyncResourceVersion(); rv != "30" { + t.Errorf("Expected RV to become \"30\" after deletion %q", rv) + } + + // Add non-meta object, should not be updated + store.Add("key5", "just a string") + if rv := store.LastStoreSyncResourceVersion(); rv != "30" { + t.Errorf("Expected RV to remain \"30\" after adding non-meta object, got %q", rv) + } + + // Add obj with RV "40" + store.Add("key6", &metav1.ObjectMeta{ResourceVersion: "40"}) + if rv := store.LastStoreSyncResourceVersion(); rv != "40" { + t.Errorf("Expected RV to be \"40\", got %q", rv) + } + + // Nil delete + store.Delete("key6") + if rv := store.LastStoreSyncResourceVersion(); rv != "40" { + t.Errorf("Expected RV to remain \"40\" after delete, got %q", rv) + } + + txns := []ThreadSafeStoreTransaction{ + { + Transaction{ + Object: &metav1.ObjectMeta{ResourceVersion: "40"}, + Type: TransactionTypeUpdate, + }, + "key9", + }, + { + Transaction{ + Object: &metav1.ObjectMeta{ResourceVersion: "30"}, + Type: TransactionTypeUpdate, + }, + "key10", + }, + { + Transaction{ + Object: &metav1.ObjectMeta{ResourceVersion: "50"}, + Type: TransactionTypeUpdate, + }, + "key11", + }, + } + store.Transaction(txns...) + if rv := store.LastStoreSyncResourceVersion(); rv != "50" { + t.Errorf("Expected RV to be \"50\" after transaction, got %q", rv) + } + }) + + t.Run("Replace", func(t *testing.T) { + store := NewThreadSafeStore(Indexers{}, Indices{}).(*threadSafeMap) + store.Add("key1", &metav1.ObjectMeta{ResourceVersion: "10"}) + + if rv := store.LastStoreSyncResourceVersion(); rv != "10" { + t.Fatalf("Setup failed, expected RV \"10\", got %q", rv) + } + + items := map[string]interface{}{ + "key3": &metav1.ObjectMeta{ResourceVersion: "40"}, + "key2": &metav1.ObjectMeta{ResourceVersion: "30"}, + } + + store.Replace(items, "50") + + if rv := store.LastStoreSyncResourceVersion(); rv != "50" { + t.Errorf("Expected RV to be \"50\" after Replace(), got %q", rv) + } + }) + + t.Run("Delete", func(t *testing.T) { + store := NewThreadSafeStore(Indexers{}, Indices{}).(*threadSafeMap) + store.Add("key1", &metav1.ObjectMeta{ResourceVersion: "10"}) + + if rv := store.LastStoreSyncResourceVersion(); rv != "10" { + t.Fatalf("Setup failed, expected RV \"10\", got %q", rv) + } + + store.DeleteWithObject("key1", &metav1.ObjectMeta{ResourceVersion: "20"}) + + if rv := store.LastStoreSyncResourceVersion(); rv != "20" { + t.Errorf("Expected RV to be \"20\" after Delete(), got %q", rv) + } + }) +} + func BenchmarkIndexer(b *testing.B) { testIndexer := "testIndexer"