Optimize index updating

Kubernetes-commit: 75273a0689250f4861ea41405e6402c6191563a8
This commit is contained in:
wojtekt 2021-09-24 15:24:01 +02:00 committed by Kubernetes Publisher
parent 463b3d12bb
commit e61d0d5d52
2 changed files with 53 additions and 41 deletions

View File

@ -90,7 +90,7 @@ func (c *threadSafeMap) Delete(key string) {
c.lock.Lock() c.lock.Lock()
defer c.lock.Unlock() defer c.lock.Unlock()
if obj, exists := c.items[key]; exists { if obj, exists := c.items[key]; exists {
c.deleteFromIndices(obj, key) c.updateIndices(obj, nil, key)
delete(c.items, key) delete(c.items, key)
} }
} }
@ -251,61 +251,76 @@ func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
return nil return nil
} }
// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj // updateIndices modifies the objects location in the managed indexes:
// - for create you must provide only the newObj
// - for update you must provide both the oldObj and the newObj
// - for delete you must provide only the oldObj
// updateIndices must be called from a function that already has a lock on the cache // updateIndices must be called from a function that already has a lock on the cache
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) { func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
// if we got an old object, we need to remove it before we add it again var oldIndexValues, indexValues []string
if oldObj != nil { var err error
c.deleteFromIndices(oldObj, key)
}
for name, indexFunc := range c.indexers { for name, indexFunc := range c.indexers {
indexValues, err := indexFunc(newObj) if oldObj != nil {
oldIndexValues, err = indexFunc(oldObj)
} else {
oldIndexValues = oldIndexValues[:0]
}
if err != nil { if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
} }
if newObj != nil {
indexValues, err = indexFunc(newObj)
} else {
indexValues = indexValues[:0]
}
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
index := c.indices[name] index := c.indices[name]
if index == nil { if index == nil {
index = Index{} index = Index{}
c.indices[name] = index c.indices[name] = index
} }
for _, indexValue := range indexValues { for _, value := range oldIndexValues {
set := index[indexValue] // We optimize for the most common case where index returns a single value.
if set == nil { if len(indexValues) == 1 && value == indexValues[0] {
set = sets.String{} continue
index[indexValue] = set
} }
set.Insert(key) c.deleteKeyFromIndex(key, value, index)
}
for _, value := range indexValues {
// We optimize for the most common case where index returns a single value.
if len(oldIndexValues) == 1 && value == oldIndexValues[0] {
continue
}
c.addKeyToIndex(key, value, index)
} }
} }
} }
// deleteFromIndices removes the object from each of the managed indexes func (c *threadSafeMap) addKeyToIndex(key, indexValue string, index Index) {
// it is intended to be called from a function that already has a lock on the cache set := index[indexValue]
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) { if set == nil {
for name, indexFunc := range c.indexers { set = sets.String{}
indexValues, err := indexFunc(obj) index[indexValue] = set
if err != nil { }
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) set.Insert(key)
} }
index := c.indices[name] func (c *threadSafeMap) deleteKeyFromIndex(key, indexValue string, index Index) {
if index == nil { set := index[indexValue]
continue if set == nil {
} return
for _, indexValue := range indexValues { }
set := index[indexValue] set.Delete(key)
if set != nil { // If we don't delete the set when zero, indices with high cardinality
set.Delete(key) // short lived resources can cause memory to increase over time from
// unused empty sets. See `kubernetes/kubernetes/issues/84959`.
// If we don't delete the set when zero, indices with high cardinality if len(set) == 0 {
// short lived resources can cause memory to increase over time from delete(index, indexValue)
// unused empty sets. See `kubernetes/kubernetes/issues/84959`.
if len(set) == 0 {
delete(index, indexValue)
}
}
}
} }
} }

View File

@ -92,9 +92,6 @@ func TestThreadSafeStoreAddKeepsNonEmptySetPostDeleteFromIndex(t *testing.T) {
} }
} }
// BenchmarkIndexer-12 447849 2738 ns/op 242 B/op 4 allocs/op
// PASS
// ok k8s.io/kubernetes/vendor/k8s.io/client-go/tools/cache 2.451s
func BenchmarkIndexer(b *testing.B) { func BenchmarkIndexer(b *testing.B) {
testIndexer := "testIndexer" testIndexer := "testIndexer"