Merge pull request #134827 from michaelasp/rvStore

Add Resource Version query and Bookmarks to thread safe store

Kubernetes-commit: 03da2455859aacb1e542e1e46a4c8b41750d55dc
This commit is contained in:
Kubernetes Publisher
2026-02-13 03:58:00 +05:30
11 changed files with 507 additions and 8 deletions

View File

@@ -653,6 +653,12 @@ func processDeltas(
return err
}
handler.OnDelete(obj)
case Bookmark:
info, ok := obj.(BookmarkInfo)
if !ok {
return fmt.Errorf("bookmark delta did not contain BookmarkInfo: %T", obj)
}
clientState.Bookmark(info.ResourceVersion)
}
}
return nil
@@ -853,6 +859,7 @@ func newQueueFIFO(logger klog.Logger, objectType any, clientState Store, transfo
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.AtomicFIFO) {
options.AtomicEvents = true
options.UnlockWhileProcessing = clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.UnlockWhileProcessingFIFO)
options.EmitDeltaTypeBookmark = true
} else {
options.KnownObjects = clientState
}

View File

@@ -921,7 +921,7 @@ func TestReplaceEvents(t *testing.T) {
fifoOptions.KnownObjects = store
}
fifo := NewRealFIFOWithOptions(fifoOptions)
recorder := newEventRecorder(store)
recorder := newEventRecorder(store, DeletionHandlingMetaNamespaceKeyFunc)
cfg := &Config{
Queue: fifo,
@@ -1058,7 +1058,7 @@ func TestResetWatch(t *testing.T) {
fifoOptions.KnownObjects = store
}
fifo := NewRealFIFOWithOptions(fifoOptions)
recorder := newEventRecorder(store)
recorder := newEventRecorder(store, DeletionHandlingMetaNamespaceKeyFunc)
cfg := &Config{
Queue: fifo,
@@ -1134,6 +1134,195 @@ func testResetWatch(t *testing.T, ctx context.Context, source *fcache.FakeContro
assert.False(t, addEvent.IsInInitialList)
}
func TestStoreResourceVersion(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
source := fcache.NewFakeControllerSource()
source.Add(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Namespace: "default",
},
})
t.Cleanup(func() {
source.Shutdown()
})
store := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
fifo := NewRealFIFOWithOptions(RealFIFOOptions{
AtomicEvents: true,
})
recorder := newEventRecorder(store, DeletionHandlingMetaNamespaceKeyFunc)
logger, _ := ktesting.NewTestContext(t)
cfg := &Config{
Queue: fifo,
ListerWatcher: source,
ObjectType: &v1.Pod{},
FullResyncPeriod: 0,
Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(logger, recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
}
return errors.New("object given as Process argument is not Deltas")
},
ProcessBatch: func(deltaList []Delta, isInInitialList bool) error {
return processDeltasInBatch(logger, recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
},
}
c := New(cfg)
go c.RunWithContext(ctx)
if !WaitForCacheSync(ctx.Done(), c.HasSynced) {
t.Fatal("Timed out waiting for cache sync")
}
testResourceVersion(t, ctx, source, recorder, store)
}
func testResourceVersion(t *testing.T, ctx context.Context, source *fcache.FakeControllerSource, recorder *eventRecorder, store Store) {
// The first event should be the initial add.
require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive initial setup all events, got: %v", recorder.getHistory())
history := recorder.getHistory()
assert.Len(t, history, 1)
assert.Equal(t, "add", history[0].Action)
assert.Equal(t, "default/pod-1", history[0].Key)
assert.True(t, history[0].IsInInitialList)
recorder.clearHistory()
// Update the pod labels to increment the resource version.
source.Modify(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Namespace: "default",
Labels: map[string]string{
"foo": "bar",
},
},
})
require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive update event, got: %v", recorder.getHistory())
history = recorder.getHistory()
// The stored resource version should have changed.
assert.Equal(t, history[0].EventRV, store.LastStoreSyncResourceVersion())
recorder.clearHistory()
// Delete the pod.
source.Delete(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Namespace: "default",
},
})
require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive delete event, got: %v", recorder.getHistory())
history = recorder.getHistory()
// The stored resource version should have changed.
assert.Equal(t, history[0].EventRV, store.LastStoreSyncResourceVersion())
}
func TestStoreResourceVersionWithNonMetaTransform(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
source := fcache.NewFakeControllerSource()
source.Add(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Namespace: "default",
},
})
t.Cleanup(func() {
source.Shutdown()
})
// Create a keyfunc that does not rely on object metadata.
keyFunc := func(obj interface{}) (string, error) {
return "pod-1", nil
}
store := NewStore(keyFunc)
fifo := NewRealFIFOWithOptions(RealFIFOOptions{
Transformer: func(obj interface{}) (interface{}, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return nil, fmt.Errorf("expected *v1.Pod, got %T", obj)
}
return pod.Name, nil
},
KeyFunction: keyFunc,
AtomicEvents: true,
})
recorder := newEventRecorder(store, keyFunc)
logger, _ := ktesting.NewTestContext(t)
cfg := &Config{
Queue: fifo,
ListerWatcher: source,
ObjectType: &v1.Pod{},
FullResyncPeriod: 0,
Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(logger, recorder, store, deltas, isInInitialList, keyFunc)
}
return errors.New("object given as Process argument is not Deltas")
},
ProcessBatch: func(deltaList []Delta, isInInitialList bool) error {
return processDeltasInBatch(logger, recorder, store, deltaList, isInInitialList, keyFunc)
},
}
c := New(cfg)
go c.RunWithContext(ctx)
if !WaitForCacheSync(ctx.Done(), c.HasSynced) {
t.Fatal("Timed out waiting for cache sync")
}
testResourceVersionWithNonMetaTransform(t, ctx, source, recorder, store)
}
func testResourceVersionWithNonMetaTransform(t *testing.T, ctx context.Context, source *fcache.FakeControllerSource, recorder *eventRecorder, store Store) {
// The first event should be the initial add.
require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive initial setup all events, got: %v", recorder.getHistory())
history := recorder.getHistory()
assert.Len(t, history, 1)
assert.Equal(t, "add", history[0].Action)
assert.Equal(t, "pod-1", history[0].Key)
assert.True(t, history[0].IsInInitialList)
// The stored resource version should stay at the initial value given by the replace event since
// the update event does not have metadata.
replaceRV := store.LastStoreSyncResourceVersion()
recorder.clearHistory()
// Update the pod labels to increment the resource version.
source.Modify(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Namespace: "default",
Labels: map[string]string{
"foo": "bar",
},
},
})
require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive update event, got: %v", recorder.getHistory())
// The stored resource version should not be changed due to the transform.
assert.Equal(t, replaceRV, store.LastStoreSyncResourceVersion())
recorder.clearHistory()
// Modify the pod but do not send the event.
source.ModifyDropWatch(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Namespace: "default",
Labels: map[string]string{
"foo": "bar2",
},
},
})
// Trigger a replace by resetting the watch
source.ResetWatch()
require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive replace event, got: %v", recorder.getHistory())
// The stored resource version should be updated to the replace events RV.
assert.Equal(t, "3", store.LastStoreSyncResourceVersion())
}
type eventRecord struct {
Action string
Key string
@@ -1146,14 +1335,16 @@ type eventRecorder struct {
historyLock sync.Mutex
history []eventRecord
store Store
keyFunc KeyFunc
updateCh chan bool
}
func newEventRecorder(store Store) *eventRecorder {
func newEventRecorder(store Store, keyFunc KeyFunc) *eventRecorder {
return &eventRecorder{
store: store,
updateCh: make(chan bool, 1),
keyFunc: keyFunc,
}
}
@@ -1170,7 +1361,7 @@ func (m *eventRecorder) OnDelete(obj interface{}) {
func (m *eventRecorder) record(action string, obj interface{}, isInInitialList bool) {
m.historyLock.Lock()
defer m.historyLock.Unlock()
key, _ := DeletionHandlingMetaNamespaceKeyFunc(obj)
key, _ := m.keyFunc(obj)
if deleted, ok := obj.(DeletedFinalStateUnknown); ok {
obj = deleted.Obj

View File

@@ -202,6 +202,9 @@ const (
// SyncAll indicates all known objects should be reprocessed.
// This event contains an object of type SyncAllInfo.
SyncAll DeltaType = "SyncAll"
// Bookmark is emitted on Bookmark calls and Replace calls to pass resource
// version information to the consumer.
Bookmark DeltaType = "Bookmark"
)
// Delta is a member of Deltas (a list of Delta objects) which

View File

@@ -137,6 +137,16 @@ func (c *ExpirationCache) List() []interface{} {
return list
}
// LastStoreSyncResourceVersion returns the latest resource version that the cache has seen.
func (c *ExpirationCache) LastStoreSyncResourceVersion() string {
return c.cacheStorage.LastStoreSyncResourceVersion()
}
// Bookmark observes a new resource version in the cache.
func (c *ExpirationCache) Bookmark(rv string) {
c.cacheStorage.Bookmark(rv)
}
// ListKeys returns a list of all keys in the expiration cache.
func (c *ExpirationCache) ListKeys() []string {
return c.cacheStorage.ListKeys()
@@ -170,7 +180,7 @@ func (c *ExpirationCache) Delete(obj interface{}) error {
}
c.expirationLock.Lock()
defer c.expirationLock.Unlock()
c.cacheStorage.Delete(key)
c.cacheStorage.DeleteWithObject(key, obj)
return nil
}

View File

@@ -35,6 +35,15 @@ func (f *FIFO) List() []interface{} {
return list
}
// LastStoreSyncResourceVersion is unimplemented for FIFO, only used in unit testing.
func (f *FIFO) LastStoreSyncResourceVersion() string {
return ""
}
// Bookmark is unimplemented for FIFO, only used in unit testing.
func (f *FIFO) Bookmark(rv string) {
}
// ListKeys returns a list of all the keys of the objects currently
// in the FIFO.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.

View File

@@ -76,6 +76,12 @@ type ReflectorStore interface {
Resync() error
}
// ReflectorBookmarkStore is an optional interface that allows a store
// to be informed of bookmark events received by the reflector.
type ReflectorBookmarkStore interface {
Bookmark(resourceVersion string) error
}
// TransformingStore is an optional interface that can be implemented by the provided store.
// If implemented on the provided store reflector will use the same transformer in its internal stores.
type TransformingStore interface {
@@ -1006,6 +1012,13 @@ loop:
if meta.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" {
watchListBookmarkReceived = true
}
// Propagate the resource version from the bookmark event to stores which indicate they want it
if bookmarkStore, ok := store.(ReflectorBookmarkStore); ok {
err := bookmarkStore.Bookmark(resourceVersion)
if err != nil {
utilruntime.HandleErrorWithContext(ctx, err, "Unable to send bookmark event to store", "reflector", name, "object", event.Object)
}
}
default:
utilruntime.HandleErrorWithContext(ctx, err, "Unknown watch event", "reflector", name, "event", event)
}

19
tools/cache/store.go vendored
View File

@@ -55,6 +55,15 @@ type Store interface {
// ListKeys returns a list of all the keys currently associated with non-empty accumulators
ListKeys() []string
// LastStoreSyncResourceVersion returns the latest resource version that the store has seen.
// This is used to determine the latest resource version the store has seen from objects
// observed being written to the store.
LastStoreSyncResourceVersion() string
// Bookmark observes a new resource version passed into it and
// will be used to get the latest resource version of the store.
Bookmark(rv string)
// Get returns the accumulator associated with the given object's key
Get(obj interface{}) (item interface{}, exists bool, err error)
@@ -276,7 +285,7 @@ func (c *cache) Delete(obj interface{}) error {
if err != nil {
return KeyError{obj, err}
}
c.cacheStorage.Delete(key)
c.cacheStorage.DeleteWithObject(key, obj)
return nil
}
@@ -292,6 +301,14 @@ func (c *cache) ListKeys() []string {
return c.cacheStorage.ListKeys()
}
func (c *cache) LastStoreSyncResourceVersion() string {
return c.cacheStorage.LastStoreSyncResourceVersion()
}
func (c *cache) Bookmark(rv string) {
c.cacheStorage.Bookmark(rv)
}
// GetIndexers returns the indexers of cache
func (c *cache) GetIndexers() Indexers {
return c.cacheStorage.GetIndexers()

View File

@@ -73,6 +73,10 @@ type RealFIFOOptions struct {
// MetricsProvider is used to create metrics for the FIFO.
MetricsProvider FIFOMetricsProvider
// EmitDeltaTypeBookmark is used to specify whether the RealFIFO will emit
// bookmark deltas or not. This can only be set if AtomicEvents is true.
EmitDeltaTypeBookmark bool
}
const (
@@ -148,6 +152,11 @@ type RealFIFO struct {
// metrics holds all metrics for this FIFO.
metrics *fifoMetrics
// emitDeltaTypeBookmark defines whether bookmark deltas should be emitted.
// This may only be set if emitAtomicEvents is true, which avoids events
// propagating out of RV order during Replace and Resync.
emitDeltaTypeBookmark bool
}
// ReplacedAllInfo is the object associated with a Delta of type=ReplacedAll
@@ -159,6 +168,12 @@ type ReplacedAllInfo struct {
Objects []interface{}
}
// BookmarkInfo is the object associated with a Delta of type=Bookmark
type BookmarkInfo struct {
// ResourceVersion is the resource version passed to the Bookmark() call that created this Delta
ResourceVersion string
}
// SyncAllInfo is the object associated with a Delta of type=SyncAll
// It is used to trigger a resync of the entire queue.
type SyncAllInfo struct{}
@@ -564,6 +579,21 @@ func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProc
})
}
func (f *RealFIFO) Bookmark(resourceVersion string) error {
if !f.emitDeltaTypeBookmark {
return nil
}
f.lock.Lock()
defer f.lock.Unlock()
f.items = append(f.items, Delta{
Type: Bookmark,
Object: BookmarkInfo{ResourceVersion: resourceVersion},
})
f.cond.Broadcast()
return nil
}
// Replace
// 1. finds those items in f.items that are not in newItems and creates synthetic deletes for them
// 2. finds items in knownObjects that are not in newItems and creates synthetic deletes for them
@@ -790,6 +820,10 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO {
if opts.KnownObjects == nil {
panic("coding error: knownObjects must be provided when AtomicEvents is false")
}
// If we are not emitting atomic events, we must not emit bookmark deltas.
if opts.EmitDeltaTypeBookmark {
panic("coding error: EmitDeltaTypeBookmark must be false when AtomicEvents is false")
}
}
f := &RealFIFO{
@@ -802,6 +836,7 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO {
transformer: opts.Transformer,
batchSize: defaultBatchSize,
emitAtomicEvents: opts.AtomicEvents,
emitDeltaTypeBookmark: opts.EmitDeltaTypeBookmark,
unlockWhileProcessing: opts.UnlockWhileProcessing,
identifier: opts.Identifier,
metrics: newFIFOMetrics(opts.Identifier, opts.MetricsProvider),

View File

@@ -41,6 +41,7 @@ func (f *RealFIFO) getItems() []Delta {
const closedFIFOName = "FIFO WAS CLOSED"
const isAtomic = "ATOMIC REPLACED OBJ"
const isBookmark = "BOOKMARK OBJ"
func popN(queue Queue, count int) []interface{} {
result := []interface{}{}
@@ -66,6 +67,9 @@ func testRealFIFOPop(f *RealFIFO) testFifoObject {
}
return testFifoObject{name: isAtomic, val: objs}
}
if val.(Deltas).Newest().Type == Bookmark {
return testFifoObject{name: isBookmark}
}
return val.(Deltas).Newest().Object.(testFifoObject)
}
@@ -231,6 +235,15 @@ func TestRealFIFOW_ReplaceMakesDeletionsForObjectsOnlyInQueue(t *testing.T) {
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: objV2}},
},
},
{
name: "Bookmark object should not be added without atomic",
operations: func(f *RealFIFO) {
f.bookmarkTest(t, "123")
f.replaceTest(t, []interface{}{}, "0")
f.replaceTest(t, []interface{}{}, "1")
},
expectedDeltas: Deltas{},
},
}
for _, tt := range table {
tt := tt
@@ -1131,7 +1144,7 @@ func TestRealFIFO_PopMultipleDeltaInBatch(t *testing.T) {
for i, item := range tc.initialItems {
initialItems[i] = item
}
_ = f.Replace(initialItems, "")
_ = f.Replace(initialItems, "123")
for _, action := range tc.actions {
action(f)
}
@@ -1365,6 +1378,20 @@ func TestRealFIFO_ReplaceAtomic(t *testing.T) {
}},
},
},
{
name: "Bookmark object should not be included in Replace",
operations: func(f *RealFIFO) {
f.bookmarkTest(t, "123")
f.replaceTest(t, []interface{}{}, "1234")
},
expectedDeltas: Deltas{
{Type: Bookmark, Object: BookmarkInfo{ResourceVersion: "123"}},
{Type: ReplacedAll, Object: ReplacedAllInfo{
ResourceVersion: "1234",
Objects: []interface{}{},
}},
},
},
}
for _, tt := range table {
tt := tt
@@ -1379,6 +1406,7 @@ func TestRealFIFO_ReplaceAtomic(t *testing.T) {
nil,
)
f.emitAtomicEvents = true
f.emitDeltaTypeBookmark = true
tt.operations(f)
actualDeltasWithKnownObjects := popN(f, len(f.getItems()))
actualAsDeltas := collapseDeltas(actualDeltasWithKnownObjects)
@@ -1512,3 +1540,10 @@ func (f *RealFIFO) resyncTest(t *testing.T) {
t.Fatalf("Test error on RealFIFO resync: %s", err)
}
}
func (f *RealFIFO) bookmarkTest(t *testing.T, bookmark string) {
err := f.Bookmark(bookmark)
if err != nil {
t.Fatalf("Test error on RealFIFO bookmark: %s", err)
}
}

View File

@@ -21,7 +21,9 @@ import (
"sync"
"time"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/util/sets"
clientgofeaturegate "k8s.io/client-go/features"
utiltrace "k8s.io/utils/trace"
)
@@ -43,13 +45,19 @@ import (
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
// Delete is equivalent to calling DeleteWithObject(key, nil) however it is
// not recommended to use this function as it will not update the resource
// version of the store, possibly causing it to be out of date.
Delete(key string)
DeleteWithObject(key string, obj interface{})
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexedValue string) ([]string, error)
Bookmark(rv string)
LastStoreSyncResourceVersion() string
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexedValue string) ([]interface{}, error)
GetIndexers() Indexers
@@ -242,9 +250,15 @@ type threadSafeMap struct {
// index implements the indexing functionality
index *storeIndex
rv string
}
func (c *threadSafeMap) Transaction(txns ...ThreadSafeStoreTransaction) {
if len(txns) == 0 {
return
}
finalObj := txns[len(txns)-1].Object
rv, rvErr := rvFromObject(finalObj)
c.lock.Lock()
defer c.lock.Unlock()
trace := utiltrace.New("ThreadSafeMap Transaction Process",
@@ -262,6 +276,9 @@ func (c *threadSafeMap) Transaction(txns ...ThreadSafeStoreTransaction) {
c.deleteLocked(txn.Key)
}
}
if rvErr == nil {
c.rv = rv
}
}
func (c *threadSafeMap) Add(key string, obj interface{}) {
@@ -273,9 +290,13 @@ func (c *threadSafeMap) addLocked(key string, obj interface{}) {
}
func (c *threadSafeMap) Update(key string, obj interface{}) {
rv, rvErr := rvFromObject(obj)
c.lock.Lock()
defer c.lock.Unlock()
c.updateLocked(key, obj)
if rvErr == nil {
c.rv = rv
}
}
func (c *threadSafeMap) updateLocked(key string, obj interface{}) {
@@ -285,9 +306,21 @@ func (c *threadSafeMap) updateLocked(key string, obj interface{}) {
}
func (c *threadSafeMap) Delete(key string) {
c.DeleteWithObject(key, nil)
}
func (c *threadSafeMap) DeleteWithObject(key string, obj interface{}) {
var rv string
var rvErr error
if obj != nil {
rv, rvErr = rvFromObject(obj)
}
c.lock.Lock()
defer c.lock.Unlock()
c.deleteLocked(key)
if obj != nil && rvErr == nil {
c.rv = rv
}
}
func (c *threadSafeMap) deleteLocked(key string) {
@@ -330,7 +363,7 @@ func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion st
c.lock.Lock()
defer c.lock.Unlock()
c.items = items
c.rv = resourceVersion
// rebuild any index
c.index.reset()
for key, item := range c.items {
@@ -338,6 +371,15 @@ func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion st
}
}
func rvFromObject(obj interface{}) (rv string, err error) {
meta, err := meta.Accessor(obj)
if err != nil {
return "", err
}
rv = meta.GetResourceVersion()
return rv, nil
}
// Index returns a list of items that match the given object on the index function.
// Index is thread-safe so long as you treat all items as immutable.
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
@@ -356,6 +398,24 @@ func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{},
return list, nil
}
// LastStoreSyncResourceVersion returns the latest resource version that the store has seen.
func (c *threadSafeMap) LastStoreSyncResourceVersion() string {
// We cannot return the resource version if the AtomicFIFO feature gate is not enabled.
if !clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.AtomicFIFO) {
return ""
}
c.lock.RLock()
defer c.lock.RUnlock()
return c.rv
}
// Bookmark sets the latest resource version that the store has seen.
func (c *threadSafeMap) Bookmark(rv string) {
c.lock.Lock()
defer c.lock.Unlock()
c.rv = rv
}
// ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
c.lock.RLock()

View File

@@ -23,6 +23,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
)
@@ -166,6 +167,124 @@ func TestThreadSafeStoreIndexingFunctionsWithMultipleValues(t *testing.T) {
assert.NoError(compare("baz", []string{}))
}
func TestThreadSafeStoreRV(t *testing.T) {
t.Run("Initial state", func(t *testing.T) {
store := NewThreadSafeStore(Indexers{}, Indices{}).(*threadSafeMap)
if rv := store.LastStoreSyncResourceVersion(); rv != "" {
t.Errorf("Expected initial RV to be \"\", got %q", rv)
}
})
t.Run("Add Update and Delete", func(t *testing.T) {
store := NewThreadSafeStore(Indexers{}, Indices{}).(*threadSafeMap)
// Add obj with RV "10"
store.Add("key1", &metav1.ObjectMeta{ResourceVersion: "10"})
if rv := store.LastStoreSyncResourceVersion(); rv != "10" {
t.Errorf("Expected RV to be \"10\", got %q", rv)
}
// Add obj with same RV "10"
store.Add("key3", &metav1.ObjectMeta{ResourceVersion: "10"})
if rv := store.LastStoreSyncResourceVersion(); rv != "10" {
t.Errorf("Expected RV to remain \"10\" after adding same RV, got %q", rv)
}
// Add obj with higher RV "20"
store.Add("key4", &metav1.ObjectMeta{ResourceVersion: "20"})
if rv := store.LastStoreSyncResourceVersion(); rv != "20" {
t.Errorf("Expected RV to be \"20\", got %q", rv)
}
// Delete an earlier object with a newer RV
store.DeleteWithObject("key4", &metav1.ObjectMeta{ResourceVersion: "30"})
if rv := store.LastStoreSyncResourceVersion(); rv != "30" {
t.Errorf("Expected RV to become \"30\" after deletion %q", rv)
}
// Add non-meta object, should not be updated
store.Add("key5", "just a string")
if rv := store.LastStoreSyncResourceVersion(); rv != "30" {
t.Errorf("Expected RV to remain \"30\" after adding non-meta object, got %q", rv)
}
// Add obj with RV "40"
store.Add("key6", &metav1.ObjectMeta{ResourceVersion: "40"})
if rv := store.LastStoreSyncResourceVersion(); rv != "40" {
t.Errorf("Expected RV to be \"40\", got %q", rv)
}
// Nil delete
store.Delete("key6")
if rv := store.LastStoreSyncResourceVersion(); rv != "40" {
t.Errorf("Expected RV to remain \"40\" after delete, got %q", rv)
}
txns := []ThreadSafeStoreTransaction{
{
Transaction{
Object: &metav1.ObjectMeta{ResourceVersion: "40"},
Type: TransactionTypeUpdate,
},
"key9",
},
{
Transaction{
Object: &metav1.ObjectMeta{ResourceVersion: "30"},
Type: TransactionTypeUpdate,
},
"key10",
},
{
Transaction{
Object: &metav1.ObjectMeta{ResourceVersion: "50"},
Type: TransactionTypeUpdate,
},
"key11",
},
}
store.Transaction(txns...)
if rv := store.LastStoreSyncResourceVersion(); rv != "50" {
t.Errorf("Expected RV to be \"50\" after transaction, got %q", rv)
}
})
t.Run("Replace", func(t *testing.T) {
store := NewThreadSafeStore(Indexers{}, Indices{}).(*threadSafeMap)
store.Add("key1", &metav1.ObjectMeta{ResourceVersion: "10"})
if rv := store.LastStoreSyncResourceVersion(); rv != "10" {
t.Fatalf("Setup failed, expected RV \"10\", got %q", rv)
}
items := map[string]interface{}{
"key3": &metav1.ObjectMeta{ResourceVersion: "40"},
"key2": &metav1.ObjectMeta{ResourceVersion: "30"},
}
store.Replace(items, "50")
if rv := store.LastStoreSyncResourceVersion(); rv != "50" {
t.Errorf("Expected RV to be \"50\" after Replace(), got %q", rv)
}
})
t.Run("Delete", func(t *testing.T) {
store := NewThreadSafeStore(Indexers{}, Indices{}).(*threadSafeMap)
store.Add("key1", &metav1.ObjectMeta{ResourceVersion: "10"})
if rv := store.LastStoreSyncResourceVersion(); rv != "10" {
t.Fatalf("Setup failed, expected RV \"10\", got %q", rv)
}
store.DeleteWithObject("key1", &metav1.ObjectMeta{ResourceVersion: "20"})
if rv := store.LastStoreSyncResourceVersion(); rv != "20" {
t.Errorf("Expected RV to be \"20\" after Delete(), got %q", rv)
}
})
}
func BenchmarkIndexer(b *testing.B) {
testIndexer := "testIndexer"