From 785e19661f801a65c1ee5203b713e221b7436006 Mon Sep 17 00:00:00 2001 From: John Howard Date: Fri, 31 Mar 2023 15:57:18 -0700 Subject: [PATCH] client-go: allow adding indexes after informer starts Kubernetes-commit: d96a9858d396d7f418d24ea47bdc92ef8429f707 --- tools/cache/index.go | 3 +- tools/cache/shared_informer.go | 4 +- tools/cache/shared_informer_test.go | 78 ++++++++++++++++++++ tools/cache/thread_safe_store.go | 108 +++++++++++++++++----------- 4 files changed, 146 insertions(+), 47 deletions(-) diff --git a/tools/cache/index.go b/tools/cache/index.go index b78d3086..c5819fb6 100644 --- a/tools/cache/index.go +++ b/tools/cache/index.go @@ -50,8 +50,7 @@ type Indexer interface { // GetIndexers return the indexers GetIndexers() Indexers - // AddIndexers adds more indexers to this store. If you call this after you already have data - // in the store, the results are undefined. + // AddIndexers adds more indexers to this store. This supports adding indexes after the store already has items. 
AddIndexers(newIndexers Indexers) error } diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index b3f37431..a06df6e6 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -540,8 +540,8 @@ func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error { s.startedLock.Lock() defer s.startedLock.Unlock() - if s.started { - return fmt.Errorf("informer has already started") + if s.stopped { + return fmt.Errorf("indexer was not added because it has stopped already") } return s.indexer.AddIndexers(indexers) diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index 459f257f..1e4ed9d5 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -26,6 +26,9 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -117,6 +120,81 @@ func isRegistered(i SharedInformer, h ResourceEventHandlerRegistration) bool { return s.processor.getListener(h) != nil } +func TestIndexer(t *testing.T) { + assert := assert.New(t) + // source simulates an apiserver object endpoint. 
+ source := fcache.NewFakeControllerSource() + pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Labels: map[string]string{"a": "a-val", "b": "b-val1"}}} + pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Labels: map[string]string{"b": "b-val2"}}} + pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", Labels: map[string]string{"a": "a-val2"}}} + source.Add(pod1) + source.Add(pod2) + + // create the shared informer and resync every 1s + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + err := informer.AddIndexers(map[string]IndexFunc{ + "labels": func(obj interface{}) ([]string, error) { + res := []string{} + for k := range obj.(*v1.Pod).Labels { + res = append(res, k) + } + return res, nil + }, + }) + if err != nil { + t.Fatal(err) + } + stop := make(chan struct{}) + defer close(stop) + + go informer.Run(stop) + WaitForCacheSync(stop, informer.HasSynced) + + cmpOps := cmpopts.SortSlices(func(a, b any) bool { + return a.(*v1.Pod).Name < b.(*v1.Pod).Name + }) + + // We should be able to lookup by index + res, err := informer.GetIndexer().ByIndex("labels", "a") + assert.NoError(err) + if diff := cmp.Diff([]any{pod1}, res); diff != "" { + t.Fatal(diff) + } + + // Adding an item later is fine as well + source.Add(pod3) + // Event is async, need to poll + assert.Eventually(func() bool { + res, _ := informer.GetIndexer().ByIndex("labels", "a") + return cmp.Diff([]any{pod1, pod3}, res, cmpOps) == "" + }, time.Second*3, time.Millisecond) + + // Adding an index later is also fine + err = informer.AddIndexers(map[string]IndexFunc{ + "labels-again": func(obj interface{}) ([]string, error) { + res := []string{} + for k := range obj.(*v1.Pod).Labels { + res = append(res, k) + } + return res, nil + }, + }) + assert.NoError(err) + + // Should be immediately available + res, err = informer.GetIndexer().ByIndex("labels-again", "a") + assert.NoError(err) + if diff := cmp.Diff([]any{pod1, pod3}, res, cmpOps); diff != 
"" { + t.Fatal(diff) + } + if got := informer.GetIndexer().ListIndexFuncValues("labels"); !sets.New(got...).Equal(sets.New("a", "b")) { + t.Fatalf("got %v", got) + } + if got := informer.GetIndexer().ListIndexFuncValues("labels-again"); !sets.New(got...).Equal(sets.New("a", "b")) { + t.Fatalf("got %v", got) + } +} + func TestListenerResyncPeriods(t *testing.T) { // source simulates an apiserver object endpoint. source := fcache.NewFakeControllerSource() diff --git a/tools/cache/thread_safe_store.go b/tools/cache/thread_safe_store.go index 145e93ee..7a4df0e1 100644 --- a/tools/cache/thread_safe_store.go +++ b/tools/cache/thread_safe_store.go @@ -52,8 +52,7 @@ type ThreadSafeStore interface { ByIndex(indexName, indexedValue string) ([]interface{}, error) GetIndexers() Indexers - // AddIndexers adds more indexers to this store. If you call this after you already have data - // in the store, the results are undefined. + // AddIndexers adds more indexers to this store. This supports adding indexes after the store already has items. AddIndexers(newIndexers Indexers) error // Resync is a no-op and is deprecated Resync() error @@ -135,50 +134,66 @@ func (i *storeIndex) addIndexers(newIndexers Indexers) error { return nil } +// updateSingleIndex modifies the object's location in the named index: +// - for create you must provide only the newObj +// - for update you must provide both the oldObj and the newObj +// - for delete you must provide only the oldObj +// updateSingleIndex must be called from a function that already has a lock on the cache +func (i *storeIndex) updateSingleIndex(name string, oldObj interface{}, newObj interface{}, key string) { + var oldIndexValues, indexValues []string + indexFunc, ok := i.indexers[name] + if !ok { + // Should never happen. Caller is responsible for ensuring this exists, and should call with lock + // held to avoid any races. 
+ panic(fmt.Errorf("indexer %q does not exist", name)) + } + if oldObj != nil { + var err error + oldIndexValues, err = indexFunc(oldObj) + if err != nil { + panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) + } + } else { + oldIndexValues = oldIndexValues[:0] + } + + if newObj != nil { + var err error + indexValues, err = indexFunc(newObj) + if err != nil { + panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) + } + } else { + indexValues = indexValues[:0] + } + + index := i.indices[name] + if index == nil { + index = Index{} + i.indices[name] = index + } + + if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] { + // We optimize for the most common case where indexFunc returns a single value which has not been changed + return + } + + for _, value := range oldIndexValues { + i.deleteKeyFromIndex(key, value, index) + } + for _, value := range indexValues { + i.addKeyToIndex(key, value, index) + } +} + // updateIndices modifies the objects location in the managed indexes: // - for create you must provide only the newObj // - for update you must provide both the oldObj and the newObj // - for delete you must provide only the oldObj // updateIndices must be called from a function that already has a lock on the cache func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) { - var oldIndexValues, indexValues []string - var err error - for name, indexFunc := range i.indexers { - if oldObj != nil { - oldIndexValues, err = indexFunc(oldObj) - } else { - oldIndexValues = oldIndexValues[:0] - } - if err != nil { - panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) - } - - if newObj != nil { - indexValues, err = indexFunc(newObj) - } else { - indexValues = indexValues[:0] - } - if err != nil { - panic(fmt.Errorf("unable to calculate an index entry for key %q on 
index %q: %v", key, name, err)) - } - - index := i.indices[name] - if index == nil { - index = Index{} - i.indices[name] = index - } - - if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] { - // We optimize for the most common case where indexFunc returns a single value which has not been changed - continue - } - - for _, value := range oldIndexValues { - i.deleteKeyFromIndex(key, value, index) - } - for _, value := range indexValues { - i.addKeyToIndex(key, value, index) - } + for name := range i.indexers { + i.updateSingleIndex(name, oldObj, newObj, key) } } @@ -339,11 +354,18 @@ func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error { c.lock.Lock() defer c.lock.Unlock() - if len(c.items) > 0 { - return fmt.Errorf("cannot add indexers to running index") + if err := c.index.addIndexers(newIndexers); err != nil { + return err } - return c.index.addIndexers(newIndexers) + // If there are already items, index them + for key, item := range c.items { + for name := range newIndexers { + c.index.updateSingleIndex(name, nil, item, key) + } + } + + return nil } func (c *threadSafeMap) Resync() error {