From b69a16cf38a529196b8d9a9846bcfc3a4d2a107f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Thu, 5 May 2022 17:30:24 +0200 Subject: [PATCH] Refactor store index into its structure Kubernetes-commit: 7c94ce3076a96acab4c7e88489cd596f1aad40e0 --- tools/cache/thread_safe_store.go | 179 +++++++++++++++----------- tools/cache/thread_safe_store_test.go | 10 +- 2 files changed, 110 insertions(+), 79 deletions(-) diff --git a/tools/cache/thread_safe_store.go b/tools/cache/thread_safe_store.go index 1182ea14..686b94e2 100644 --- a/tools/cache/thread_safe_store.go +++ b/tools/cache/thread_safe_store.go @@ -59,15 +59,89 @@ type ThreadSafeStore interface { Resync() error } +// storeIndex implements the indexing functionality for Store interface +type storeIndex struct { + // indexers maps a name to an IndexFunc + indexers Indexers + // indices maps a name to an Index + indices Indices +} + +func (i *storeIndex) reset() { + i.indices = Indices{} +} + +func (i *storeIndex) getKeysFromIndex(indexName string, obj interface{}) (sets.String, error) { + indexFunc := i.indexers[indexName] + if indexFunc == nil { + return nil, fmt.Errorf("Index with name %s does not exist", indexName) + } + + indexedValues, err := indexFunc(obj) + if err != nil { + return nil, err + } + index := i.indices[indexName] + + var storeKeySet sets.String + if len(indexedValues) == 1 { + // In majority of cases, there is exactly one value matching. + // Optimize the most common path - deduping is not needed here. + storeKeySet = index[indexedValues[0]] + } else { + // Need to de-dupe the return list. + // Since multiple keys are allowed, this can happen. + storeKeySet = sets.String{} + for _, indexedValue := range indexedValues { + for key := range index[indexedValue] { + storeKeySet.Insert(key) + } + } + } + + return storeKeySet, nil +} + +func (i *storeIndex) getKeysByIndex(indexName, indexedValue string) (sets.String, error) { + indexFunc := i.indexers[indexName] + if indexFunc == nil { + return nil, fmt.Errorf("Index with name %s does not exist", indexName) + } + + index := i.indices[indexName] + return index[indexedValue], nil +} + +func (i *storeIndex) getIndexValues(indexName string) []string { + index := i.indices[indexName] + names := make([]string, 0, len(index)) + for key := range index { + names = append(names, key) + } + return names +} + +func (i *storeIndex) addIndexers(newIndexers Indexers) error { + oldKeys := sets.StringKeySet(i.indexers) + newKeys := sets.StringKeySet(newIndexers) + + if oldKeys.HasAny(newKeys.List()...) { + return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys)) + } + + for k, v := range newIndexers { + i.indexers[k] = v + } + return nil +} + // threadSafeMap implements ThreadSafeStore type threadSafeMap struct { lock sync.RWMutex items map[string]interface{} - // indexers maps a name to an IndexFunc - indexers Indexers - // indices maps a name to an Index - indices Indices + // index implements the indexing functionality + index *storeIndex } func (c *threadSafeMap) Add(key string, obj interface{}) { @@ -79,14 +153,14 @@ func (c *threadSafeMap) Update(key string, obj interface{}) { defer c.lock.Unlock() oldObject := c.items[key] c.items[key] = obj - c.updateIndices(oldObject, obj, key) + c.index.updateIndices(oldObject, obj, key) } func (c *threadSafeMap) Delete(key string) { c.lock.Lock() defer c.lock.Unlock() if obj, exists := c.items[key]; exists { - c.updateIndices(obj, nil, key) + c.index.updateIndices(obj, nil, key) delete(c.items, key) } } @@ -126,9 +200,9 @@ func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion st c.items = items // rebuild any index - c.indices = Indices{} + c.index.reset() for key, item := range c.items { - c.updateIndices(nil, item, key) + c.index.updateIndices(nil, item, key) } } @@ -138,32 +212,10 @@ func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, c.lock.RLock() defer c.lock.RUnlock() - indexFunc := c.indexers[indexName] - if indexFunc == nil { - return nil, fmt.Errorf("Index with name %s does not exist", indexName) - } - - indexedValues, err := indexFunc(obj) + storeKeySet, err := c.index.getKeysFromIndex(indexName, obj) if err != nil { return nil, err } - index := c.indices[indexName] - - var storeKeySet sets.String - if len(indexedValues) == 1 { - // In majority of cases, there is exactly one value matching. - // Optimize the most common path - deduping is not needed here. - storeKeySet = index[indexedValues[0]] - } else { - // Need to de-dupe the return list. - // Since multiple keys are allowed, this can happen. - storeKeySet = sets.String{} - for _, indexedValue := range indexedValues { - for key := range index[indexedValue] { - storeKeySet.Insert(key) - } - } - } list := make([]interface{}, 0, storeKeySet.Len()) for storeKey := range storeKeySet { @@ -177,14 +229,10 @@ func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, c.lock.RLock() defer c.lock.RUnlock() - indexFunc := c.indexers[indexName] - if indexFunc == nil { - return nil, fmt.Errorf("Index with name %s does not exist", indexName) + set, err := c.index.getKeysByIndex(indexName, indexedValue) + if err != nil { + return nil, err } - - index := c.indices[indexName] - - set := index[indexedValue] list := make([]interface{}, 0, set.Len()) for key := range set { list = append(list, c.items[key]) @@ -199,14 +247,10 @@ func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, err c.lock.RLock() defer c.lock.RUnlock() - indexFunc := c.indexers[indexName] - if indexFunc == nil { - return nil, fmt.Errorf("Index with name %s does not exist", indexName) + set, err := c.index.getKeysByIndex(indexName, indexedValue) + if err != nil { + return nil, err } - - index := c.indices[indexName] - - set := index[indexedValue] return set.List(), nil } @@ -214,16 +258,11 @@ func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string { c.lock.RLock() defer c.lock.RUnlock() - index := c.indices[indexName] - names := make([]string, 0, len(index)) - for key := range index { - names = append(names, key) - } - return names + return c.index.getIndexValues(indexName) } func (c *threadSafeMap) GetIndexers() Indexers { - return c.indexers + return c.index.indexers } func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error { @@ -234,17 +273,7 @@ func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error { return fmt.Errorf("cannot add indexers to running index") } - oldKeys := sets.StringKeySet(c.indexers) - newKeys := sets.StringKeySet(newIndexers) - - if oldKeys.HasAny(newKeys.List()...) { - return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys)) - } - - for k, v := range newIndexers { - c.indexers[k] = v - } - return nil + return c.index.addIndexers(newIndexers) } // updateIndices modifies the objects location in the managed indexes: @@ -252,10 +281,10 @@ func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error { // - for update you must provide both the oldObj and the newObj // - for delete you must provide only the oldObj // updateIndices must be called from a function that already has a lock on the cache -func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) { +func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) { var oldIndexValues, indexValues []string var err error - for name, indexFunc := range c.indexers { + for name, indexFunc := range i.indexers { if oldObj != nil { oldIndexValues, err = indexFunc(oldObj) } else { @@ -274,10 +303,10 @@ func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, ke panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) } - index := c.indices[name] + index := i.indices[name] if index == nil { index = Index{} - c.indices[name] = index + i.indices[name] = index } if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] { @@ -286,15 +315,15 @@ func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, ke } for _, value := range oldIndexValues { - c.deleteKeyFromIndex(key, value, index) + i.deleteKeyFromIndex(key, value, index) } for _, value := range indexValues { - c.addKeyToIndex(key, value, index) + i.addKeyToIndex(key, value, index) } } } -func (c *threadSafeMap) addKeyToIndex(key, indexValue string, index Index) { +func (i *storeIndex) addKeyToIndex(key, indexValue string, index Index) { set := index[indexValue] if set == nil { set = sets.String{} @@ -303,7 +332,7 @@ func (c *threadSafeMap) addKeyToIndex(key, indexValue string, index Index) { set.Insert(key) } -func (c *threadSafeMap) deleteKeyFromIndex(key, indexValue string, index Index) { +func (i *storeIndex) deleteKeyFromIndex(key, indexValue string, index Index) { set := index[indexValue] if set == nil { return @@ -325,8 +354,10 @@ func (c *threadSafeMap) Resync() error { // NewThreadSafeStore creates a new instance of ThreadSafeStore. func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore { return &threadSafeMap{ - items: map[string]interface{}{}, - indexers: indexers, - indices: indices, + items: map[string]interface{}{}, + index: &storeIndex{ + indexers: indexers, + indices: indices, + }, } } diff --git a/tools/cache/thread_safe_store_test.go b/tools/cache/thread_safe_store_test.go index c4cbd4d0..53f60208 100644 --- a/tools/cache/thread_safe_store_test.go +++ b/tools/cache/thread_safe_store_test.go @@ -43,7 +43,7 @@ func TestThreadSafeStoreDeleteRemovesEmptySetsFromIndex(t *testing.T) { store.Add(testKey, testKey) // Assumption check, there should be a set for the `testKey` with one element in the added index - set := store.indices[testIndexer][testKey] + set := store.index.indices[testIndexer][testKey] if len(set) != 1 { t.Errorf("Initial assumption of index backing string set having 1 element failed. Actual elements: %d", len(set)) @@ -51,7 +51,7 @@ func TestThreadSafeStoreDeleteRemovesEmptySetsFromIndex(t *testing.T) { } store.Delete(testKey) - set, present := store.indices[testIndexer][testKey] + set, present := store.index.indices[testIndexer][testKey] if present { t.Errorf("Index backing string set not deleted from index. Set length: %d", len(set)) @@ -76,7 +76,7 @@ func TestThreadSafeStoreAddKeepsNonEmptySetPostDeleteFromIndex(t *testing.T) { store.Add("delete", "delete") // Assumption check, there should be a set for the `testIndex` with two elements - set := store.indices[testIndexer][testIndex] + set := store.index.indices[testIndexer][testIndex] if len(set) != 2 { t.Errorf("Initial assumption of index backing string set having 2 elements failed. Actual elements: %d", len(set)) @@ -84,7 +84,7 @@ func TestThreadSafeStoreAddKeepsNonEmptySetPostDeleteFromIndex(t *testing.T) { } store.Delete("delete") - set, present := store.indices[testIndexer][testIndex] + set, present := store.index.indices[testIndexer][testIndex] if !present { t.Errorf("Index backing string set erroneously deleted from index.") @@ -114,7 +114,7 @@ func TestThreadSafeStoreIndexingFunctionsWithMultipleValues(t *testing.T) { assert := assert.New(t) compare := func(key string, expected []string) error { - values := store.indices[testIndexer][key].List() + values := store.index.indices[testIndexer][key].List() if cmp.Equal(values, expected) { return nil }