diff --git a/tools/cache/thread_safe_store.go b/tools/cache/thread_safe_store.go index 56251179..ea34e903 100644 --- a/tools/cache/thread_safe_store.go +++ b/tools/cache/thread_safe_store.go @@ -90,7 +90,7 @@ func (c *threadSafeMap) Delete(key string) { c.lock.Lock() defer c.lock.Unlock() if obj, exists := c.items[key]; exists { - c.deleteFromIndices(obj, key) + c.updateIndices(obj, nil, key) delete(c.items, key) } } @@ -251,61 +251,76 @@ func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error { return nil } -// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj +// updateIndices modifies the objects location in the managed indexes: +// - for create you must provide only the newObj +// - for update you must provide both the oldObj and the newObj +// - for delete you must provide only the oldObj // updateIndices must be called from a function that already has a lock on the cache func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) { - // if we got an old object, we need to remove it before we add it again - if oldObj != nil { - c.deleteFromIndices(oldObj, key) - } + var oldIndexValues, indexValues []string + var err error for name, indexFunc := range c.indexers { - indexValues, err := indexFunc(newObj) + if oldObj != nil { + oldIndexValues, err = indexFunc(oldObj) + } else { + oldIndexValues = oldIndexValues[:0] + } if err != nil { panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) } + + if newObj != nil { + indexValues, err = indexFunc(newObj) + } else { + indexValues = indexValues[:0] + } + if err != nil { + panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) + } + index := c.indices[name] if index == nil { index = Index{} c.indices[name] = index } - for _, indexValue := range indexValues { - set := index[indexValue] - if set == nil { - set = sets.String{} - index[indexValue] = set + for _, value := range oldIndexValues { + // We optimize for the most common case where index returns a single value. + if len(indexValues) == 1 && value == indexValues[0] { + continue } - set.Insert(key) + c.deleteKeyFromIndex(key, value, index) + } + for _, value := range indexValues { + // We optimize for the most common case where index returns a single value. + if len(oldIndexValues) == 1 && value == oldIndexValues[0] { + continue + } + c.addKeyToIndex(key, value, index) } } } -// deleteFromIndices removes the object from each of the managed indexes -// it is intended to be called from a function that already has a lock on the cache -func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) { - for name, indexFunc := range c.indexers { - indexValues, err := indexFunc(obj) - if err != nil { - panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) - } +func (c *threadSafeMap) addKeyToIndex(key, indexValue string, index Index) { + set := index[indexValue] + if set == nil { + set = sets.String{} + index[indexValue] = set + } + set.Insert(key) +} - index := c.indices[name] - if index == nil { - continue - } - for _, indexValue := range indexValues { - set := index[indexValue] - if set != nil { - set.Delete(key) - - // If we don't delete the set when zero, indices with high cardinality - // short lived resources can cause memory to increase over time from - // unused empty sets. See `kubernetes/kubernetes/issues/84959`. - if len(set) == 0 { - delete(index, indexValue) - } - } - } +func (c *threadSafeMap) deleteKeyFromIndex(key, indexValue string, index Index) { + set := index[indexValue] + if set == nil { + return + } + set.Delete(key) + // If we don't delete the set when zero, indices with high cardinality + // short lived resources can cause memory to increase over time from + // unused empty sets. See `kubernetes/kubernetes/issues/84959`. + if len(set) == 0 { + delete(index, indexValue) } } diff --git a/tools/cache/thread_safe_store_test.go b/tools/cache/thread_safe_store_test.go index 267ebcbb..591ff372 100644 --- a/tools/cache/thread_safe_store_test.go +++ b/tools/cache/thread_safe_store_test.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "fmt" "testing" ) @@ -90,3 +91,32 @@ func TestThreadSafeStoreAddKeepsNonEmptySetPostDeleteFromIndex(t *testing.T) { t.Errorf("Index backing string set has incorrect length, expect 1. Set length: %d", len(set)) } } + +func BenchmarkIndexer(b *testing.B) { + testIndexer := "testIndexer" + + indexers := Indexers{ + testIndexer: func(obj interface{}) (strings []string, e error) { + indexes := []string{obj.(string)} + return indexes, nil + }, + } + + indices := Indices{} + store := NewThreadSafeStore(indexers, indices).(*threadSafeMap) + + // The following benchmark imitates what is happening in indexes + // used in storage layer, where indexing is mostly static (e.g. + // indexing objects by their (namespace, name)). + // The 5000 number imitates indexing nodes in 5000-node cluster. + objectCount := 5000 + objects := make([]string, 0, 5000) + for i := 0; i < objectCount; i++ { + objects = append(objects, fmt.Sprintf("object-number-%d", i)) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + store.Update(objects[i%objectCount], objects[i%objectCount]) + } +}