Use transformer in consistency checker

Kubernetes-commit: 9043cb4f1c3f893675872a957076d371fe4bbb53
This commit is contained in:
Jordan Liggitt
2025-11-26 15:19:00 -05:00
committed by Kubernetes Publisher
parent 0c76ee5d62
commit 25da70165e
4 changed files with 21 additions and 8 deletions

View File

@@ -726,9 +726,11 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
return false
}
var transformer TransformFunc
storeOpts := []StoreOption{}
if tr, ok := r.store.(TransformingStore); ok && tr.Transformer() != nil {
storeOpts = append(storeOpts, WithTransformer(tr.Transformer()))
transformer = tr.Transformer()
storeOpts = append(storeOpts, WithTransformer(transformer))
}
initTrace := trace.New("Reflector WatchList", trace.Field{Key: "name", Value: r.name})
@@ -788,7 +790,7 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
// we utilize the temporaryStore to ensure independence from the current store implementation.
// as of today, the store is implemented as a queue and will be drained by the higher-level
// component as soon as it finishes replacing the content.
checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, r.listerWatcher.ListWithContext, temporaryStore.List)
checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, r.listerWatcher.ListWithContext, transformer, temporaryStore.List)
if err := r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
return nil, fmt.Errorf("unable to sync watch-list result: %w", err)

View File

@@ -33,11 +33,11 @@ import (
//
// Note that this function will panic when data inconsistency is detected.
// This is intentional because we want to catch it in the CI.
func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn consistencydetector.ListFunc[T], retrieveItemsFn consistencydetector.RetrieveItemsFunc[U]) {
func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn consistencydetector.ListFunc[T], listItemTransformFunc func(interface{}) (interface{}, error), retrieveItemsFn consistencydetector.RetrieveItemsFunc[U]) {
if !consistencydetector.IsDataConsistencyDetectionForWatchListEnabled() {
return
}
// for informers we pass an empty ListOptions because
// listFn might be wrapped for filtering during informer construction.
consistencydetector.CheckDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, metav1.ListOptions{}, retrieveItemsFn)
consistencydetector.CheckDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, listItemTransformFunc, metav1.ListOptions{}, retrieveItemsFn)
}

View File

@@ -59,12 +59,14 @@ type RetrieveItemsFunc[U any] func() []U
type ListFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error)
type TransformFunc func(interface{}) (interface{}, error)
// CheckDataConsistency exists solely for testing purposes.
// we cannot use checkWatchListDataConsistencyIfRequested because
// it is guarded by an environmental variable.
// we cannot manipulate the environmental variable because
// it will affect other tests in this package.
func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn ListFunc[T], listOptions metav1.ListOptions, retrieveItemsFn RetrieveItemsFunc[U]) {
func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn ListFunc[T], listItemTransformFunc TransformFunc, listOptions metav1.ListOptions, retrieveItemsFn RetrieveItemsFunc[U]) {
if !canFormAdditionalListCall(lastSyncedResourceVersion, listOptions) {
klog.V(4).Infof("data consistency check for %s is enabled but the parameters (RV, ListOptions) doesn't allow for creating a valid LIST request. Skipping the data consistency check.", identity)
return
@@ -94,6 +96,15 @@ func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity
if err != nil {
panic(err) // this should never happen
}
if listItemTransformFunc != nil {
for i := range rawListItems {
obj, err := listItemTransformFunc(rawListItems[i])
if err != nil {
panic(err)
}
rawListItems[i] = obj.(runtime.Object)
}
}
listItems := toMetaObjectSliceOrDie(rawListItems)
sort.Sort(byUID(listItems))

View File

@@ -215,10 +215,10 @@ func TestDataConsistencyChecker(t *testing.T) {
if scenario.expectPanic {
require.Panics(t, func() {
CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, scenario.requestOptions, retrievedItemsFunc)
CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, nil, scenario.requestOptions, retrievedItemsFunc)
})
} else {
CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, scenario.requestOptions, retrievedItemsFunc)
CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, nil, scenario.requestOptions, retrievedItemsFunc)
}
require.Equal(t, scenario.expectedListRequests, fakeLister.counter)
@@ -235,7 +235,7 @@ func TestDataConsistencyCheckerRetry(t *testing.T) {
stopListErrorAfter := 5
fakeErrLister := &errorLister{stopErrorAfter: stopListErrorAfter}
CheckDataConsistency(ctx, "", "", fakeErrLister.List, metav1.ListOptions{}, retrievedItemsFunc)
CheckDataConsistency(ctx, "", "", fakeErrLister.List, nil, metav1.ListOptions{}, retrievedItemsFunc)
require.Equal(t, fakeErrLister.listCounter, fakeErrLister.stopErrorAfter)
}