From 171ef8cd00a575ad4ef083a9c4748463056de9bd Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Wed, 26 Nov 2025 15:19:00 -0500 Subject: [PATCH] Use transformer in consistency checker Kubernetes-commit: 91368adbb556286942d996c60ab6cc39306415b7 --- tools/cache/reflector.go | 6 ++++-- tools/cache/reflector_data_consistency_detector.go | 4 ++-- .../data_consistency_detector.go | 13 ++++++++++++- .../data_consistency_detector_test.go | 6 +++--- 4 files changed, 21 insertions(+), 8 deletions(-) diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index c5529e7bd..0bc7747fe 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -733,9 +733,11 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) { return false } + var transformer TransformFunc storeOpts := []StoreOption{} if tr, ok := r.store.(TransformingStore); ok && tr.Transformer() != nil { - storeOpts = append(storeOpts, WithTransformer(tr.Transformer())) + transformer = tr.Transformer() + storeOpts = append(storeOpts, WithTransformer(transformer)) } initTrace := trace.New("Reflector WatchList", trace.Field{Key: "name", Value: r.name}) @@ -795,7 +797,7 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) { // we utilize the temporaryStore to ensure independence from the current store implementation. // as of today, the store is implemented as a queue and will be drained by the higher-level // component as soon as it finishes replacing the content. - checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, r.listerWatcher.ListWithContext, temporaryStore.List) + checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, r.listerWatcher.ListWithContext, transformer, temporaryStore.List) if err := r.store.Replace(temporaryStore.List(), resourceVersion); err != nil { return nil, fmt.Errorf("unable to sync watch-list result: %w", err) diff --git a/tools/cache/reflector_data_consistency_detector.go b/tools/cache/reflector_data_consistency_detector.go index a7e0d9c43..4119c78a6 100644 --- a/tools/cache/reflector_data_consistency_detector.go +++ b/tools/cache/reflector_data_consistency_detector.go @@ -33,11 +33,11 @@ import ( // // Note that this function will panic when data inconsistency is detected. // This is intentional because we want to catch it in the CI. -func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn consistencydetector.ListFunc[T], retrieveItemsFn consistencydetector.RetrieveItemsFunc[U]) { +func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn consistencydetector.ListFunc[T], listItemTransformFunc func(interface{}) (interface{}, error), retrieveItemsFn consistencydetector.RetrieveItemsFunc[U]) { if !consistencydetector.IsDataConsistencyDetectionForWatchListEnabled() { return } // for informers we pass an empty ListOptions because // listFn might be wrapped for filtering during informer construction. - consistencydetector.CheckDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, metav1.ListOptions{}, retrieveItemsFn) + consistencydetector.CheckDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, listItemTransformFunc, metav1.ListOptions{}, retrieveItemsFn) } diff --git a/util/consistencydetector/data_consistency_detector.go b/util/consistencydetector/data_consistency_detector.go index 2e883bf4a..72c0124a0 100644 --- a/util/consistencydetector/data_consistency_detector.go +++ b/util/consistencydetector/data_consistency_detector.go @@ -59,12 +59,14 @@ type RetrieveItemsFunc[U any] func() []U type ListFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error) +type TransformFunc func(interface{}) (interface{}, error) + // CheckDataConsistency exists solely for testing purposes. // we cannot use checkWatchListDataConsistencyIfRequested because // it is guarded by an environmental variable. // we cannot manipulate the environmental variable because // it will affect other tests in this package. -func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn ListFunc[T], listOptions metav1.ListOptions, retrieveItemsFn RetrieveItemsFunc[U]) { +func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn ListFunc[T], listItemTransformFunc TransformFunc, listOptions metav1.ListOptions, retrieveItemsFn RetrieveItemsFunc[U]) { if !canFormAdditionalListCall(lastSyncedResourceVersion, listOptions) { klog.V(4).Infof("data consistency check for %s is enabled but the parameters (RV, ListOptions) doesn't allow for creating a valid LIST request. Skipping the data consistency check.", identity) return @@ -94,6 +96,15 @@ func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity if err != nil { panic(err) // this should never happen } + if listItemTransformFunc != nil { + for i := range rawListItems { + obj, err := listItemTransformFunc(rawListItems[i]) + if err != nil { + panic(err) + } + rawListItems[i] = obj.(runtime.Object) + } + } listItems := toMetaObjectSliceOrDie(rawListItems) sort.Sort(byUID(listItems)) diff --git a/util/consistencydetector/data_consistency_detector_test.go b/util/consistencydetector/data_consistency_detector_test.go index bdac197a7..a5e8bffe7 100644 --- a/util/consistencydetector/data_consistency_detector_test.go +++ b/util/consistencydetector/data_consistency_detector_test.go @@ -215,10 +215,10 @@ func TestDataConsistencyChecker(t *testing.T) { if scenario.expectPanic { require.Panics(t, func() { - CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, scenario.requestOptions, retrievedItemsFunc) + CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, nil, scenario.requestOptions, retrievedItemsFunc) }) } else { - CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, scenario.requestOptions, retrievedItemsFunc) + CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, nil, scenario.requestOptions, retrievedItemsFunc) } require.Equal(t, scenario.expectedListRequests, fakeLister.counter) @@ -235,7 +235,7 @@ func TestDataConsistencyCheckerRetry(t *testing.T) { stopListErrorAfter := 5 fakeErrLister := &errorLister{stopErrorAfter: stopListErrorAfter} - CheckDataConsistency(ctx, "", "", fakeErrLister.List, metav1.ListOptions{}, retrievedItemsFunc) + CheckDataConsistency(ctx, "", "", fakeErrLister.List, nil, metav1.ListOptions{}, retrievedItemsFunc) require.Equal(t, fakeErrLister.listCounter, fakeErrLister.stopErrorAfter) }