mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-25 22:51:40 +00:00
Merge pull request #105234 from wojtek-t/optimize_indexer
Optimize indexer Kubernetes-commit: 3b2b23cee707f3407b997db412547f692db59870
This commit is contained in:
commit
f1b4ce1ec0
91
tools/cache/thread_safe_store.go
vendored
91
tools/cache/thread_safe_store.go
vendored
@ -90,7 +90,7 @@ func (c *threadSafeMap) Delete(key string) {
|
|||||||
c.lock.Lock()
|
c.lock.Lock()
|
||||||
defer c.lock.Unlock()
|
defer c.lock.Unlock()
|
||||||
if obj, exists := c.items[key]; exists {
|
if obj, exists := c.items[key]; exists {
|
||||||
c.deleteFromIndices(obj, key)
|
c.updateIndices(obj, nil, key)
|
||||||
delete(c.items, key)
|
delete(c.items, key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -251,61 +251,76 @@ func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
|
// updateIndices modifies the objects location in the managed indexes:
|
||||||
|
// - for create you must provide only the newObj
|
||||||
|
// - for update you must provide both the oldObj and the newObj
|
||||||
|
// - for delete you must provide only the oldObj
|
||||||
// updateIndices must be called from a function that already has a lock on the cache
|
// updateIndices must be called from a function that already has a lock on the cache
|
||||||
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
|
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
|
||||||
// if we got an old object, we need to remove it before we add it again
|
var oldIndexValues, indexValues []string
|
||||||
if oldObj != nil {
|
var err error
|
||||||
c.deleteFromIndices(oldObj, key)
|
|
||||||
}
|
|
||||||
for name, indexFunc := range c.indexers {
|
for name, indexFunc := range c.indexers {
|
||||||
indexValues, err := indexFunc(newObj)
|
if oldObj != nil {
|
||||||
|
oldIndexValues, err = indexFunc(oldObj)
|
||||||
|
} else {
|
||||||
|
oldIndexValues = oldIndexValues[:0]
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
|
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if newObj != nil {
|
||||||
|
indexValues, err = indexFunc(newObj)
|
||||||
|
} else {
|
||||||
|
indexValues = indexValues[:0]
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
|
||||||
|
}
|
||||||
|
|
||||||
index := c.indices[name]
|
index := c.indices[name]
|
||||||
if index == nil {
|
if index == nil {
|
||||||
index = Index{}
|
index = Index{}
|
||||||
c.indices[name] = index
|
c.indices[name] = index
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, indexValue := range indexValues {
|
for _, value := range oldIndexValues {
|
||||||
set := index[indexValue]
|
// We optimize for the most common case where index returns a single value.
|
||||||
if set == nil {
|
if len(indexValues) == 1 && value == indexValues[0] {
|
||||||
set = sets.String{}
|
continue
|
||||||
index[indexValue] = set
|
|
||||||
}
|
}
|
||||||
set.Insert(key)
|
c.deleteKeyFromIndex(key, value, index)
|
||||||
|
}
|
||||||
|
for _, value := range indexValues {
|
||||||
|
// We optimize for the most common case where index returns a single value.
|
||||||
|
if len(oldIndexValues) == 1 && value == oldIndexValues[0] {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
c.addKeyToIndex(key, value, index)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// deleteFromIndices removes the object from each of the managed indexes
|
func (c *threadSafeMap) addKeyToIndex(key, indexValue string, index Index) {
|
||||||
// it is intended to be called from a function that already has a lock on the cache
|
set := index[indexValue]
|
||||||
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
|
if set == nil {
|
||||||
for name, indexFunc := range c.indexers {
|
set = sets.String{}
|
||||||
indexValues, err := indexFunc(obj)
|
index[indexValue] = set
|
||||||
if err != nil {
|
}
|
||||||
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
|
set.Insert(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
index := c.indices[name]
|
func (c *threadSafeMap) deleteKeyFromIndex(key, indexValue string, index Index) {
|
||||||
if index == nil {
|
set := index[indexValue]
|
||||||
continue
|
if set == nil {
|
||||||
}
|
return
|
||||||
for _, indexValue := range indexValues {
|
}
|
||||||
set := index[indexValue]
|
set.Delete(key)
|
||||||
if set != nil {
|
// If we don't delete the set when zero, indices with high cardinality
|
||||||
set.Delete(key)
|
// short lived resources can cause memory to increase over time from
|
||||||
|
// unused empty sets. See `kubernetes/kubernetes/issues/84959`.
|
||||||
// If we don't delete the set when zero, indices with high cardinality
|
if len(set) == 0 {
|
||||||
// short lived resources can cause memory to increase over time from
|
delete(index, indexValue)
|
||||||
// unused empty sets. See `kubernetes/kubernetes/issues/84959`.
|
|
||||||
if len(set) == 0 {
|
|
||||||
delete(index, indexValue)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
30
tools/cache/thread_safe_store_test.go
vendored
30
tools/cache/thread_safe_store_test.go
vendored
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package cache
|
package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -90,3 +91,32 @@ func TestThreadSafeStoreAddKeepsNonEmptySetPostDeleteFromIndex(t *testing.T) {
|
|||||||
t.Errorf("Index backing string set has incorrect length, expect 1. Set length: %d", len(set))
|
t.Errorf("Index backing string set has incorrect length, expect 1. Set length: %d", len(set))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func BenchmarkIndexer(b *testing.B) {
|
||||||
|
testIndexer := "testIndexer"
|
||||||
|
|
||||||
|
indexers := Indexers{
|
||||||
|
testIndexer: func(obj interface{}) (strings []string, e error) {
|
||||||
|
indexes := []string{obj.(string)}
|
||||||
|
return indexes, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
indices := Indices{}
|
||||||
|
store := NewThreadSafeStore(indexers, indices).(*threadSafeMap)
|
||||||
|
|
||||||
|
// The following benchmark imitates what is happening in indexes
|
||||||
|
// used in storage layer, where indexing is mostly static (e.g.
|
||||||
|
// indexing objects by their (namespace, name)).
|
||||||
|
// The 5000 number imitates indexing nodes in 5000-node cluster.
|
||||||
|
objectCount := 5000
|
||||||
|
objects := make([]string, 0, 5000)
|
||||||
|
for i := 0; i < objectCount; i++ {
|
||||||
|
objects = append(objects, fmt.Sprintf("object-number-%d", i))
|
||||||
|
}
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
store.Update(objects[i%objectCount], objects[i%objectCount])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user