mirror of
https://github.com/kubernetes/client-go.git
synced 2025-07-19 01:27:10 +00:00
[client-go #1415] Use transformer from provided store within internal stores in reflector to limit memory usage bursts
Signed-off-by: Valerian Roche <valerian.roche@datadoghq.com> Kubernetes-commit: 585ed0a5cb378e794e4775bc846d5309ca65f2c6
This commit is contained in:
parent
c2ce9a8b31
commit
71776a84bd
5
tools/cache/delta_fifo.go
vendored
5
tools/cache/delta_fifo.go
vendored
@ -303,6 +303,11 @@ func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
|
||||
return f.keyFunc(obj)
|
||||
}
|
||||
|
||||
// Transformer implements the TransformingStore interface.
|
||||
func (f *DeltaFIFO) Transformer() TransformFunc {
|
||||
return f.transformer
|
||||
}
|
||||
|
||||
// HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
|
||||
// or the first batch of items inserted by Replace() has been popped.
|
||||
func (f *DeltaFIFO) HasSynced() bool {
|
||||
|
14
tools/cache/reflector.go
vendored
14
tools/cache/reflector.go
vendored
@ -76,6 +76,13 @@ type ReflectorStore interface {
|
||||
Resync() error
|
||||
}
|
||||
|
||||
// TransformingStore is an optional interface that can be implemented by the provided store.
|
||||
// If implemented on the provided store reflector will use the same transformer in its internal stores.
|
||||
type TransformingStore interface {
|
||||
Store
|
||||
Transformer() TransformFunc
|
||||
}
|
||||
|
||||
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
|
||||
type Reflector struct {
|
||||
// name identifies this reflector. By default, it will be a file:line if possible.
|
||||
@ -718,6 +725,11 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
|
||||
return false
|
||||
}
|
||||
|
||||
storeOpts := []StoreOption{}
|
||||
if tr, ok := r.store.(TransformingStore); ok && tr.Transformer() != nil {
|
||||
storeOpts = append(storeOpts, WithTransformer(tr.Transformer()))
|
||||
}
|
||||
|
||||
initTrace := trace.New("Reflector WatchList", trace.Field{Key: "name", Value: r.name})
|
||||
defer initTrace.LogIfLong(10 * time.Second)
|
||||
for {
|
||||
@ -729,7 +741,7 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
|
||||
|
||||
resourceVersion = ""
|
||||
lastKnownRV := r.rewatchResourceVersion()
|
||||
temporaryStore = NewStore(DeletionHandlingMetaNamespaceKeyFunc)
|
||||
temporaryStore = NewStore(DeletionHandlingMetaNamespaceKeyFunc, storeOpts...)
|
||||
// TODO(#115478): large "list", slow clients, slow network, p&f
|
||||
// might slow down streaming and eventually fail.
|
||||
// maybe in such a case we should retry with an increased timeout?
|
||||
|
113
tools/cache/reflector_test.go
vendored
113
tools/cache/reflector_test.go
vendored
@ -2056,10 +2056,119 @@ func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestReflectorRespectStoreTransformer(t *testing.T) {
|
||||
mkPod := func(id string, rv string) *v1.Pod {
|
||||
return &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv},
|
||||
Spec: v1.PodSpec{
|
||||
Hostname: "test",
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
preExisting1 := mkPod("foo-1", "1")
|
||||
preExisting2 := mkPod("foo-2", "2")
|
||||
pod3 := mkPod("foo-3", "3")
|
||||
|
||||
lastExpectedRV := "3"
|
||||
events := []watch.Event{
|
||||
{Type: watch.Added, Object: preExisting1},
|
||||
{Type: watch.Added, Object: preExisting2},
|
||||
{Type: watch.Bookmark, Object: &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
ResourceVersion: lastExpectedRV,
|
||||
Annotations: map[string]string{
|
||||
metav1.InitialEventsAnnotationKey: "true",
|
||||
},
|
||||
},
|
||||
}},
|
||||
{Type: watch.Added, Object: pod3},
|
||||
}
|
||||
|
||||
s := NewFIFO(MetaNamespaceKeyFunc)
|
||||
var replaceInvoked atomic.Int32
|
||||
store := &fakeStore{
|
||||
Store: s,
|
||||
beforeReplace: func(list []interface{}, rv string) {
|
||||
replaceInvoked.Add(1)
|
||||
// Only two pods are present at the point when Replace is called.
|
||||
if len(list) != 2 {
|
||||
t.Errorf("unexpected nb of objects: expected 2 received %d", len(list))
|
||||
}
|
||||
for _, obj := range list {
|
||||
cast := obj.(*v1.Pod)
|
||||
if cast.Spec.Hostname != "transformed" {
|
||||
t.Error("Object was not transformed prior to replacement")
|
||||
}
|
||||
}
|
||||
},
|
||||
afterReplace: func(rv string, err error) {},
|
||||
transformer: func(i interface{}) (interface{}, error) {
|
||||
cast := i.(*v1.Pod)
|
||||
cast.Spec.Hostname = "transformed"
|
||||
return cast, nil
|
||||
},
|
||||
}
|
||||
|
||||
var once sync.Once
|
||||
lw := &ListWatch{
|
||||
WatchFunc: func(metav1.ListOptions) (watch.Interface, error) {
|
||||
fw := watch.NewFake()
|
||||
go func() {
|
||||
once.Do(func() {
|
||||
for _, e := range events {
|
||||
fw.Action(e.Type, e.Object)
|
||||
}
|
||||
})
|
||||
}()
|
||||
return fw, nil
|
||||
},
|
||||
// ListFunc should never be used in WatchList mode
|
||||
ListFunc: func(metav1.ListOptions) (runtime.Object, error) {
|
||||
return nil, errors.New("list call not expected in WatchList mode")
|
||||
},
|
||||
}
|
||||
|
||||
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
|
||||
r := NewReflector(lw, &v1.Pod{}, store, 0)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
defer close(doneCh)
|
||||
r.RunWithContext(ctx)
|
||||
}()
|
||||
|
||||
// wait for the RV to sync to the version returned by the final list
|
||||
err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) {
|
||||
if rv := r.LastSyncResourceVersion(); rv == lastExpectedRV {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("reflector never caught up with expected revision: %q, err: %v", lastExpectedRV, err)
|
||||
}
|
||||
|
||||
if want, got := lastExpectedRV, r.LastSyncResourceVersion(); want != got {
|
||||
t.Errorf("expected LastSyncResourceVersion to be %q, but got: %q", want, got)
|
||||
}
|
||||
if want, got := 1, int(replaceInvoked.Load()); want != got {
|
||||
t.Errorf("expected replace to be invoked %d times, but got: %d", want, got)
|
||||
}
|
||||
|
||||
cancel()
|
||||
select {
|
||||
case <-doneCh:
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Errorf("timed out waiting for Run to return")
|
||||
}
|
||||
}
|
||||
|
||||
type fakeStore struct {
|
||||
Store
|
||||
beforeReplace func(list []interface{}, s string)
|
||||
afterReplace func(rv string, err error)
|
||||
transformer TransformFunc
|
||||
}
|
||||
|
||||
func (f *fakeStore) Replace(list []interface{}, rv string) error {
|
||||
@ -2069,6 +2178,10 @@ func (f *fakeStore) Replace(list []interface{}, rv string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (f *fakeStore) Transformer() TransformFunc {
|
||||
return f.transformer
|
||||
}
|
||||
|
||||
func BenchmarkExtractList(b *testing.B) {
|
||||
_, _, podList := getPodListItems(0, fakeItemsNum)
|
||||
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
|
||||
|
37
tools/cache/store.go
vendored
37
tools/cache/store.go
vendored
@ -161,6 +161,8 @@ type cache struct {
|
||||
// keyFunc is used to make the key for objects stored in and retrieved from items, and
|
||||
// should be deterministic.
|
||||
keyFunc KeyFunc
|
||||
// Called with every object put in the cache.
|
||||
transformer TransformFunc
|
||||
}
|
||||
|
||||
var _ Store = &cache{}
|
||||
@ -171,6 +173,12 @@ func (c *cache) Add(obj interface{}) error {
|
||||
if err != nil {
|
||||
return KeyError{obj, err}
|
||||
}
|
||||
if c.transformer != nil {
|
||||
obj, err = c.transformer(obj)
|
||||
if err != nil {
|
||||
return fmt.Errorf("transforming: %w", err)
|
||||
}
|
||||
}
|
||||
c.cacheStorage.Add(key, obj)
|
||||
return nil
|
||||
}
|
||||
@ -181,6 +189,12 @@ func (c *cache) Update(obj interface{}) error {
|
||||
if err != nil {
|
||||
return KeyError{obj, err}
|
||||
}
|
||||
if c.transformer != nil {
|
||||
obj, err = c.transformer(obj)
|
||||
if err != nil {
|
||||
return fmt.Errorf("transforming: %w", err)
|
||||
}
|
||||
}
|
||||
c.cacheStorage.Update(key, obj)
|
||||
return nil
|
||||
}
|
||||
@ -267,6 +281,13 @@ func (c *cache) Replace(list []interface{}, resourceVersion string) error {
|
||||
if err != nil {
|
||||
return KeyError{item, err}
|
||||
}
|
||||
|
||||
if c.transformer != nil {
|
||||
item, err = c.transformer(item)
|
||||
if err != nil {
|
||||
return fmt.Errorf("transforming: %w", err)
|
||||
}
|
||||
}
|
||||
items[key] = item
|
||||
}
|
||||
c.cacheStorage.Replace(items, resourceVersion)
|
||||
@ -278,12 +299,24 @@ func (c *cache) Resync() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type StoreOption = func(*cache)
|
||||
|
||||
func WithTransformer(transformer TransformFunc) StoreOption {
|
||||
return func(c *cache) {
|
||||
c.transformer = transformer
|
||||
}
|
||||
}
|
||||
|
||||
// NewStore returns a Store implemented simply with a map and a lock.
|
||||
func NewStore(keyFunc KeyFunc) Store {
|
||||
return &cache{
|
||||
func NewStore(keyFunc KeyFunc, opts ...StoreOption) Store {
|
||||
c := &cache{
|
||||
cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
|
||||
keyFunc: keyFunc,
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(c)
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
// NewIndexer returns an Indexer implemented simply with a map and a lock.
|
||||
|
16
tools/cache/store_test.go
vendored
16
tools/cache/store_test.go
vendored
@ -18,6 +18,7 @@ package cache
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
@ -143,6 +144,21 @@ func TestCache(t *testing.T) {
|
||||
doTestStore(t, NewStore(testStoreKeyFunc))
|
||||
}
|
||||
|
||||
func TestCacheWithTransformer(t *testing.T) {
|
||||
transformerCalled := &atomic.Bool{}
|
||||
doTestStore(t, NewStore(testStoreKeyFunc, WithTransformer(func(i interface{}) (interface{}, error) {
|
||||
transformerCalled.Store(true)
|
||||
obj, ok := i.(testStoreObject)
|
||||
if !ok {
|
||||
return nil, errors.New("wrong object type")
|
||||
}
|
||||
return obj, nil
|
||||
})))
|
||||
if !transformerCalled.Load() {
|
||||
t.Error("informer was not called")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFIFOCache(t *testing.T) {
|
||||
doTestStore(t, NewFIFO(testStoreKeyFunc))
|
||||
}
|
||||
|
10
tools/cache/the_real_fifo.go
vendored
10
tools/cache/the_real_fifo.go
vendored
@ -18,11 +18,12 @@ package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
utiltrace "k8s.io/utils/trace"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// RealFIFO is a Queue in which every notification from the Reflector is passed
|
||||
@ -389,6 +390,11 @@ func (f *RealFIFO) Resync() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Transformer implements the TransformingStore interface.
|
||||
func (f *RealFIFO) Transformer() TransformFunc {
|
||||
return f.transformer
|
||||
}
|
||||
|
||||
// NewRealFIFO returns a Store which can be used to queue up items to
|
||||
// process.
|
||||
func NewRealFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter, transformer TransformFunc) *RealFIFO {
|
||||
|
Loading…
Reference in New Issue
Block a user