From 7d2828a10551bb0d4d028a38c2aec9a7671c8f4f Mon Sep 17 00:00:00 2001 From: Michael Aspinwall Date: Mon, 8 Dec 2025 16:18:46 +0000 Subject: [PATCH] Add test for replace events in controller Kubernetes-commit: 6b977a28e90a1f369de91b07a13213ec05a5a426 --- tools/cache/controller_test.go | 235 +++++++++++++++++++++++++++++++++ 1 file changed, 235 insertions(+) diff --git a/tools/cache/controller_test.go b/tools/cache/controller_test.go index 21a11c908..9bddef67d 100644 --- a/tools/cache/controller_test.go +++ b/tools/cache/controller_test.go @@ -21,12 +21,14 @@ import ( "errors" "fmt" "math/rand" + "sort" "sync" "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,6 +40,8 @@ import ( "k8s.io/klog/v2/ktesting" "sigs.k8s.io/randfill" + + "k8s.io/apimachinery/pkg/api/meta" ) func Example() { @@ -893,3 +897,234 @@ func (m *mockTxnStore) Transaction(txns ...Transaction) *TransactionError { m.succeedCount = len(txns) return nil } + +func TestReplaceEvents(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + source := fcache.NewFakeControllerSource() + store := NewStore(DeletionHandlingMetaNamespaceKeyFunc) + t.Cleanup(func() { + source.Shutdown() + }) + + recorder := newEventRecorder(store) + + fifo := NewRealFIFOWithOptions(RealFIFOOptions{ + KnownObjects: store, + }) + + cfg := &Config{ + Queue: fifo, + ListerWatcher: source, + ObjectType: &v1.Pod{}, + FullResyncPeriod: 0, + + Process: func(obj interface{}, isInInitialList bool) error { + if deltas, ok := obj.(Deltas); ok { + return processDeltas(recorder, store, deltas, isInInitialList) + } + return errors.New("object given as Process argument is not Deltas") + }, + ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { + return processDeltasInBatch(recorder, store, deltaList, isInInitialList) + }, + } + + c := New(cfg) + go c.RunWithContext(ctx) + if !WaitForCacheSync(ctx.Done(), c.HasSynced) { + t.Fatal("Timed out waiting for cache sync") + } + testReplaceEvents(t, ctx, fifo, recorder, store) +} + +func testReplaceEvents(t *testing.T, ctx context.Context, fifo Queue, m *eventRecorder, store Store) { + tcs := []struct { + name string + initialObjs []metav1.Object + replacedItems []interface{} + replaceRV string + expectedHistory []eventRecord + }{ + { + name: "Create with no initial objects", + replacedItems: []interface{}{ + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", Namespace: "default", ResourceVersion: "1"}}, + }, + expectedHistory: []eventRecord{ + {Action: "add", Key: "default/pod-1", EventRV: "1", StoreRV: "1"}, + }, + replaceRV: "1", + }, + { + name: "Delete all objects", + initialObjs: []metav1.Object{ + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", Namespace: "default", ResourceVersion: "1"}}, + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-3", Namespace: "default", ResourceVersion: "2"}}, + }, + replacedItems: []interface{}{}, + expectedHistory: []eventRecord{ + {Action: "delete", Key: "default/pod-1", EventRV: "1", StoreRV: ""}, + {Action: "delete", Key: "default/pod-3", EventRV: "2", StoreRV: ""}, + }, + replaceRV: "2", + }, + { + name: "Create, update, delete", + initialObjs: []metav1.Object{ + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", Namespace: "default", ResourceVersion: "1"}}, + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-2", Namespace: "default", Labels: map[string]string{"ver": "1"}, ResourceVersion: "2"}}, + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-3", Namespace: "default", ResourceVersion: "3"}}, + }, + replacedItems: []interface{}{ + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", Namespace: "default", ResourceVersion: "4"}}, + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-2", Namespace: "default", Labels: map[string]string{"ver": "2"}, ResourceVersion: "5"}}, + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-4", Namespace: "default", ResourceVersion: "6"}}, + }, + expectedHistory: []eventRecord{ + {Action: "delete", Key: "default/pod-3", EventRV: "3", StoreRV: ""}, + {Action: "update", Key: "default/pod-1", EventRV: "4", StoreRV: "4"}, + {Action: "update", Key: "default/pod-2", EventRV: "5", StoreRV: "5"}, + {Action: "add", Key: "default/pod-4", EventRV: "6", StoreRV: "6"}, + }, + replaceRV: "6", + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + for _, obj := range tc.initialObjs { + require.NoError(t, fifo.Add(obj), "failed to add") + } + + // Wait for all create events to appear + require.NoError(t, m.waitForEventCount(ctx, len(tc.initialObjs), 5*time.Second), "Controller failed to receive initial setup all events, got: %v", m.getHistory()) + + // Clear history so we only compare what happens during the RESYNC/REPLACE + m.clearHistory() + + // Run replace + require.NoError(t, fifo.Replace(tc.replacedItems, tc.replaceRV), "failed to replace") + + // Wait for all expected events to be received + require.NoError(t, m.waitForEventCount(ctx, len(tc.expectedHistory), 5*time.Second), "Controller failed to receive all events, got: %v", m.getHistory()) + + items := sortItemsByKey(store.List(), DeletionHandlingMetaNamespaceKeyFunc) + assert.Equal(t, tc.replacedItems, items) + assert.Equal(t, tc.expectedHistory, m.getHistory()) + }) + } +} + +type eventRecord struct { + Action string + Key string + EventRV string + StoreRV string +} + +type eventRecorder struct { + historyLock sync.Mutex + history []eventRecord + store Store + + updateCh chan bool +} + +func newEventRecorder(store Store) *eventRecorder { + return &eventRecorder{ + store: store, + updateCh: make(chan bool, 1), + } +} + +func (m *eventRecorder) OnAdd(obj interface{}, _ bool) { + m.record("add", obj) +} +func (m *eventRecorder) OnUpdate(_, obj interface{}) { + m.record("update", obj) +} +func (m *eventRecorder) OnDelete(obj interface{}) { + m.record("delete", obj) +} + +func (m *eventRecorder) record(action string, obj interface{}) { + m.historyLock.Lock() + defer m.historyLock.Unlock() + key, _ := DeletionHandlingMetaNamespaceKeyFunc(obj) + + if deleted, ok := obj.(DeletedFinalStateUnknown); ok { + obj = deleted.Obj + } + + eventRV := "" + if accessor, err := meta.Accessor(obj); err == nil { + eventRV = accessor.GetResourceVersion() + } + + storeRV := "" + if storeObj, exists, err := m.store.GetByKey(key); err == nil && exists { + if accessor, err := meta.Accessor(storeObj); err == nil { + storeRV = accessor.GetResourceVersion() + } + } + + m.history = append(m.history, eventRecord{ + Action: action, + Key: key, + EventRV: eventRV, + StoreRV: storeRV, + }) + select { + case m.updateCh <- true: + default: + } +} + +func (m *eventRecorder) clearHistory() { + m.historyLock.Lock() + defer m.historyLock.Unlock() + m.history = []eventRecord{} +} + +func (m *eventRecorder) getHistory() []eventRecord { + m.historyLock.Lock() + historyCopy := make([]eventRecord, len(m.history)) + copy(historyCopy, m.history) + m.historyLock.Unlock() + + return historyCopy +} + +func (m *eventRecorder) waitForEventCount(ctx context.Context, count int, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + for { + m.historyLock.Lock() + currentCount := len(m.history) + m.historyLock.Unlock() + + if currentCount >= count { + return nil + } + + select { + case <-m.updateCh: + continue + case <-ctx.Done(): + return fmt.Errorf("context canceled waiting for %d events, currently have %d", count, currentCount) + } + } +} + +func sortItemsByKey(items []interface{}, keyFunc KeyFunc) []interface{} { + sort.Slice(items, func(i, j int) bool { + keyI, _ := keyFunc(items[i]) + keyJ, _ := keyFunc(items[j]) + return keyI < keyJ + }) + return items +}