Add rv query to store

Kubernetes-commit: 9c6ac550efa6e721c85b0f1074f26f734286fba9
This commit is contained in:
Michael Aspinwall
2026-01-28 22:29:39 +00:00
committed by Kubernetes Publisher
parent 7aaede7872
commit d883e606da
6 changed files with 413 additions and 7 deletions

View File

@@ -921,7 +921,7 @@ func TestReplaceEvents(t *testing.T) {
fifoOptions.KnownObjects = store
}
fifo := NewRealFIFOWithOptions(fifoOptions)
recorder := newEventRecorder(store)
recorder := newEventRecorder(store, DeletionHandlingMetaNamespaceKeyFunc)
cfg := &Config{
Queue: fifo,
@@ -1058,7 +1058,7 @@ func TestResetWatch(t *testing.T) {
fifoOptions.KnownObjects = store
}
fifo := NewRealFIFOWithOptions(fifoOptions)
recorder := newEventRecorder(store)
recorder := newEventRecorder(store, DeletionHandlingMetaNamespaceKeyFunc)
cfg := &Config{
Queue: fifo,
@@ -1134,6 +1134,195 @@ func testResetWatch(t *testing.T, ctx context.Context, source *fcache.FakeContro
assert.False(t, addEvent.IsInInitialList)
}
func TestStoreResourceVersion(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
source := fcache.NewFakeControllerSource()
source.Add(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Namespace: "default",
},
})
t.Cleanup(func() {
source.Shutdown()
})
store := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
fifo := NewRealFIFOWithOptions(RealFIFOOptions{
AtomicEvents: true,
})
recorder := newEventRecorder(store, DeletionHandlingMetaNamespaceKeyFunc)
logger, _ := ktesting.NewTestContext(t)
cfg := &Config{
Queue: fifo,
ListerWatcher: source,
ObjectType: &v1.Pod{},
FullResyncPeriod: 0,
Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(logger, recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
}
return errors.New("object given as Process argument is not Deltas")
},
ProcessBatch: func(deltaList []Delta, isInInitialList bool) error {
return processDeltasInBatch(logger, recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
},
}
c := New(cfg)
go c.RunWithContext(ctx)
if !WaitForCacheSync(ctx.Done(), c.HasSynced) {
t.Fatal("Timed out waiting for cache sync")
}
testResourceVersion(t, ctx, source, recorder, store)
}
func testResourceVersion(t *testing.T, ctx context.Context, source *fcache.FakeControllerSource, recorder *eventRecorder, store Store) {
// The first event should be the initial add.
require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive initial setup all events, got: %v", recorder.getHistory())
history := recorder.getHistory()
assert.Len(t, history, 1)
assert.Equal(t, "add", history[0].Action)
assert.Equal(t, "default/pod-1", history[0].Key)
assert.True(t, history[0].IsInInitialList)
recorder.clearHistory()
// Update the pod labels to increment the resource version.
source.Modify(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Namespace: "default",
Labels: map[string]string{
"foo": "bar",
},
},
})
require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive update event, got: %v", recorder.getHistory())
history = recorder.getHistory()
// The stored resource version should have changed.
assert.Equal(t, history[0].EventRV, store.LastStoreSyncResourceVersion())
recorder.clearHistory()
// Delete the pod.
source.Delete(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Namespace: "default",
},
})
require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive delete event, got: %v", recorder.getHistory())
history = recorder.getHistory()
// The stored resource version should have changed.
assert.Equal(t, history[0].EventRV, store.LastStoreSyncResourceVersion())
}
func TestStoreResourceVersionWithNonMetaTransform(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
source := fcache.NewFakeControllerSource()
source.Add(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Namespace: "default",
},
})
t.Cleanup(func() {
source.Shutdown()
})
// Create a keyfunc that does not rely on object metadata.
keyFunc := func(obj interface{}) (string, error) {
return "pod-1", nil
}
store := NewStore(keyFunc)
fifo := NewRealFIFOWithOptions(RealFIFOOptions{
Transformer: func(obj interface{}) (interface{}, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return nil, fmt.Errorf("expected *v1.Pod, got %T", obj)
}
return pod.Name, nil
},
KeyFunction: keyFunc,
AtomicEvents: true,
})
recorder := newEventRecorder(store, keyFunc)
logger, _ := ktesting.NewTestContext(t)
cfg := &Config{
Queue: fifo,
ListerWatcher: source,
ObjectType: &v1.Pod{},
FullResyncPeriod: 0,
Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(logger, recorder, store, deltas, isInInitialList, keyFunc)
}
return errors.New("object given as Process argument is not Deltas")
},
ProcessBatch: func(deltaList []Delta, isInInitialList bool) error {
return processDeltasInBatch(logger, recorder, store, deltaList, isInInitialList, keyFunc)
},
}
c := New(cfg)
go c.RunWithContext(ctx)
if !WaitForCacheSync(ctx.Done(), c.HasSynced) {
t.Fatal("Timed out waiting for cache sync")
}
testResourceVersionWithNonMetaTransform(t, ctx, source, recorder, store)
}
func testResourceVersionWithNonMetaTransform(t *testing.T, ctx context.Context, source *fcache.FakeControllerSource, recorder *eventRecorder, store Store) {
// The first event should be the initial add.
require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive initial setup all events, got: %v", recorder.getHistory())
history := recorder.getHistory()
assert.Len(t, history, 1)
assert.Equal(t, "add", history[0].Action)
assert.Equal(t, "pod-1", history[0].Key)
assert.True(t, history[0].IsInInitialList)
// The stored resource version should stay at the initial value given by the replace event since
// the update event does not have metadata.
replaceRV := store.LastStoreSyncResourceVersion()
recorder.clearHistory()
// Update the pod labels to increment the resource version.
source.Modify(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Namespace: "default",
Labels: map[string]string{
"foo": "bar",
},
},
})
require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive update event, got: %v", recorder.getHistory())
// The stored resource version should not be changed due to the transform.
assert.Equal(t, replaceRV, store.LastStoreSyncResourceVersion())
recorder.clearHistory()
// Modify the pod but do not send the event.
source.ModifyDropWatch(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Namespace: "default",
Labels: map[string]string{
"foo": "bar2",
},
},
})
// Trigger a replace by resetting the watch
source.ResetWatch()
require.NoError(t, recorder.waitForEventCount(ctx, 1, 5*time.Second), "Controller failed to receive replace event, got: %v", recorder.getHistory())
// The stored resource version should be updated to the replace events RV.
assert.Equal(t, "3", store.LastStoreSyncResourceVersion())
}
type eventRecord struct {
Action string
Key string
@@ -1146,14 +1335,16 @@ type eventRecorder struct {
historyLock sync.Mutex
history []eventRecord
store Store
keyFunc KeyFunc
updateCh chan bool
}
func newEventRecorder(store Store) *eventRecorder {
func newEventRecorder(store Store, keyFunc KeyFunc) *eventRecorder {
return &eventRecorder{
store: store,
updateCh: make(chan bool, 1),
keyFunc: keyFunc,
}
}
@@ -1170,7 +1361,7 @@ func (m *eventRecorder) OnDelete(obj interface{}) {
func (m *eventRecorder) record(action string, obj interface{}, isInInitialList bool) {
m.historyLock.Lock()
defer m.historyLock.Unlock()
key, _ := DeletionHandlingMetaNamespaceKeyFunc(obj)
key, _ := m.keyFunc(obj)
if deleted, ok := obj.(DeletedFinalStateUnknown); ok {
obj = deleted.Obj

View File

@@ -137,6 +137,16 @@ func (c *ExpirationCache) List() []interface{} {
return list
}
// LastStoreSyncResourceVersion returns the latest resource version that the cache has seen.
func (c *ExpirationCache) LastStoreSyncResourceVersion() string {
return c.cacheStorage.LastStoreSyncResourceVersion()
}
// Bookmark observes a new resource version in the cache.
func (c *ExpirationCache) Bookmark(rv string) {
c.cacheStorage.Bookmark(rv)
}
// ListKeys returns a list of all keys in the expiration cache.
func (c *ExpirationCache) ListKeys() []string {
return c.cacheStorage.ListKeys()
@@ -170,7 +180,7 @@ func (c *ExpirationCache) Delete(obj interface{}) error {
}
c.expirationLock.Lock()
defer c.expirationLock.Unlock()
c.cacheStorage.Delete(key)
c.cacheStorage.DeleteWithObject(key, obj)
return nil
}

View File

@@ -35,6 +35,15 @@ func (f *FIFO) List() []interface{} {
return list
}
// LastStoreSyncResourceVersion is unimplemented for FIFO, only used in unit testing.
func (f *FIFO) LastStoreSyncResourceVersion() string {
return ""
}
// Bookmark is unimplemented for FIFO, only used in unit testing.
func (f *FIFO) Bookmark(rv string) {
}
// ListKeys returns a list of all the keys of the objects currently
// in the FIFO.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.

19
tools/cache/store.go vendored
View File

@@ -55,6 +55,15 @@ type Store interface {
// ListKeys returns a list of all the keys currently associated with non-empty accumulators
ListKeys() []string
// LastStoreSyncResourceVersion returns the latest resource version that the store has seen.
// This is used to determine the latest resource version the store has seen from objects
// observed being written to the store.
LastStoreSyncResourceVersion() string
// Bookmark observes a new resource version passed into it and
// will be used to get the latest resource version of the store.
Bookmark(rv string)
// Get returns the accumulator associated with the given object's key
Get(obj interface{}) (item interface{}, exists bool, err error)
@@ -276,7 +285,7 @@ func (c *cache) Delete(obj interface{}) error {
if err != nil {
return KeyError{obj, err}
}
c.cacheStorage.Delete(key)
c.cacheStorage.DeleteWithObject(key, obj)
return nil
}
@@ -292,6 +301,14 @@ func (c *cache) ListKeys() []string {
return c.cacheStorage.ListKeys()
}
func (c *cache) LastStoreSyncResourceVersion() string {
return c.cacheStorage.LastStoreSyncResourceVersion()
}
func (c *cache) Bookmark(rv string) {
c.cacheStorage.Bookmark(rv)
}
// GetIndexers returns the indexers of cache
func (c *cache) GetIndexers() Indexers {
return c.cacheStorage.GetIndexers()

View File

@@ -21,7 +21,9 @@ import (
"sync"
"time"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/util/sets"
clientgofeaturegate "k8s.io/client-go/features"
utiltrace "k8s.io/utils/trace"
)
@@ -43,13 +45,19 @@ import (
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
// Delete is equivalent to calling DeleteWithObject(key, nil) however it is
// not recommended to use this function as it will not update the resource
// version of the store, possibly causing it to be out of date.
Delete(key string)
DeleteWithObject(key string, obj interface{})
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexedValue string) ([]string, error)
Bookmark(rv string)
LastStoreSyncResourceVersion() string
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexedValue string) ([]interface{}, error)
GetIndexers() Indexers
@@ -242,9 +250,15 @@ type threadSafeMap struct {
// index implements the indexing functionality
index *storeIndex
rv string
}
func (c *threadSafeMap) Transaction(txns ...ThreadSafeStoreTransaction) {
if len(txns) == 0 {
return
}
finalObj := txns[len(txns)-1].Object
rv, rvErr := rvFromObject(finalObj)
c.lock.Lock()
defer c.lock.Unlock()
trace := utiltrace.New("ThreadSafeMap Transaction Process",
@@ -262,6 +276,9 @@ func (c *threadSafeMap) Transaction(txns ...ThreadSafeStoreTransaction) {
c.deleteLocked(txn.Key)
}
}
if rvErr == nil {
c.rv = rv
}
}
func (c *threadSafeMap) Add(key string, obj interface{}) {
@@ -273,9 +290,13 @@ func (c *threadSafeMap) addLocked(key string, obj interface{}) {
}
func (c *threadSafeMap) Update(key string, obj interface{}) {
rv, rvErr := rvFromObject(obj)
c.lock.Lock()
defer c.lock.Unlock()
c.updateLocked(key, obj)
if rvErr == nil {
c.rv = rv
}
}
func (c *threadSafeMap) updateLocked(key string, obj interface{}) {
@@ -285,9 +306,21 @@ func (c *threadSafeMap) updateLocked(key string, obj interface{}) {
}
func (c *threadSafeMap) Delete(key string) {
c.DeleteWithObject(key, nil)
}
func (c *threadSafeMap) DeleteWithObject(key string, obj interface{}) {
var rv string
var rvErr error
if obj != nil {
rv, rvErr = rvFromObject(obj)
}
c.lock.Lock()
defer c.lock.Unlock()
c.deleteLocked(key)
if obj != nil && rvErr == nil {
c.rv = rv
}
}
func (c *threadSafeMap) deleteLocked(key string) {
@@ -330,7 +363,7 @@ func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion st
c.lock.Lock()
defer c.lock.Unlock()
c.items = items
c.rv = resourceVersion
// rebuild any index
c.index.reset()
for key, item := range c.items {
@@ -338,6 +371,15 @@ func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion st
}
}
func rvFromObject(obj interface{}) (rv string, err error) {
meta, err := meta.Accessor(obj)
if err != nil {
return "", err
}
rv = meta.GetResourceVersion()
return rv, nil
}
// Index returns a list of items that match the given object on the index function.
// Index is thread-safe so long as you treat all items as immutable.
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
@@ -356,6 +398,24 @@ func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{},
return list, nil
}
// LastStoreSyncResourceVersion returns the latest resource version that the store has seen.
func (c *threadSafeMap) LastStoreSyncResourceVersion() string {
// We cannot return the resource version if the AtomicFIFO feature gate is not enabled.
if !clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.AtomicFIFO) {
return ""
}
c.lock.RLock()
defer c.lock.RUnlock()
return c.rv
}
// Bookmark sets the latest resource version that the store has seen.
func (c *threadSafeMap) Bookmark(rv string) {
c.lock.Lock()
defer c.lock.Unlock()
c.rv = rv
}
// ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
c.lock.RLock()

View File

@@ -23,6 +23,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
)
@@ -166,6 +167,124 @@ func TestThreadSafeStoreIndexingFunctionsWithMultipleValues(t *testing.T) {
assert.NoError(compare("baz", []string{}))
}
func TestThreadSafeStoreRV(t *testing.T) {
t.Run("Initial state", func(t *testing.T) {
store := NewThreadSafeStore(Indexers{}, Indices{}).(*threadSafeMap)
if rv := store.LastStoreSyncResourceVersion(); rv != "" {
t.Errorf("Expected initial RV to be \"\", got %q", rv)
}
})
t.Run("Add Update and Delete", func(t *testing.T) {
store := NewThreadSafeStore(Indexers{}, Indices{}).(*threadSafeMap)
// Add obj with RV "10"
store.Add("key1", &metav1.ObjectMeta{ResourceVersion: "10"})
if rv := store.LastStoreSyncResourceVersion(); rv != "10" {
t.Errorf("Expected RV to be \"10\", got %q", rv)
}
// Add obj with same RV "10"
store.Add("key3", &metav1.ObjectMeta{ResourceVersion: "10"})
if rv := store.LastStoreSyncResourceVersion(); rv != "10" {
t.Errorf("Expected RV to remain \"10\" after adding same RV, got %q", rv)
}
// Add obj with higher RV "20"
store.Add("key4", &metav1.ObjectMeta{ResourceVersion: "20"})
if rv := store.LastStoreSyncResourceVersion(); rv != "20" {
t.Errorf("Expected RV to be \"20\", got %q", rv)
}
// Delete an earlier object with a newer RV
store.DeleteWithObject("key4", &metav1.ObjectMeta{ResourceVersion: "30"})
if rv := store.LastStoreSyncResourceVersion(); rv != "30" {
t.Errorf("Expected RV to become \"30\" after deletion %q", rv)
}
// Add non-meta object, should not be updated
store.Add("key5", "just a string")
if rv := store.LastStoreSyncResourceVersion(); rv != "30" {
t.Errorf("Expected RV to remain \"30\" after adding non-meta object, got %q", rv)
}
// Add obj with RV "40"
store.Add("key6", &metav1.ObjectMeta{ResourceVersion: "40"})
if rv := store.LastStoreSyncResourceVersion(); rv != "40" {
t.Errorf("Expected RV to be \"40\", got %q", rv)
}
// Nil delete
store.Delete("key6")
if rv := store.LastStoreSyncResourceVersion(); rv != "40" {
t.Errorf("Expected RV to remain \"40\" after delete, got %q", rv)
}
txns := []ThreadSafeStoreTransaction{
{
Transaction{
Object: &metav1.ObjectMeta{ResourceVersion: "40"},
Type: TransactionTypeUpdate,
},
"key9",
},
{
Transaction{
Object: &metav1.ObjectMeta{ResourceVersion: "30"},
Type: TransactionTypeUpdate,
},
"key10",
},
{
Transaction{
Object: &metav1.ObjectMeta{ResourceVersion: "50"},
Type: TransactionTypeUpdate,
},
"key11",
},
}
store.Transaction(txns...)
if rv := store.LastStoreSyncResourceVersion(); rv != "50" {
t.Errorf("Expected RV to be \"50\" after transaction, got %q", rv)
}
})
t.Run("Replace", func(t *testing.T) {
store := NewThreadSafeStore(Indexers{}, Indices{}).(*threadSafeMap)
store.Add("key1", &metav1.ObjectMeta{ResourceVersion: "10"})
if rv := store.LastStoreSyncResourceVersion(); rv != "10" {
t.Fatalf("Setup failed, expected RV \"10\", got %q", rv)
}
items := map[string]interface{}{
"key3": &metav1.ObjectMeta{ResourceVersion: "40"},
"key2": &metav1.ObjectMeta{ResourceVersion: "30"},
}
store.Replace(items, "50")
if rv := store.LastStoreSyncResourceVersion(); rv != "50" {
t.Errorf("Expected RV to be \"50\" after Replace(), got %q", rv)
}
})
t.Run("Delete", func(t *testing.T) {
store := NewThreadSafeStore(Indexers{}, Indices{}).(*threadSafeMap)
store.Add("key1", &metav1.ObjectMeta{ResourceVersion: "10"})
if rv := store.LastStoreSyncResourceVersion(); rv != "10" {
t.Fatalf("Setup failed, expected RV \"10\", got %q", rv)
}
store.DeleteWithObject("key1", &metav1.ObjectMeta{ResourceVersion: "20"})
if rv := store.LastStoreSyncResourceVersion(); rv != "20" {
t.Errorf("Expected RV to be \"20\" after Delete(), got %q", rv)
}
})
}
func BenchmarkIndexer(b *testing.B) {
testIndexer := "testIndexer"