Merge pull request #131799 from DataDog/client-go-reflector-transformer

[client-go #1415] Use transformer from provided store within internal stores in reflector to limit memory usage bursts

Kubernetes-commit: 3f0a1c6c7f02f33649e58996408b1501fce3c0ee
This commit is contained in:
Kubernetes Publisher 2025-06-30 08:36:31 -07:00
commit 5e8db78e04
6 changed files with 190 additions and 5 deletions

View File

@ -303,6 +303,11 @@ func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
return f.keyFunc(obj)
}
// Transformer implements the TransformingStore interface.
func (f *DeltaFIFO) Transformer() TransformFunc {
return f.transformer
}
// HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
// or the first batch of items inserted by Replace() has been popped.
func (f *DeltaFIFO) HasSynced() bool {

View File

@ -76,6 +76,13 @@ type ReflectorStore interface {
Resync() error
}
// TransformingStore is an optional interface that can be implemented by the provided store.
// If implemented on the provided store reflector will use the same transformer in its internal stores.
type TransformingStore interface {
Store
Transformer() TransformFunc
}
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
// name identifies this reflector. By default, it will be a file:line if possible.
@ -718,6 +725,11 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
return false
}
storeOpts := []StoreOption{}
if tr, ok := r.store.(TransformingStore); ok && tr.Transformer() != nil {
storeOpts = append(storeOpts, WithTransformer(tr.Transformer()))
}
initTrace := trace.New("Reflector WatchList", trace.Field{Key: "name", Value: r.name})
defer initTrace.LogIfLong(10 * time.Second)
for {
@ -729,7 +741,7 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
resourceVersion = ""
lastKnownRV := r.rewatchResourceVersion()
temporaryStore = NewStore(DeletionHandlingMetaNamespaceKeyFunc)
temporaryStore = NewStore(DeletionHandlingMetaNamespaceKeyFunc, storeOpts...)
// TODO(#115478): large "list", slow clients, slow network, p&f
// might slow down streaming and eventually fail.
// maybe in such a case we should retry with an increased timeout?

View File

@ -2056,10 +2056,119 @@ func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) {
}
}
func TestReflectorRespectStoreTransformer(t *testing.T) {
mkPod := func(id string, rv string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv},
Spec: v1.PodSpec{
Hostname: "test",
},
}
}
preExisting1 := mkPod("foo-1", "1")
preExisting2 := mkPod("foo-2", "2")
pod3 := mkPod("foo-3", "3")
lastExpectedRV := "3"
events := []watch.Event{
{Type: watch.Added, Object: preExisting1},
{Type: watch.Added, Object: preExisting2},
{Type: watch.Bookmark, Object: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: lastExpectedRV,
Annotations: map[string]string{
metav1.InitialEventsAnnotationKey: "true",
},
},
}},
{Type: watch.Added, Object: pod3},
}
s := NewFIFO(MetaNamespaceKeyFunc)
var replaceInvoked atomic.Int32
store := &fakeStore{
Store: s,
beforeReplace: func(list []interface{}, rv string) {
replaceInvoked.Add(1)
// Only two pods are present at the point when Replace is called.
if len(list) != 2 {
t.Errorf("unexpected nb of objects: expected 2 received %d", len(list))
}
for _, obj := range list {
cast := obj.(*v1.Pod)
if cast.Spec.Hostname != "transformed" {
t.Error("Object was not transformed prior to replacement")
}
}
},
afterReplace: func(rv string, err error) {},
transformer: func(i interface{}) (interface{}, error) {
cast := i.(*v1.Pod)
cast.Spec.Hostname = "transformed"
return cast, nil
},
}
var once sync.Once
lw := &ListWatch{
WatchFunc: func(metav1.ListOptions) (watch.Interface, error) {
fw := watch.NewFake()
go func() {
once.Do(func() {
for _, e := range events {
fw.Action(e.Type, e.Object)
}
})
}()
return fw, nil
},
// ListFunc should never be used in WatchList mode
ListFunc: func(metav1.ListOptions) (runtime.Object, error) {
return nil, errors.New("list call not expected in WatchList mode")
},
}
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
r := NewReflector(lw, &v1.Pod{}, store, 0)
ctx, cancel := context.WithCancel(context.Background())
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
r.RunWithContext(ctx)
}()
// wait for the RV to sync to the version returned by the final list
err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) {
if rv := r.LastSyncResourceVersion(); rv == lastExpectedRV {
return true, nil
}
return false, nil
})
if err != nil {
t.Fatalf("reflector never caught up with expected revision: %q, err: %v", lastExpectedRV, err)
}
if want, got := lastExpectedRV, r.LastSyncResourceVersion(); want != got {
t.Errorf("expected LastSyncResourceVersion to be %q, but got: %q", want, got)
}
if want, got := 1, int(replaceInvoked.Load()); want != got {
t.Errorf("expected replace to be invoked %d times, but got: %d", want, got)
}
cancel()
select {
case <-doneCh:
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timed out waiting for Run to return")
}
}
type fakeStore struct {
Store
beforeReplace func(list []interface{}, s string)
afterReplace func(rv string, err error)
transformer TransformFunc
}
func (f *fakeStore) Replace(list []interface{}, rv string) error {
@ -2069,6 +2178,10 @@ func (f *fakeStore) Replace(list []interface{}, rv string) error {
return err
}
func (f *fakeStore) Transformer() TransformFunc {
return f.transformer
}
func BenchmarkExtractList(b *testing.B) {
_, _, podList := getPodListItems(0, fakeItemsNum)
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)

37
tools/cache/store.go vendored
View File

@ -161,6 +161,8 @@ type cache struct {
// keyFunc is used to make the key for objects stored in and retrieved from items, and
// should be deterministic.
keyFunc KeyFunc
// Called with every object put in the cache.
transformer TransformFunc
}
var _ Store = &cache{}
@ -171,6 +173,12 @@ func (c *cache) Add(obj interface{}) error {
if err != nil {
return KeyError{obj, err}
}
if c.transformer != nil {
obj, err = c.transformer(obj)
if err != nil {
return fmt.Errorf("transforming: %w", err)
}
}
c.cacheStorage.Add(key, obj)
return nil
}
@ -181,6 +189,12 @@ func (c *cache) Update(obj interface{}) error {
if err != nil {
return KeyError{obj, err}
}
if c.transformer != nil {
obj, err = c.transformer(obj)
if err != nil {
return fmt.Errorf("transforming: %w", err)
}
}
c.cacheStorage.Update(key, obj)
return nil
}
@ -267,6 +281,13 @@ func (c *cache) Replace(list []interface{}, resourceVersion string) error {
if err != nil {
return KeyError{item, err}
}
if c.transformer != nil {
item, err = c.transformer(item)
if err != nil {
return fmt.Errorf("transforming: %w", err)
}
}
items[key] = item
}
c.cacheStorage.Replace(items, resourceVersion)
@ -278,12 +299,24 @@ func (c *cache) Resync() error {
return nil
}
type StoreOption = func(*cache)
func WithTransformer(transformer TransformFunc) StoreOption {
return func(c *cache) {
c.transformer = transformer
}
}
// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc) Store {
return &cache{
func NewStore(keyFunc KeyFunc, opts ...StoreOption) Store {
c := &cache{
cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
keyFunc: keyFunc,
}
for _, opt := range opts {
opt(c)
}
return c
}
// NewIndexer returns an Indexer implemented simply with a map and a lock.

View File

@ -18,6 +18,7 @@ package cache
import (
"errors"
"sync/atomic"
"testing"
"k8s.io/apimachinery/pkg/util/sets"
@ -143,6 +144,21 @@ func TestCache(t *testing.T) {
doTestStore(t, NewStore(testStoreKeyFunc))
}
func TestCacheWithTransformer(t *testing.T) {
transformerCalled := &atomic.Bool{}
doTestStore(t, NewStore(testStoreKeyFunc, WithTransformer(func(i interface{}) (interface{}, error) {
transformerCalled.Store(true)
obj, ok := i.(testStoreObject)
if !ok {
return nil, errors.New("wrong object type")
}
return obj, nil
})))
if !transformerCalled.Load() {
t.Error("informer was not called")
}
}
func TestFIFOCache(t *testing.T) {
doTestStore(t, NewFIFO(testStoreKeyFunc))
}

View File

@ -18,11 +18,12 @@ package cache
import (
"fmt"
"sync"
"time"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
utiltrace "k8s.io/utils/trace"
"sync"
"time"
)
// RealFIFO is a Queue in which every notification from the Reflector is passed
@ -389,6 +390,11 @@ func (f *RealFIFO) Resync() error {
return nil
}
// Transformer implements the TransformingStore interface.
func (f *RealFIFO) Transformer() TransformFunc {
return f.transformer
}
// NewRealFIFO returns a Store which can be used to queue up items to
// process.
func NewRealFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter, transformer TransformFunc) *RealFIFO {