Merge pull request #113387 from wojtek-t/refactor_client_indexing

Refactor store index into its structure

Kubernetes-commit: ee0b179a7b9867203a9cc73f5c90909318a240e3
This commit is contained in:
Kubernetes Publisher 2022-11-02 23:45:42 -07:00
commit d576a3570d
4 changed files with 176 additions and 145 deletions

4
go.mod
View File

@ -25,7 +25,7 @@ require (
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
google.golang.org/protobuf v1.28.1
k8s.io/api v0.0.0-20221103075246-5448eb39ae20
k8s.io/apimachinery v0.0.0-20221028155017-b03a432a2a6d
k8s.io/apimachinery v0.0.0-20221103075033-9e85d3af4ae2
k8s.io/klog/v2 v2.80.1
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280
k8s.io/utils v0.0.0-20220922133306-665eaaec4324
@ -60,5 +60,5 @@ require (
replace (
k8s.io/api => k8s.io/api v0.0.0-20221103075246-5448eb39ae20
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20221028155017-b03a432a2a6d
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20221103075033-9e85d3af4ae2
)

4
go.sum
View File

@ -478,8 +478,8 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.0.0-20221103075246-5448eb39ae20 h1:L4yzu0YYenWmYNm1WG/jJy/6ioJxAlgDewudcjz0uUE=
k8s.io/api v0.0.0-20221103075246-5448eb39ae20/go.mod h1:G/3aN5AoXC2S2FQkBj1R024v+pPQBOzaaOCf3qJdj+A=
k8s.io/apimachinery v0.0.0-20221028155017-b03a432a2a6d h1:fg/DbLqFKxFESf3AnU5iwCexZWOUnFFLb0JraG20wZo=
k8s.io/apimachinery v0.0.0-20221028155017-b03a432a2a6d/go.mod h1:zSkBXgO5G/dSQOe256tx5Yo2OJytojpY3bsXu/4/ZJE=
k8s.io/apimachinery v0.0.0-20221103075033-9e85d3af4ae2 h1:0nhI6fiyouN4H8MXOcMcCOybGhw4FgxwQbadTKPIRlA=
k8s.io/apimachinery v0.0.0-20221103075033-9e85d3af4ae2/go.mod h1:zSkBXgO5G/dSQOe256tx5Yo2OJytojpY3bsXu/4/ZJE=
k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4=
k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 h1:+70TFaan3hfJzs+7VK2o+OGxg8HsuBr/5f6tVAjDu6E=

View File

@ -59,15 +59,159 @@ type ThreadSafeStore interface {
Resync() error
}
// storeIndex implements the indexing functionality for Store interface
type storeIndex struct {
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
func (i *storeIndex) reset() {
i.indices = Indices{}
}
func (i *storeIndex) getKeysFromIndex(indexName string, obj interface{}) (sets.String, error) {
indexFunc := i.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
indexedValues, err := indexFunc(obj)
if err != nil {
return nil, err
}
index := i.indices[indexName]
var storeKeySet sets.String
if len(indexedValues) == 1 {
// In majority of cases, there is exactly one value matching.
// Optimize the most common path - deduping is not needed here.
storeKeySet = index[indexedValues[0]]
} else {
// Need to de-dupe the return list.
// Since multiple keys are allowed, this can happen.
storeKeySet = sets.String{}
for _, indexedValue := range indexedValues {
for key := range index[indexedValue] {
storeKeySet.Insert(key)
}
}
}
return storeKeySet, nil
}
func (i *storeIndex) getKeysByIndex(indexName, indexedValue string) (sets.String, error) {
indexFunc := i.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
index := i.indices[indexName]
return index[indexedValue], nil
}
func (i *storeIndex) getIndexValues(indexName string) []string {
index := i.indices[indexName]
names := make([]string, 0, len(index))
for key := range index {
names = append(names, key)
}
return names
}
func (i *storeIndex) addIndexers(newIndexers Indexers) error {
oldKeys := sets.StringKeySet(i.indexers)
newKeys := sets.StringKeySet(newIndexers)
if oldKeys.HasAny(newKeys.List()...) {
return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
}
for k, v := range newIndexers {
i.indexers[k] = v
}
return nil
}
// updateIndices modifies the objects location in the managed indexes:
// - for create you must provide only the newObj
// - for update you must provide both the oldObj and the newObj
// - for delete you must provide only the oldObj
// updateIndices must be called from a function that already has a lock on the cache
func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) {
var oldIndexValues, indexValues []string
var err error
for name, indexFunc := range i.indexers {
if oldObj != nil {
oldIndexValues, err = indexFunc(oldObj)
} else {
oldIndexValues = oldIndexValues[:0]
}
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
if newObj != nil {
indexValues, err = indexFunc(newObj)
} else {
indexValues = indexValues[:0]
}
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
index := i.indices[name]
if index == nil {
index = Index{}
i.indices[name] = index
}
if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
// We optimize for the most common case where indexFunc returns a single value which has not been changed
continue
}
for _, value := range oldIndexValues {
i.deleteKeyFromIndex(key, value, index)
}
for _, value := range indexValues {
i.addKeyToIndex(key, value, index)
}
}
}
func (i *storeIndex) addKeyToIndex(key, indexValue string, index Index) {
set := index[indexValue]
if set == nil {
set = sets.String{}
index[indexValue] = set
}
set.Insert(key)
}
func (i *storeIndex) deleteKeyFromIndex(key, indexValue string, index Index) {
set := index[indexValue]
if set == nil {
return
}
set.Delete(key)
// If we don't delete the set when zero, indices with high cardinality
// short lived resources can cause memory to increase over time from
// unused empty sets. See `kubernetes/kubernetes/issues/84959`.
if len(set) == 0 {
delete(index, indexValue)
}
}
// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
// index implements the indexing functionality
index *storeIndex
}
func (c *threadSafeMap) Add(key string, obj interface{}) {
@ -79,14 +223,14 @@ func (c *threadSafeMap) Update(key string, obj interface{}) {
defer c.lock.Unlock()
oldObject := c.items[key]
c.items[key] = obj
c.updateIndices(oldObject, obj, key)
c.index.updateIndices(oldObject, obj, key)
}
func (c *threadSafeMap) Delete(key string) {
c.lock.Lock()
defer c.lock.Unlock()
if obj, exists := c.items[key]; exists {
c.updateIndices(obj, nil, key)
c.index.updateIndices(obj, nil, key)
delete(c.items, key)
}
}
@ -126,9 +270,9 @@ func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion st
c.items = items
// rebuild any index
c.indices = Indices{}
c.index.reset()
for key, item := range c.items {
c.updateIndices(nil, item, key)
c.index.updateIndices(nil, item, key)
}
}
@ -138,32 +282,10 @@ func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{},
c.lock.RLock()
defer c.lock.RUnlock()
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
indexedValues, err := indexFunc(obj)
storeKeySet, err := c.index.getKeysFromIndex(indexName, obj)
if err != nil {
return nil, err
}
index := c.indices[indexName]
var storeKeySet sets.String
if len(indexedValues) == 1 {
// In majority of cases, there is exactly one value matching.
// Optimize the most common path - deduping is not needed here.
storeKeySet = index[indexedValues[0]]
} else {
// Need to de-dupe the return list.
// Since multiple keys are allowed, this can happen.
storeKeySet = sets.String{}
for _, indexedValue := range indexedValues {
for key := range index[indexedValue] {
storeKeySet.Insert(key)
}
}
}
list := make([]interface{}, 0, storeKeySet.Len())
for storeKey := range storeKeySet {
@ -177,14 +299,10 @@ func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{},
c.lock.RLock()
defer c.lock.RUnlock()
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
set, err := c.index.getKeysByIndex(indexName, indexedValue)
if err != nil {
return nil, err
}
index := c.indices[indexName]
set := index[indexedValue]
list := make([]interface{}, 0, set.Len())
for key := range set {
list = append(list, c.items[key])
@ -199,14 +317,10 @@ func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, err
c.lock.RLock()
defer c.lock.RUnlock()
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
set, err := c.index.getKeysByIndex(indexName, indexedValue)
if err != nil {
return nil, err
}
index := c.indices[indexName]
set := index[indexedValue]
return set.List(), nil
}
@ -214,16 +328,11 @@ func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
c.lock.RLock()
defer c.lock.RUnlock()
index := c.indices[indexName]
names := make([]string, 0, len(index))
for key := range index {
names = append(names, key)
}
return names
return c.index.getIndexValues(indexName)
}
func (c *threadSafeMap) GetIndexers() Indexers {
return c.indexers
return c.index.indexers
}
func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
@ -234,87 +343,7 @@ func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
return fmt.Errorf("cannot add indexers to running index")
}
oldKeys := sets.StringKeySet(c.indexers)
newKeys := sets.StringKeySet(newIndexers)
if oldKeys.HasAny(newKeys.List()...) {
return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
}
for k, v := range newIndexers {
c.indexers[k] = v
}
return nil
}
// updateIndices modifies the objects location in the managed indexes:
// - for create you must provide only the newObj
// - for update you must provide both the oldObj and the newObj
// - for delete you must provide only the oldObj
// updateIndices must be called from a function that already has a lock on the cache
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
var oldIndexValues, indexValues []string
var err error
for name, indexFunc := range c.indexers {
if oldObj != nil {
oldIndexValues, err = indexFunc(oldObj)
} else {
oldIndexValues = oldIndexValues[:0]
}
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
if newObj != nil {
indexValues, err = indexFunc(newObj)
} else {
indexValues = indexValues[:0]
}
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
index := c.indices[name]
if index == nil {
index = Index{}
c.indices[name] = index
}
if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
// We optimize for the most common case where indexFunc returns a single value which has not been changed
continue
}
for _, value := range oldIndexValues {
c.deleteKeyFromIndex(key, value, index)
}
for _, value := range indexValues {
c.addKeyToIndex(key, value, index)
}
}
}
func (c *threadSafeMap) addKeyToIndex(key, indexValue string, index Index) {
set := index[indexValue]
if set == nil {
set = sets.String{}
index[indexValue] = set
}
set.Insert(key)
}
func (c *threadSafeMap) deleteKeyFromIndex(key, indexValue string, index Index) {
set := index[indexValue]
if set == nil {
return
}
set.Delete(key)
// If we don't delete the set when zero, indices with high cardinality
// short lived resources can cause memory to increase over time from
// unused empty sets. See `kubernetes/kubernetes/issues/84959`.
if len(set) == 0 {
delete(index, indexValue)
}
return c.index.addIndexers(newIndexers)
}
func (c *threadSafeMap) Resync() error {
@ -325,8 +354,10 @@ func (c *threadSafeMap) Resync() error {
// NewThreadSafeStore creates a new instance of ThreadSafeStore.
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
return &threadSafeMap{
items: map[string]interface{}{},
indexers: indexers,
indices: indices,
items: map[string]interface{}{},
index: &storeIndex{
indexers: indexers,
indices: indices,
},
}
}

View File

@ -43,7 +43,7 @@ func TestThreadSafeStoreDeleteRemovesEmptySetsFromIndex(t *testing.T) {
store.Add(testKey, testKey)
// Assumption check, there should be a set for the `testKey` with one element in the added index
set := store.indices[testIndexer][testKey]
set := store.index.indices[testIndexer][testKey]
if len(set) != 1 {
t.Errorf("Initial assumption of index backing string set having 1 element failed. Actual elements: %d", len(set))
@ -51,7 +51,7 @@ func TestThreadSafeStoreDeleteRemovesEmptySetsFromIndex(t *testing.T) {
}
store.Delete(testKey)
set, present := store.indices[testIndexer][testKey]
set, present := store.index.indices[testIndexer][testKey]
if present {
t.Errorf("Index backing string set not deleted from index. Set length: %d", len(set))
@ -76,7 +76,7 @@ func TestThreadSafeStoreAddKeepsNonEmptySetPostDeleteFromIndex(t *testing.T) {
store.Add("delete", "delete")
// Assumption check, there should be a set for the `testIndex` with two elements
set := store.indices[testIndexer][testIndex]
set := store.index.indices[testIndexer][testIndex]
if len(set) != 2 {
t.Errorf("Initial assumption of index backing string set having 2 elements failed. Actual elements: %d", len(set))
@ -84,7 +84,7 @@ func TestThreadSafeStoreAddKeepsNonEmptySetPostDeleteFromIndex(t *testing.T) {
}
store.Delete("delete")
set, present := store.indices[testIndexer][testIndex]
set, present := store.index.indices[testIndexer][testIndex]
if !present {
t.Errorf("Index backing string set erroneously deleted from index.")
@ -114,7 +114,7 @@ func TestThreadSafeStoreIndexingFunctionsWithMultipleValues(t *testing.T) {
assert := assert.New(t)
compare := func(key string, expected []string) error {
values := store.indices[testIndexer][key].List()
values := store.index.indices[testIndexer][key].List()
if cmp.Equal(values, expected) {
return nil
}