From 71776a84bd3800573d7ea4e65e573737b5d461a6 Mon Sep 17 00:00:00 2001 From: Valerian Roche Date: Thu, 26 Jun 2025 22:00:41 -0400 Subject: [PATCH] [client-go #1415] Use transformer from provided store within internal stores in reflector to limit memory usage bursts Signed-off-by: Valerian Roche Kubernetes-commit: 585ed0a5cb378e794e4775bc846d5309ca65f2c6 --- tools/cache/delta_fifo.go | 5 ++ tools/cache/reflector.go | 14 ++++- tools/cache/reflector_test.go | 113 ++++++++++++++++++++++++++++++++++ tools/cache/store.go | 37 ++++++++++- tools/cache/store_test.go | 16 +++++ tools/cache/the_real_fifo.go | 10 ++- 6 files changed, 190 insertions(+), 5 deletions(-) diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index 264d7559..9d9e238c 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -303,6 +303,11 @@ func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) { return f.keyFunc(obj) } +// Transformer implements the TransformingStore interface. +func (f *DeltaFIFO) Transformer() TransformFunc { + return f.transformer +} + // HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first, // or the first batch of items inserted by Replace() has been popped. func (f *DeltaFIFO) HasSynced() bool { diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 47626610..7907942a 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -76,6 +76,13 @@ type ReflectorStore interface { Resync() error } +// TransformingStore is an optional interface that can be implemented by the provided store. +// If implemented on the provided store reflector will use the same transformer in its internal stores. +type TransformingStore interface { + Store + Transformer() TransformFunc +} + // Reflector watches a specified resource and causes all changes to be reflected in the given store. type Reflector struct { // name identifies this reflector. By default, it will be a file:line if possible. @@ -718,6 +725,11 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) { return false } + storeOpts := []StoreOption{} + if tr, ok := r.store.(TransformingStore); ok && tr.Transformer() != nil { + storeOpts = append(storeOpts, WithTransformer(tr.Transformer())) + } + initTrace := trace.New("Reflector WatchList", trace.Field{Key: "name", Value: r.name}) defer initTrace.LogIfLong(10 * time.Second) for { @@ -729,7 +741,7 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) { resourceVersion = "" lastKnownRV := r.rewatchResourceVersion() - temporaryStore = NewStore(DeletionHandlingMetaNamespaceKeyFunc) + temporaryStore = NewStore(DeletionHandlingMetaNamespaceKeyFunc, storeOpts...) // TODO(#115478): large "list", slow clients, slow network, p&f // might slow down streaming and eventually fail. // maybe in such a case we should retry with an increased timeout? diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index bc3168af..558cdcee 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -2056,10 +2056,119 @@ func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) { } } +func TestReflectorRespectStoreTransformer(t *testing.T) { + mkPod := func(id string, rv string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv}, + Spec: v1.PodSpec{ + Hostname: "test", + }, + } + } + + preExisting1 := mkPod("foo-1", "1") + preExisting2 := mkPod("foo-2", "2") + pod3 := mkPod("foo-3", "3") + + lastExpectedRV := "3" + events := []watch.Event{ + {Type: watch.Added, Object: preExisting1}, + {Type: watch.Added, Object: preExisting2}, + {Type: watch.Bookmark, Object: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: lastExpectedRV, + Annotations: map[string]string{ + metav1.InitialEventsAnnotationKey: "true", + }, + }, + }}, + {Type: watch.Added, Object: pod3}, + } + + s := NewFIFO(MetaNamespaceKeyFunc) + var replaceInvoked atomic.Int32 + store := &fakeStore{ + Store: s, + beforeReplace: func(list []interface{}, rv string) { + replaceInvoked.Add(1) + // Only two pods are present at the point when Replace is called. + if len(list) != 2 { + t.Errorf("unexpected nb of objects: expected 2 received %d", len(list)) + } + for _, obj := range list { + cast := obj.(*v1.Pod) + if cast.Spec.Hostname != "transformed" { + t.Error("Object was not transformed prior to replacement") + } + } + }, + afterReplace: func(rv string, err error) {}, + transformer: func(i interface{}) (interface{}, error) { + cast := i.(*v1.Pod) + cast.Spec.Hostname = "transformed" + return cast, nil + }, + } + + var once sync.Once + lw := &ListWatch{ + WatchFunc: func(metav1.ListOptions) (watch.Interface, error) { + fw := watch.NewFake() + go func() { + once.Do(func() { + for _, e := range events { + fw.Action(e.Type, e.Object) + } + }) + }() + return fw, nil + }, + // ListFunc should never be used in WatchList mode + ListFunc: func(metav1.ListOptions) (runtime.Object, error) { + return nil, errors.New("list call not expected in WatchList mode") + }, + } + + clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true) + r := NewReflector(lw, &v1.Pod{}, store, 0) + ctx, cancel := context.WithCancel(context.Background()) + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + r.RunWithContext(ctx) + }() + + // wait for the RV to sync to the version returned by the final list + err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) { + if rv := r.LastSyncResourceVersion(); rv == lastExpectedRV { + return true, nil + } + return false, nil + }) + if err != nil { + t.Fatalf("reflector never caught up with expected revision: %q, err: %v", lastExpectedRV, err) + } + + if want, got := lastExpectedRV, r.LastSyncResourceVersion(); want != got { + t.Errorf("expected LastSyncResourceVersion to be %q, but got: %q", want, got) + } + if want, got := 1, int(replaceInvoked.Load()); want != got { + t.Errorf("expected replace to be invoked %d times, but got: %d", want, got) + } + + cancel() + select { + case <-doneCh: + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("timed out waiting for Run to return") + } +} + type fakeStore struct { Store beforeReplace func(list []interface{}, s string) afterReplace func(rv string, err error) + transformer TransformFunc } func (f *fakeStore) Replace(list []interface{}, rv string) error { @@ -2069,6 +2178,10 @@ func (f *fakeStore) Replace(list []interface{}, rv string) error { return err } +func (f *fakeStore) Transformer() TransformFunc { + return f.transformer +} + func BenchmarkExtractList(b *testing.B) { _, _, podList := getPodListItems(0, fakeItemsNum) _, _, configMapList := getConfigmapListItems(0, fakeItemsNum) diff --git a/tools/cache/store.go b/tools/cache/store.go index 5cc3f42e..1d068580 100644 --- a/tools/cache/store.go +++ b/tools/cache/store.go @@ -161,6 +161,8 @@ type cache struct { // keyFunc is used to make the key for objects stored in and retrieved from items, and // should be deterministic. keyFunc KeyFunc + // Called with every object put in the cache. + transformer TransformFunc } var _ Store = &cache{} @@ -171,6 +173,12 @@ func (c *cache) Add(obj interface{}) error { if err != nil { return KeyError{obj, err} } + if c.transformer != nil { + obj, err = c.transformer(obj) + if err != nil { + return fmt.Errorf("transforming: %w", err) + } + } c.cacheStorage.Add(key, obj) return nil } @@ -181,6 +189,12 @@ func (c *cache) Update(obj interface{}) error { if err != nil { return KeyError{obj, err} } + if c.transformer != nil { + obj, err = c.transformer(obj) + if err != nil { + return fmt.Errorf("transforming: %w", err) + } + } c.cacheStorage.Update(key, obj) return nil } @@ -267,6 +281,13 @@ func (c *cache) Replace(list []interface{}, resourceVersion string) error { if err != nil { return KeyError{item, err} } + + if c.transformer != nil { + item, err = c.transformer(item) + if err != nil { + return fmt.Errorf("transforming: %w", err) + } + } items[key] = item } c.cacheStorage.Replace(items, resourceVersion) @@ -278,12 +299,24 @@ func (c *cache) Resync() error { return nil } +type StoreOption = func(*cache) + +func WithTransformer(transformer TransformFunc) StoreOption { + return func(c *cache) { + c.transformer = transformer + } +} + // NewStore returns a Store implemented simply with a map and a lock. -func NewStore(keyFunc KeyFunc) Store { - return &cache{ +func NewStore(keyFunc KeyFunc, opts ...StoreOption) Store { + c := &cache{ cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}), keyFunc: keyFunc, } + for _, opt := range opts { + opt(c) + } + return c } // NewIndexer returns an Indexer implemented simply with a map and a lock. diff --git a/tools/cache/store_test.go b/tools/cache/store_test.go index 232211ee..f560fb1a 100644 --- a/tools/cache/store_test.go +++ b/tools/cache/store_test.go @@ -18,6 +18,7 @@ package cache import ( "errors" + "sync/atomic" "testing" "k8s.io/apimachinery/pkg/util/sets" @@ -143,6 +144,21 @@ func TestCache(t *testing.T) { doTestStore(t, NewStore(testStoreKeyFunc)) } +func TestCacheWithTransformer(t *testing.T) { + transformerCalled := &atomic.Bool{} + doTestStore(t, NewStore(testStoreKeyFunc, WithTransformer(func(i interface{}) (interface{}, error) { + transformerCalled.Store(true) + obj, ok := i.(testStoreObject) + if !ok { + return nil, errors.New("wrong object type") + } + return obj, nil + }))) + if !transformerCalled.Load() { + t.Error("informer was not called") + } +} + func TestFIFOCache(t *testing.T) { doTestStore(t, NewFIFO(testStoreKeyFunc)) } diff --git a/tools/cache/the_real_fifo.go b/tools/cache/the_real_fifo.go index 9be14ff3..ef322bea 100644 --- a/tools/cache/the_real_fifo.go +++ b/tools/cache/the_real_fifo.go @@ -18,11 +18,12 @@ package cache import ( "fmt" + "sync" + "time" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" utiltrace "k8s.io/utils/trace" - "sync" - "time" ) // RealFIFO is a Queue in which every notification from the Reflector is passed @@ -389,6 +390,11 @@ func (f *RealFIFO) Resync() error { return nil } +// Transformer implements the TransformingStore interface. +func (f *RealFIFO) Transformer() TransformFunc { + return f.transformer +} + // NewRealFIFO returns a Store which can be used to queue up items to // process. func NewRealFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter, transformer TransformFunc) *RealFIFO {