Merge pull request #135580 from serathius/client-go-transformer

Embed proper interface in TransformingStore to ensure DeltaFIFO and RealFIFO are implementing it

Kubernetes-commit: 04e8064bccebd04981ee0094457550c9de4f92e3
This commit is contained in:
Kubernetes Publisher
2025-12-04 12:55:04 -08:00
11 changed files with 318 additions and 153 deletions

View File

@@ -708,20 +708,7 @@ func newInformer(clientState Store, options InformerOptions) Controller {
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
var fifo Queue
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
fifo = NewRealFIFOWithOptions(RealFIFOOptions{
KeyFunction: MetaNamespaceKeyFunc,
KnownObjects: clientState,
Transformer: options.Transform,
})
} else {
fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
Transformer: options.Transform,
})
}
fifo := newQueueFIFO(clientState, options.Transform)
cfg := &Config{
Queue: fifo,
@@ -742,3 +729,19 @@ func newInformer(clientState Store, options InformerOptions) Controller {
}
return New(cfg)
}
func newQueueFIFO(clientState Store, transform TransformFunc) Queue {
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
return NewRealFIFOWithOptions(RealFIFOOptions{
KeyFunction: MetaNamespaceKeyFunc,
KnownObjects: clientState,
Transformer: transform,
})
} else {
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
Transformer: transform,
})
}
}

View File

@@ -270,7 +270,8 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
}
var (
_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
_ = TransformingStore(&DeltaFIFO{}) // DeltaFIFO implements TransformingStore to allow memory optimizations
)
var (

View File

@@ -28,7 +28,7 @@ import (
// from the most recent Delta.
// You should treat the items returned inside the deltas as immutable.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *DeltaFIFO) List() []interface{} {
func (f *DeltaFIFO) list() []interface{} {
f.lock.RLock()
defer f.lock.RUnlock()
return f.listLocked()
@@ -46,7 +46,7 @@ func (f *DeltaFIFO) listLocked() []interface{} {
// ListKeys returns a list of all the keys of the objects currently
// in the FIFO.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *DeltaFIFO) ListKeys() []string {
func (f *DeltaFIFO) listKeys() []string {
f.lock.RLock()
defer f.lock.RUnlock()
list := make([]string, 0, len(f.queue))
@@ -60,19 +60,19 @@ func (f *DeltaFIFO) ListKeys() []string {
// or sets exists=false.
// You should treat the items returned inside the deltas as immutable.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
func (f *DeltaFIFO) get(obj interface{}) (item interface{}, exists bool, err error) {
key, err := f.KeyOf(obj)
if err != nil {
return nil, false, KeyError{obj, err}
}
return f.GetByKey(key)
return f.getByKey(key)
}
// GetByKey returns the complete list of deltas for the requested item,
// setting exists=false if that list is empty.
// You should treat the items returned inside the deltas as immutable.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
func (f *DeltaFIFO) getByKey(key string) (item interface{}, exists bool, err error) {
f.lock.RLock()
defer f.lock.RUnlock()
d, exists := f.items[key]
@@ -320,10 +320,10 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
f.Update(mkFifoObj("foo", 12))
f.Delete(mkFifoObj("foo", 15))
if e, a := []interface{}{mkFifoObj("foo", 15)}, f.List(); !reflect.DeepEqual(e, a) {
if e, a := []interface{}{mkFifoObj("foo", 15)}, f.list(); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %+v, got %+v", e, a)
}
if e, a := []string{"foo"}, f.ListKeys(); !reflect.DeepEqual(e, a) {
if e, a := []string{"foo"}, f.listKeys(); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %+v, got %+v", e, a)
}
@@ -349,7 +349,7 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
t.Errorf("Got second value %v", unexpected.val)
case <-time.After(50 * time.Millisecond):
}
_, exists, _ := f.Get(mkFifoObj("foo", ""))
_, exists, _ := f.get(mkFifoObj("foo", ""))
if exists {
t.Errorf("item did not get removed")
}
@@ -397,7 +397,7 @@ func TestDeltaFIFO_transformer(t *testing.T) {
must(f.Replace([]interface{}{}, ""))
// Should be empty
if e, a := []string{"foo", "bar"}, f.ListKeys(); !reflect.DeepEqual(e, a) {
if e, a := []string{"foo", "bar"}, f.listKeys(); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %+v, got %+v", e, a)
}
@@ -507,7 +507,7 @@ func TestDeltaFIFO_addReplace(t *testing.T) {
t.Errorf("Got second value %v", unexpected.val)
case <-time.After(50 * time.Millisecond):
}
_, exists, _ := f.Get(mkFifoObj("foo", ""))
_, exists, _ := f.get(mkFifoObj("foo", ""))
if exists {
t.Errorf("item did not get removed")
}
@@ -991,7 +991,7 @@ func BenchmarkDeltaFIFOListKeys(b *testing.B) {
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_ = f.ListKeys()
_ = f.listKeys()
}
})
b.StopTimer()

View File

@@ -79,7 +79,7 @@ type ReflectorStore interface {
// TransformingStore is an optional interface that can be implemented by the provided store.
// If implemented on the provided store reflector will use the same transformer in its internal stores.
type TransformingStore interface {
Store
ReflectorStore
Transformer() TransformFunc
}
@@ -733,9 +733,11 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
return false
}
var transformer TransformFunc
storeOpts := []StoreOption{}
if tr, ok := r.store.(TransformingStore); ok && tr.Transformer() != nil {
storeOpts = append(storeOpts, WithTransformer(tr.Transformer()))
transformer = tr.Transformer()
storeOpts = append(storeOpts, WithTransformer(transformer))
}
initTrace := trace.New("Reflector WatchList", trace.Field{Key: "name", Value: r.name})
@@ -795,7 +797,7 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
// we utilize the temporaryStore to ensure independence from the current store implementation.
// as of today, the store is implemented as a queue and will be drained by the higher-level
// component as soon as it finishes replacing the content.
checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, r.listerWatcher.ListWithContext, temporaryStore.List)
checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, r.listerWatcher.ListWithContext, transformer, temporaryStore.List)
if err := r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
return nil, fmt.Errorf("unable to sync watch-list result: %w", err)

View File

@@ -33,11 +33,11 @@ import (
//
// Note that this function will panic when data inconsistency is detected.
// This is intentional because we want to catch it in the CI.
func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn consistencydetector.ListFunc[T], retrieveItemsFn consistencydetector.RetrieveItemsFunc[U]) {
func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn consistencydetector.ListFunc[T], listItemTransformFunc func(interface{}) (interface{}, error), retrieveItemsFn consistencydetector.RetrieveItemsFunc[U]) {
if !consistencydetector.IsDataConsistencyDetectionForWatchListEnabled() {
return
}
// for informers we pass an empty ListOptions because
// listFn might be wrapped for filtering during informer construction.
consistencydetector.CheckDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, metav1.ListOptions{}, retrieveItemsFn)
consistencydetector.CheckDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, listItemTransformFunc, metav1.ListOptions{}, retrieveItemsFn)
}

View File

@@ -0,0 +1,114 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"context"
"fmt"
"testing"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientfeatures "k8s.io/client-go/features"
clientfeaturestesting "k8s.io/client-go/features/testing"
"k8s.io/client-go/util/consistencydetector"
"k8s.io/klog/v2/ktesting"
)
func TestReflectorDataConsistencyDetector(t *testing.T) {
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
restore := consistencydetector.SetDataConsistencyDetectionForWatchListEnabledForTest(true)
defer restore()
markTransformed := func(obj interface{}) (interface{}, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return obj, nil
}
newPod := pod.DeepCopy()
if newPod.Labels == nil {
newPod.Labels = make(map[string]string)
}
newPod.Labels["transformed"] = "true"
return newPod, nil
}
for _, inOrder := range []bool{false, true} {
t.Run(fmt.Sprintf("InOrder=%v", inOrder), func(t *testing.T) {
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.InOrderInformers, inOrder)
for _, transformerEnabled := range []bool{false, true} {
var transformer TransformFunc
if transformerEnabled {
transformer = markTransformed
}
t.Run(fmt.Sprintf("Transformer=%v", transformerEnabled), func(t *testing.T) {
runTestReflectorDataConsistencyDetector(t, transformer)
})
}
})
}
}
func runTestReflectorDataConsistencyDetector(t *testing.T, transformer TransformFunc) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
store := NewStore(MetaNamespaceKeyFunc)
fifo := newQueueFIFO(store, transformer)
lw := &ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{
ListMeta: metav1.ListMeta{ResourceVersion: "1"},
Items: []v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", ResourceVersion: "1"}},
},
}, nil
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
w := watch.NewFake()
go func() {
w.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", ResourceVersion: "1"}})
w.Action(watch.Bookmark, &v1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
ResourceVersion: "1",
Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"},
}})
}()
return w, nil
},
}
r := NewReflector(lw, &v1.Pod{}, fifo, 0)
go func() {
_ = wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
return r.LastSyncResourceVersion() != "", nil
})
cancel()
}()
err := r.ListAndWatchWithContext(ctx)
if err != nil {
t.Errorf("ListAndWatchWithContext returned error: %v", err)
}
}

View File

@@ -20,10 +20,12 @@ import (
"context"
"errors"
"fmt"
"maps"
"math/rand"
"net/http"
"reflect"
goruntime "runtime"
"slices"
"strconv"
"sync"
"sync/atomic"
@@ -1962,7 +1964,7 @@ func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) {
s := NewFIFO(MetaNamespaceKeyFunc)
var replaceInvoked atomic.Int32
store := &fakeStore{
Store: s,
ReflectorStore: s,
beforeReplace: func(list []interface{}, rv string) {
// interested in the Replace call that happens after the Error event
if rv == lastExpectedRV {
@@ -2057,131 +2059,165 @@ func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) {
}
func TestReflectorRespectStoreTransformer(t *testing.T) {
mkPod := func(id string, rv string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv},
Spec: v1.PodSpec{
Hostname: "test",
for name, test := range map[string]struct {
storeBuilder func(counter *atomic.Int32) ReflectorStore
items func(rs ReflectorStore) []interface{}
}{
"real-fifo": {
storeBuilder: func(counter *atomic.Int32) ReflectorStore {
return NewRealFIFO(MetaNamespaceKeyFunc, NewStore(MetaNamespaceKeyFunc), func(i interface{}) (interface{}, error) {
counter.Add(1)
cast := i.(*v1.Pod)
cast.Spec.Hostname = "transformed"
return cast, nil
})
},
}
}
preExisting1 := mkPod("foo-1", "1")
preExisting2 := mkPod("foo-2", "2")
pod3 := mkPod("foo-3", "3")
lastExpectedRV := "3"
events := []watch.Event{
{Type: watch.Added, Object: preExisting1},
{Type: watch.Added, Object: preExisting2},
{Type: watch.Bookmark, Object: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: lastExpectedRV,
Annotations: map[string]string{
metav1.InitialEventsAnnotationKey: "true",
},
items: func(rs ReflectorStore) []interface{} {
store := rs.(*RealFIFO)
objects := make(map[string]interface{})
for _, item := range store.getItems() {
key, _ := store.keyFunc(item.Object)
if item.Type == Deleted {
delete(objects, key)
} else {
objects[key] = item.Object
}
}
return slices.Collect(maps.Values(objects))
},
}},
{Type: watch.Added, Object: pod3},
}
s := NewFIFO(MetaNamespaceKeyFunc)
var replaceInvoked atomic.Int32
store := &fakeStore{
Store: s,
beforeReplace: func(list []interface{}, rv string) {
replaceInvoked.Add(1)
// Only two pods are present at the point when Replace is called.
if len(list) != 2 {
t.Errorf("unexpected nb of objects: expected 2 received %d", len(list))
},
"delta-fifo": {
storeBuilder: func(counter *atomic.Int32) ReflectorStore {
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: MetaNamespaceKeyFunc,
Transformer: func(i interface{}) (interface{}, error) {
counter.Add(1)
cast := i.(*v1.Pod)
cast.Spec.Hostname = "transformed"
return cast, nil
},
})
},
items: func(rs ReflectorStore) []interface{} {
return rs.(*DeltaFIFO).list()
},
},
} {
t.Run(name, func(t *testing.T) {
mkPod := func(id string, rv string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv},
Spec: v1.PodSpec{
Hostname: "test",
},
}
}
for _, obj := range list {
cast := obj.(*v1.Pod)
preExisting1 := mkPod("foo-1", "1")
preExisting2 := mkPod("foo-2", "2")
pod3 := mkPod("foo-3", "3")
lastExpectedRV := "3"
events := []watch.Event{
{Type: watch.Added, Object: preExisting1},
{Type: watch.Added, Object: preExisting2},
{Type: watch.Bookmark, Object: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: lastExpectedRV,
Annotations: map[string]string{
metav1.InitialEventsAnnotationKey: "true",
},
},
}},
{Type: watch.Added, Object: pod3},
}
var transformerInvoked atomic.Int32
s := test.storeBuilder(&transformerInvoked)
var once sync.Once
lw := &ListWatch{
WatchFunc: func(metav1.ListOptions) (watch.Interface, error) {
fw := watch.NewFake()
go func() {
once.Do(func() {
for _, e := range events {
fw.Action(e.Type, e.Object)
}
})
}()
return fw, nil
},
// ListFunc should never be used in WatchList mode
ListFunc: func(metav1.ListOptions) (runtime.Object, error) {
return nil, errors.New("list call not expected in WatchList mode")
},
}
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
r := NewReflector(lw, &v1.Pod{}, s, 0)
ctx, cancel := context.WithCancel(context.Background())
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
r.RunWithContext(ctx)
}()
// wait for the RV to sync to the version returned by the final list
err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) {
if rv := r.LastSyncResourceVersion(); rv == lastExpectedRV {
return true, nil
}
return false, nil
})
if err != nil {
t.Fatalf("reflector never caught up with expected revision: %q, err: %v", lastExpectedRV, err)
}
if want, got := lastExpectedRV, r.LastSyncResourceVersion(); want != got {
t.Errorf("expected LastSyncResourceVersion to be %q, but got: %q", want, got)
}
informerItems := test.items(s)
if want, got := 3, len(informerItems); want != got {
t.Errorf("expected informer to contain %d objects, but got: %d", want, got)
}
for _, item := range informerItems {
cast := item.(*v1.Pod)
if cast.Spec.Hostname != "transformed" {
t.Error("Object was not transformed prior to replacement")
}
}
},
afterReplace: func(rv string, err error) {},
transformer: func(i interface{}) (interface{}, error) {
cast := i.(*v1.Pod)
cast.Spec.Hostname = "transformed"
return cast, nil
},
}
var once sync.Once
lw := &ListWatch{
WatchFunc: func(metav1.ListOptions) (watch.Interface, error) {
fw := watch.NewFake()
go func() {
once.Do(func() {
for _, e := range events {
fw.Action(e.Type, e.Object)
}
})
}()
return fw, nil
},
// ListFunc should never be used in WatchList mode
ListFunc: func(metav1.ListOptions) (runtime.Object, error) {
return nil, errors.New("list call not expected in WatchList mode")
},
}
// Transformer should have been invoked twice for the initial sync in the informer on the temporary store,
// then twice on replace, then once on the following update.
if want, got := 5, int(transformerInvoked.Load()); want != got {
t.Errorf("expected transformer to be invoked %d times, but got: %d", want, got)
}
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
r := NewReflector(lw, &v1.Pod{}, store, 0)
ctx, cancel := context.WithCancel(context.Background())
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
r.RunWithContext(ctx)
}()
// wait for the RV to sync to the version returned by the final list
err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) {
if rv := r.LastSyncResourceVersion(); rv == lastExpectedRV {
return true, nil
}
return false, nil
})
if err != nil {
t.Fatalf("reflector never caught up with expected revision: %q, err: %v", lastExpectedRV, err)
}
if want, got := lastExpectedRV, r.LastSyncResourceVersion(); want != got {
t.Errorf("expected LastSyncResourceVersion to be %q, but got: %q", want, got)
}
if want, got := 1, int(replaceInvoked.Load()); want != got {
t.Errorf("expected replace to be invoked %d times, but got: %d", want, got)
}
cancel()
select {
case <-doneCh:
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timed out waiting for Run to return")
cancel()
select {
case <-doneCh:
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timed out waiting for Run to return")
}
})
}
}
type fakeStore struct {
Store
ReflectorStore
beforeReplace func(list []interface{}, s string)
afterReplace func(rv string, err error)
transformer TransformFunc
}
func (f *fakeStore) Replace(list []interface{}, rv string) error {
f.beforeReplace(list, rv)
err := f.Store.Replace(list, rv)
err := f.ReflectorStore.Replace(list, rv)
f.afterReplace(rv, err)
return err
}
func (f *fakeStore) Transformer() TransformFunc {
return f.transformer
}
func BenchmarkExtractList(b *testing.B) {
_, _, podList := getPodListItems(0, fakeItemsNum)
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)

View File

@@ -539,20 +539,7 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
s.startedLock.Lock()
defer s.startedLock.Unlock()
var fifo Queue
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
fifo = NewRealFIFOWithOptions(RealFIFOOptions{
KeyFunction: MetaNamespaceKeyFunc,
KnownObjects: s.indexer,
Transformer: s.transform,
})
} else {
fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
Transformer: s.transform,
})
}
fifo := newQueueFIFO(s.indexer, s.transform)
cfg := &Config{
Queue: fifo,

View File

@@ -89,7 +89,8 @@ type RealFIFO struct {
}
var (
_ = Queue(&RealFIFO{}) // RealFIFO is a Queue
_ = Queue(&RealFIFO{}) // RealFIFO is a Queue
_ = TransformingStore(&RealFIFO{}) // RealFIFO implements TransformingStore to allow memory optimizations
)
// Close the queue.

View File

@@ -45,16 +45,28 @@ func IsDataConsistencyDetectionForWatchListEnabled() bool {
return dataConsistencyDetectionForWatchListEnabled
}
// SetDataConsistencyDetectionForWatchListEnabledForTest allows to enable/disable data consistency detection for testing purposes.
// It returns a function that restores the original value.
func SetDataConsistencyDetectionForWatchListEnabledForTest(enabled bool) func() {
original := dataConsistencyDetectionForWatchListEnabled
dataConsistencyDetectionForWatchListEnabled = enabled
return func() {
dataConsistencyDetectionForWatchListEnabled = original
}
}
type RetrieveItemsFunc[U any] func() []U
type ListFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error)
type TransformFunc func(interface{}) (interface{}, error)
// CheckDataConsistency exists solely for testing purposes.
// we cannot use checkWatchListDataConsistencyIfRequested because
// it is guarded by an environmental variable.
// we cannot manipulate the environmental variable because
// it will affect other tests in this package.
func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn ListFunc[T], listOptions metav1.ListOptions, retrieveItemsFn RetrieveItemsFunc[U]) {
func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn ListFunc[T], listItemTransformFunc TransformFunc, listOptions metav1.ListOptions, retrieveItemsFn RetrieveItemsFunc[U]) {
if !canFormAdditionalListCall(lastSyncedResourceVersion, listOptions) {
klog.V(4).Infof("data consistency check for %s is enabled but the parameters (RV, ListOptions) doesn't allow for creating a valid LIST request. Skipping the data consistency check.", identity)
return
@@ -84,6 +96,15 @@ func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity
if err != nil {
panic(err) // this should never happen
}
if listItemTransformFunc != nil {
for i := range rawListItems {
obj, err := listItemTransformFunc(rawListItems[i])
if err != nil {
panic(err)
}
rawListItems[i] = obj.(runtime.Object)
}
}
listItems := toMetaObjectSliceOrDie(rawListItems)
sort.Sort(byUID(listItems))

View File

@@ -215,10 +215,10 @@ func TestDataConsistencyChecker(t *testing.T) {
if scenario.expectPanic {
require.Panics(t, func() {
CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, scenario.requestOptions, retrievedItemsFunc)
CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, nil, scenario.requestOptions, retrievedItemsFunc)
})
} else {
CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, scenario.requestOptions, retrievedItemsFunc)
CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, nil, scenario.requestOptions, retrievedItemsFunc)
}
require.Equal(t, scenario.expectedListRequests, fakeLister.counter)
@@ -235,7 +235,7 @@ func TestDataConsistencyCheckerRetry(t *testing.T) {
stopListErrorAfter := 5
fakeErrLister := &errorLister{stopErrorAfter: stopListErrorAfter}
CheckDataConsistency(ctx, "", "", fakeErrLister.List, metav1.ListOptions{}, retrievedItemsFunc)
CheckDataConsistency(ctx, "", "", fakeErrLister.List, nil, metav1.ListOptions{}, retrievedItemsFunc)
require.Equal(t, fakeErrLister.listCounter, fakeErrLister.stopErrorAfter)
}