[client-go #1415] Embed proper interface in TransformingStore to ensure DeltaFIFO and RealFIFO are implementing it

Signed-off-by: Valerian Roche <valerian.roche@datadoghq.com>

Kubernetes-commit: 88c20d46a4e5ca8db7519c81856a358c919ae262
This commit is contained in:
Valerian Roche
2025-07-28 13:53:34 -04:00
committed by Kubernetes Publisher
parent 97256a6495
commit f466f58eea
5 changed files with 155 additions and 117 deletions

View File

@@ -270,7 +270,8 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
}
var (
_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
_ = TransformingStore(&DeltaFIFO{}) // DeltaFIFO implements TransformingStore to allow memory optimizations
)
var (

View File

@@ -28,7 +28,7 @@ import (
// from the most recent Delta.
// You should treat the items returned inside the deltas as immutable.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *DeltaFIFO) List() []interface{} {
func (f *DeltaFIFO) list() []interface{} {
f.lock.RLock()
defer f.lock.RUnlock()
return f.listLocked()
@@ -46,7 +46,7 @@ func (f *DeltaFIFO) listLocked() []interface{} {
// ListKeys returns a list of all the keys of the objects currently
// in the FIFO.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *DeltaFIFO) ListKeys() []string {
func (f *DeltaFIFO) listKeys() []string {
f.lock.RLock()
defer f.lock.RUnlock()
list := make([]string, 0, len(f.queue))
@@ -60,19 +60,19 @@ func (f *DeltaFIFO) ListKeys() []string {
// or sets exists=false.
// You should treat the items returned inside the deltas as immutable.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
func (f *DeltaFIFO) get(obj interface{}) (item interface{}, exists bool, err error) {
key, err := f.KeyOf(obj)
if err != nil {
return nil, false, KeyError{obj, err}
}
return f.GetByKey(key)
return f.getByKey(key)
}
// GetByKey returns the complete list of deltas for the requested item,
// setting exists=false if that list is empty.
// You should treat the items returned inside the deltas as immutable.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
func (f *DeltaFIFO) getByKey(key string) (item interface{}, exists bool, err error) {
f.lock.RLock()
defer f.lock.RUnlock()
d, exists := f.items[key]
@@ -320,10 +320,10 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
f.Update(mkFifoObj("foo", 12))
f.Delete(mkFifoObj("foo", 15))
if e, a := []interface{}{mkFifoObj("foo", 15)}, f.List(); !reflect.DeepEqual(e, a) {
if e, a := []interface{}{mkFifoObj("foo", 15)}, f.list(); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %+v, got %+v", e, a)
}
if e, a := []string{"foo"}, f.ListKeys(); !reflect.DeepEqual(e, a) {
if e, a := []string{"foo"}, f.listKeys(); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %+v, got %+v", e, a)
}
@@ -349,7 +349,7 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
t.Errorf("Got second value %v", unexpected.val)
case <-time.After(50 * time.Millisecond):
}
_, exists, _ := f.Get(mkFifoObj("foo", ""))
_, exists, _ := f.get(mkFifoObj("foo", ""))
if exists {
t.Errorf("item did not get removed")
}
@@ -397,7 +397,7 @@ func TestDeltaFIFO_transformer(t *testing.T) {
must(f.Replace([]interface{}{}, ""))
// Should be empty
if e, a := []string{"foo", "bar"}, f.ListKeys(); !reflect.DeepEqual(e, a) {
if e, a := []string{"foo", "bar"}, f.listKeys(); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %+v, got %+v", e, a)
}
@@ -507,7 +507,7 @@ func TestDeltaFIFO_addReplace(t *testing.T) {
t.Errorf("Got second value %v", unexpected.val)
case <-time.After(50 * time.Millisecond):
}
_, exists, _ := f.Get(mkFifoObj("foo", ""))
_, exists, _ := f.get(mkFifoObj("foo", ""))
if exists {
t.Errorf("item did not get removed")
}
@@ -991,7 +991,7 @@ func BenchmarkDeltaFIFOListKeys(b *testing.B) {
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_ = f.ListKeys()
_ = f.listKeys()
}
})
b.StopTimer()

View File

@@ -79,7 +79,7 @@ type ReflectorStore interface {
// TransformingStore is an optional interface that can be implemented by the provided store.
// If implemented on the provided store reflector will use the same transformer in its internal stores.
type TransformingStore interface {
Store
ReflectorStore
Transformer() TransformFunc
}

View File

@@ -20,10 +20,12 @@ import (
"context"
"errors"
"fmt"
"maps"
"math/rand"
"net/http"
"reflect"
goruntime "runtime"
"slices"
"strconv"
"sync"
"sync/atomic"
@@ -1962,7 +1964,7 @@ func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) {
s := NewFIFO(MetaNamespaceKeyFunc)
var replaceInvoked atomic.Int32
store := &fakeStore{
Store: s,
ReflectorStore: s,
beforeReplace: func(list []interface{}, rv string) {
// interested in the Replace call that happens after the Error event
if rv == lastExpectedRV {
@@ -2057,131 +2059,165 @@ func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) {
}
func TestReflectorRespectStoreTransformer(t *testing.T) {
mkPod := func(id string, rv string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv},
Spec: v1.PodSpec{
Hostname: "test",
for name, test := range map[string]struct {
storeBuilder func(counter *atomic.Int32) ReflectorStore
items func(rs ReflectorStore) []interface{}
}{
"real-fifo": {
storeBuilder: func(counter *atomic.Int32) ReflectorStore {
return NewRealFIFO(MetaNamespaceKeyFunc, NewStore(MetaNamespaceKeyFunc), func(i interface{}) (interface{}, error) {
counter.Add(1)
cast := i.(*v1.Pod)
cast.Spec.Hostname = "transformed"
return cast, nil
})
},
}
}
preExisting1 := mkPod("foo-1", "1")
preExisting2 := mkPod("foo-2", "2")
pod3 := mkPod("foo-3", "3")
lastExpectedRV := "3"
events := []watch.Event{
{Type: watch.Added, Object: preExisting1},
{Type: watch.Added, Object: preExisting2},
{Type: watch.Bookmark, Object: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: lastExpectedRV,
Annotations: map[string]string{
metav1.InitialEventsAnnotationKey: "true",
},
items: func(rs ReflectorStore) []interface{} {
store := rs.(*RealFIFO)
objects := make(map[string]interface{})
for _, item := range store.getItems() {
key, _ := store.keyFunc(item.Object)
if item.Type == Deleted {
delete(objects, key)
} else {
objects[key] = item.Object
}
}
return slices.Collect(maps.Values(objects))
},
}},
{Type: watch.Added, Object: pod3},
}
s := NewFIFO(MetaNamespaceKeyFunc)
var replaceInvoked atomic.Int32
store := &fakeStore{
Store: s,
beforeReplace: func(list []interface{}, rv string) {
replaceInvoked.Add(1)
// Only two pods are present at the point when Replace is called.
if len(list) != 2 {
t.Errorf("unexpected nb of objects: expected 2 received %d", len(list))
},
"delta-fifo": {
storeBuilder: func(counter *atomic.Int32) ReflectorStore {
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: MetaNamespaceKeyFunc,
Transformer: func(i interface{}) (interface{}, error) {
counter.Add(1)
cast := i.(*v1.Pod)
cast.Spec.Hostname = "transformed"
return cast, nil
},
})
},
items: func(rs ReflectorStore) []interface{} {
return rs.(*DeltaFIFO).list()
},
},
} {
t.Run(name, func(t *testing.T) {
mkPod := func(id string, rv string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv},
Spec: v1.PodSpec{
Hostname: "test",
},
}
}
for _, obj := range list {
cast := obj.(*v1.Pod)
preExisting1 := mkPod("foo-1", "1")
preExisting2 := mkPod("foo-2", "2")
pod3 := mkPod("foo-3", "3")
lastExpectedRV := "3"
events := []watch.Event{
{Type: watch.Added, Object: preExisting1},
{Type: watch.Added, Object: preExisting2},
{Type: watch.Bookmark, Object: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: lastExpectedRV,
Annotations: map[string]string{
metav1.InitialEventsAnnotationKey: "true",
},
},
}},
{Type: watch.Added, Object: pod3},
}
var transformerInvoked atomic.Int32
s := test.storeBuilder(&transformerInvoked)
var once sync.Once
lw := &ListWatch{
WatchFunc: func(metav1.ListOptions) (watch.Interface, error) {
fw := watch.NewFake()
go func() {
once.Do(func() {
for _, e := range events {
fw.Action(e.Type, e.Object)
}
})
}()
return fw, nil
},
// ListFunc should never be used in WatchList mode
ListFunc: func(metav1.ListOptions) (runtime.Object, error) {
return nil, errors.New("list call not expected in WatchList mode")
},
}
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
r := NewReflector(lw, &v1.Pod{}, s, 0)
ctx, cancel := context.WithCancel(context.Background())
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
r.RunWithContext(ctx)
}()
// wait for the RV to sync to the version returned by the final list
err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) {
if rv := r.LastSyncResourceVersion(); rv == lastExpectedRV {
return true, nil
}
return false, nil
})
if err != nil {
t.Fatalf("reflector never caught up with expected revision: %q, err: %v", lastExpectedRV, err)
}
if want, got := lastExpectedRV, r.LastSyncResourceVersion(); want != got {
t.Errorf("expected LastSyncResourceVersion to be %q, but got: %q", want, got)
}
informerItems := test.items(s)
if want, got := 3, len(informerItems); want != got {
t.Errorf("expected informer to contain %d objects, but got: %d", want, got)
}
for _, item := range informerItems {
cast := item.(*v1.Pod)
if cast.Spec.Hostname != "transformed" {
t.Error("Object was not transformed prior to replacement")
}
}
},
afterReplace: func(rv string, err error) {},
transformer: func(i interface{}) (interface{}, error) {
cast := i.(*v1.Pod)
cast.Spec.Hostname = "transformed"
return cast, nil
},
}
var once sync.Once
lw := &ListWatch{
WatchFunc: func(metav1.ListOptions) (watch.Interface, error) {
fw := watch.NewFake()
go func() {
once.Do(func() {
for _, e := range events {
fw.Action(e.Type, e.Object)
}
})
}()
return fw, nil
},
// ListFunc should never be used in WatchList mode
ListFunc: func(metav1.ListOptions) (runtime.Object, error) {
return nil, errors.New("list call not expected in WatchList mode")
},
}
// Transformer should have been invoked twice for the initial sync in the informer on the temporary store,
// then twice on replace, then once on the following update.
if want, got := 5, int(transformerInvoked.Load()); want != got {
t.Errorf("expected transformer to be invoked %d times, but got: %d", want, got)
}
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
r := NewReflector(lw, &v1.Pod{}, store, 0)
ctx, cancel := context.WithCancel(context.Background())
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
r.RunWithContext(ctx)
}()
// wait for the RV to sync to the version returned by the final list
err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) {
if rv := r.LastSyncResourceVersion(); rv == lastExpectedRV {
return true, nil
}
return false, nil
})
if err != nil {
t.Fatalf("reflector never caught up with expected revision: %q, err: %v", lastExpectedRV, err)
}
if want, got := lastExpectedRV, r.LastSyncResourceVersion(); want != got {
t.Errorf("expected LastSyncResourceVersion to be %q, but got: %q", want, got)
}
if want, got := 1, int(replaceInvoked.Load()); want != got {
t.Errorf("expected replace to be invoked %d times, but got: %d", want, got)
}
cancel()
select {
case <-doneCh:
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timed out waiting for Run to return")
cancel()
select {
case <-doneCh:
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timed out waiting for Run to return")
}
})
}
}
type fakeStore struct {
Store
ReflectorStore
beforeReplace func(list []interface{}, s string)
afterReplace func(rv string, err error)
transformer TransformFunc
}
func (f *fakeStore) Replace(list []interface{}, rv string) error {
f.beforeReplace(list, rv)
err := f.Store.Replace(list, rv)
err := f.ReflectorStore.Replace(list, rv)
f.afterReplace(rv, err)
return err
}
func (f *fakeStore) Transformer() TransformFunc {
return f.transformer
}
func BenchmarkExtractList(b *testing.B) {
_, _, podList := getPodListItems(0, fakeItemsNum)
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)

View File

@@ -89,7 +89,8 @@ type RealFIFO struct {
}
var (
_ = Queue(&RealFIFO{}) // RealFIFO is a Queue
_ = Queue(&RealFIFO{}) // RealFIFO is a Queue
_ = TransformingStore(&RealFIFO{}) // RealFIFO implements TransformingStore to allow memory optimizations
)
// Close the queue.