client-go: allow adding indexes after informer starts

Kubernetes-commit: d96a9858d396d7f418d24ea47bdc92ef8429f707
This commit is contained in:
John Howard
2023-03-31 15:57:18 -07:00
committed by Kubernetes Publisher
parent 84a6fe7e40
commit 785e19661f
4 changed files with 146 additions and 47 deletions

View File

@@ -26,6 +26,9 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -117,6 +120,81 @@ func isRegistered(i SharedInformer, h ResourceEventHandlerRegistration) bool {
return s.processor.getListener(h) != nil
}
func TestIndexer(t *testing.T) {
assert := assert.New(t)
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Labels: map[string]string{"a": "a-val", "b": "b-val1"}}}
pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Labels: map[string]string{"b": "b-val2"}}}
pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", Labels: map[string]string{"a": "a-val2"}}}
source.Add(pod1)
source.Add(pod2)
// create the shared informer and resync every 1s
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
err := informer.AddIndexers(map[string]IndexFunc{
"labels": func(obj interface{}) ([]string, error) {
res := []string{}
for k := range obj.(*v1.Pod).Labels {
res = append(res, k)
}
return res, nil
},
})
if err != nil {
t.Fatal(err)
}
stop := make(chan struct{})
defer close(stop)
go informer.Run(stop)
WaitForCacheSync(stop, informer.HasSynced)
cmpOps := cmpopts.SortSlices(func(a, b any) bool {
return a.(*v1.Pod).Name < b.(*v1.Pod).Name
})
// We should be able to lookup by index
res, err := informer.GetIndexer().ByIndex("labels", "a")
assert.NoError(err)
if diff := cmp.Diff([]any{pod1}, res); diff != "" {
t.Fatal(diff)
}
// Adding an item later is fine as well
source.Add(pod3)
// Event is async, need to poll
assert.Eventually(func() bool {
res, _ := informer.GetIndexer().ByIndex("labels", "a")
return cmp.Diff([]any{pod1, pod3}, res, cmpOps) == ""
}, time.Second*3, time.Millisecond)
// Adding an index later is also fine
err = informer.AddIndexers(map[string]IndexFunc{
"labels-again": func(obj interface{}) ([]string, error) {
res := []string{}
for k := range obj.(*v1.Pod).Labels {
res = append(res, k)
}
return res, nil
},
})
assert.NoError(err)
// Should be immediately available
res, err = informer.GetIndexer().ByIndex("labels-again", "a")
assert.NoError(err)
if diff := cmp.Diff([]any{pod1, pod3}, res, cmpOps); diff != "" {
t.Fatal(diff)
}
if got := informer.GetIndexer().ListIndexFuncValues("labels"); !sets.New(got...).Equal(sets.New("a", "b")) {
t.Fatalf("got %v", got)
}
if got := informer.GetIndexer().ListIndexFuncValues("labels-again"); !sets.New(got...).Equal(sets.New("a", "b")) {
t.Fatalf("got %v", got)
}
}
func TestListenerResyncPeriods(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()