diff --git a/staging/src/k8s.io/apiserver/go.mod b/staging/src/k8s.io/apiserver/go.mod index 2054b13ab0a..bbbacabaaed 100644 --- a/staging/src/k8s.io/apiserver/go.mod +++ b/staging/src/k8s.io/apiserver/go.mod @@ -15,6 +15,7 @@ require ( github.com/fsnotify/fsnotify v1.7.0 github.com/go-logr/logr v1.4.2 github.com/gogo/protobuf v1.3.2 + github.com/google/btree v1.0.1 github.com/google/cel-go v0.21.0 github.com/google/gnostic-models v0.6.8 github.com/google/go-cmp v0.6.0 @@ -81,7 +82,6 @@ require ( github.com/golang-jwt/jwt/v4 v4.5.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect - github.com/google/btree v1.0.1 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/store.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store.go index 2167acffa3f..c0007b73102 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store.go @@ -19,6 +19,8 @@ package cacher import ( "fmt" + "github.com/google/btree" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" @@ -53,6 +55,12 @@ type storeElement struct { Fields fields.Set } +func (t *storeElement) Less(than btree.Item) bool { + return t.Key < than.(*storeElement).Key +} + +var _ btree.Item = (*storeElement)(nil) + func storeElementKey(obj interface{}) (string, error) { elem, ok := obj.(*storeElement) if !ok { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/store_btree.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store_btree.go new file mode 100644 index 00000000000..b4af96920a2 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store_btree.go @@ -0,0 +1,393 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cacher + +import ( + "fmt" + "math" + "strings" + "sync" + + "github.com/google/btree" + "k8s.io/client-go/tools/cache" +) + +// newThreadedBtreeStoreIndexer returns a storage for cacher by adding locking over the two 2 data structures: +// * btree based storage for efficient LIST operation on prefix +// * map based indexer for retrieving values by index. +// This separation is used to allow independent snapshotting those two storages in the future. +// Intention is to utilize btree for its cheap snapshots that don't require locking if don't mutate data. +func newThreadedBtreeStoreIndexer(indexers cache.Indexers, degree int) *threadedStoreIndexer { + return &threadedStoreIndexer{ + store: newBtreeStore(degree), + indexer: newIndexer(indexers), + } +} + +type threadedStoreIndexer struct { + lock sync.RWMutex + store btreeStore + indexer indexer +} + +func (si *threadedStoreIndexer) Add(obj interface{}) error { + return si.addOrUpdate(obj) +} + +func (si *threadedStoreIndexer) Update(obj interface{}) error { + return si.addOrUpdate(obj) +} + +func (si *threadedStoreIndexer) addOrUpdate(obj interface{}) error { + if obj == nil { + return fmt.Errorf("obj cannot be nil") + } + newElem, ok := obj.(*storeElement) + if !ok { + return fmt.Errorf("obj not a storeElement: %#v", obj) + } + si.lock.Lock() + defer si.lock.Unlock() + oldElem := si.store.addOrUpdateElem(newElem) + return si.indexer.updateElem(newElem.Key, oldElem, newElem) +} + +func (si *threadedStoreIndexer) Delete(obj interface{}) error { + storeElem, ok := obj.(*storeElement) + if !ok { + return fmt.Errorf("obj not a storeElement: %#v", obj) + } + si.lock.Lock() + defer si.lock.Unlock() + oldObj := si.store.deleteElem(storeElem) + if oldObj == nil { + return nil + } + return si.indexer.updateElem(storeElem.Key, oldObj.(*storeElement), nil) +} + +func (si *threadedStoreIndexer) List() []interface{} { + si.lock.RLock() + defer si.lock.RUnlock() + return si.store.List() +} + +func (si *threadedStoreIndexer) ListPrefix(prefix, continueKey string, limit int) ([]interface{}, bool) { + si.lock.RLock() + defer si.lock.RUnlock() + return si.store.ListPrefix(prefix, continueKey, limit) +} + +func (si *threadedStoreIndexer) ListKeys() []string { + si.lock.RLock() + defer si.lock.RUnlock() + return si.store.ListKeys() +} + +func (si *threadedStoreIndexer) Get(obj interface{}) (item interface{}, exists bool, err error) { + si.lock.RLock() + defer si.lock.RUnlock() + return si.store.Get(obj) +} + +func (si *threadedStoreIndexer) GetByKey(key string) (item interface{}, exists bool, err error) { + si.lock.RLock() + defer si.lock.RUnlock() + return si.store.GetByKey(key) +} + +func (si *threadedStoreIndexer) Replace(objs []interface{}, resourceVersion string) error { + si.lock.Lock() + defer si.lock.Unlock() + err := si.store.Replace(objs, resourceVersion) + if err != nil { + return err + } + return si.indexer.Replace(objs, resourceVersion) +} + +func (si *threadedStoreIndexer) ByIndex(indexName, indexValue string) ([]interface{}, error) { + si.lock.RLock() + defer si.lock.RUnlock() + return si.indexer.ByIndex(indexName, indexValue) +} + +func newBtreeStore(degree int) btreeStore { + return btreeStore{ + tree: btree.New(degree), + } +} + +type btreeStore struct { + tree *btree.BTree +} + +func (s *btreeStore) Add(obj interface{}) error { + if obj == nil { + return fmt.Errorf("obj cannot be nil") + } + storeElem, ok := obj.(*storeElement) + if !ok { + return fmt.Errorf("obj not a storeElement: %#v", obj) + } + s.addOrUpdateElem(storeElem) + return nil +} + +func (s *btreeStore) Update(obj interface{}) error { + if obj == nil { + return fmt.Errorf("obj cannot be nil") + } + storeElem, ok := obj.(*storeElement) + if !ok { + return fmt.Errorf("obj not a storeElement: %#v", obj) + } + s.addOrUpdateElem(storeElem) + return nil +} + +func (s *btreeStore) Delete(obj interface{}) error { + if obj == nil { + return fmt.Errorf("obj cannot be nil") + } + storeElem, ok := obj.(*storeElement) + if !ok { + return fmt.Errorf("obj not a storeElement: %#v", obj) + } + s.deleteElem(storeElem) + return nil +} + +func (s *btreeStore) deleteElem(storeElem *storeElement) interface{} { + return s.tree.Delete(storeElem) +} + +func (s *btreeStore) List() []interface{} { + items := make([]interface{}, 0, s.tree.Len()) + s.tree.Ascend(func(i btree.Item) bool { + items = append(items, i.(interface{})) + return true + }) + return items +} + +func (s *btreeStore) ListKeys() []string { + items := make([]string, 0, s.tree.Len()) + s.tree.Ascend(func(i btree.Item) bool { + items = append(items, i.(*storeElement).Key) + return true + }) + return items +} + +func (s *btreeStore) Get(obj interface{}) (item interface{}, exists bool, err error) { + storeElem, ok := obj.(*storeElement) + if !ok { + return nil, false, fmt.Errorf("obj is not a storeElement") + } + item = s.tree.Get(storeElem) + if item == nil { + return nil, false, nil + } + return item, true, nil +} + +func (s *btreeStore) GetByKey(key string) (item interface{}, exists bool, err error) { + return s.getByKey(key) +} + +func (s *btreeStore) Replace(objs []interface{}, _ string) error { + s.tree.Clear(false) + for _, obj := range objs { + storeElem, ok := obj.(*storeElement) + if !ok { + return fmt.Errorf("obj not a storeElement: %#v", obj) + } + s.addOrUpdateElem(storeElem) + } + return nil +} + +// addOrUpdateLocked assumes a lock is held and is used for Add +// and Update operations. +func (s *btreeStore) addOrUpdateElem(storeElem *storeElement) *storeElement { + oldObj := s.tree.ReplaceOrInsert(storeElem) + if oldObj == nil { + return nil + } + return oldObj.(*storeElement) +} + +func (s *btreeStore) getByKey(key string) (item interface{}, exists bool, err error) { + keyElement := &storeElement{Key: key} + item = s.tree.Get(keyElement) + return item, item != nil, nil +} + +func (s *btreeStore) ListPrefix(prefix, continueKey string, limit int) ([]interface{}, bool) { + if limit < 0 { + return nil, false + } + if continueKey == "" { + continueKey = prefix + } + var result []interface{} + var hasMore bool + if limit == 0 { + limit = math.MaxInt + } + s.tree.AscendGreaterOrEqual(&storeElement{Key: continueKey}, func(i btree.Item) bool { + elementKey := i.(*storeElement).Key + if !strings.HasPrefix(elementKey, prefix) { + return false + } + // TODO: Might be worth to lookup one more item to provide more accurate HasMore. + if len(result) >= limit { + hasMore = true + return false + } + result = append(result, i.(interface{})) + return true + }) + return result, hasMore +} + +func (s *btreeStore) Count(prefix, continueKey string) (count int) { + if continueKey == "" { + continueKey = prefix + } + s.tree.AscendGreaterOrEqual(&storeElement{Key: continueKey}, func(i btree.Item) bool { + elementKey := i.(*storeElement).Key + if !strings.HasPrefix(elementKey, prefix) { + return false + } + count++ + return true + }) + return count +} + +// newIndexer returns a indexer similar to storeIndex from client-go/tools/cache. +// TODO: Unify the indexer code with client-go/cache package. +// Major differences is type of values stored and their mutability: +// * Indexer in client-go stores object keys, that are not mutable. +// * Indexer in cacher stores whole objects, which is mutable. +// Indexer in client-go uses keys as it is used in conjunction with map[key]value +// allowing for fast value retrieval, while btree used in cacher would provide additional overhead. +// Difference in mutability of stored values is used for optimizing some operations in client-go Indexer. +func newIndexer(indexers cache.Indexers) indexer { + return indexer{ + indices: map[string]map[string]map[string]*storeElement{}, + indexers: indexers, + } +} + +type indexer struct { + indices map[string]map[string]map[string]*storeElement + indexers cache.Indexers +} + +func (i *indexer) ByIndex(indexName, indexValue string) ([]interface{}, error) { + indexFunc := i.indexers[indexName] + if indexFunc == nil { + return nil, fmt.Errorf("index with name %s does not exist", indexName) + } + index := i.indices[indexName] + set := index[indexValue] + list := make([]interface{}, 0, len(set)) + for _, obj := range set { + list = append(list, obj) + } + return list, nil +} + +func (i *indexer) Replace(objs []interface{}, resourceVersion string) error { + i.indices = map[string]map[string]map[string]*storeElement{} + for _, obj := range objs { + storeElem, ok := obj.(*storeElement) + if !ok { + return fmt.Errorf("obj not a storeElement: %#v", obj) + } + err := i.updateElem(storeElem.Key, nil, storeElem) + if err != nil { + return err + } + } + return nil +} + +func (i *indexer) updateElem(key string, oldObj, newObj *storeElement) (err error) { + var oldIndexValues, indexValues []string + for name, indexFunc := range i.indexers { + if oldObj != nil { + oldIndexValues, err = indexFunc(oldObj) + } else { + oldIndexValues = oldIndexValues[:0] + } + if err != nil { + return fmt.Errorf("unable to calculate an index entry for key %q on index %q: %w", key, name, err) + } + if newObj != nil { + indexValues, err = indexFunc(newObj) + } else { + indexValues = indexValues[:0] + } + if err != nil { + return fmt.Errorf("unable to calculate an index entry for key %q on index %q: %w", key, name, err) + } + index := i.indices[name] + if index == nil { + index = map[string]map[string]*storeElement{} + i.indices[name] = index + } + if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] { + // We optimize for the most common case where indexFunc returns a single value which has not been changed + i.add(key, indexValues[0], newObj, index) + continue + } + for _, value := range oldIndexValues { + i.delete(key, value, index) + } + for _, value := range indexValues { + i.add(key, value, newObj, index) + } + } + return nil +} + +func (i *indexer) add(key, value string, obj *storeElement, index map[string]map[string]*storeElement) { + set := index[value] + if set == nil { + set = map[string]*storeElement{} + index[value] = set + } + set[key] = obj +} + +func (i *indexer) delete(key, value string, index map[string]map[string]*storeElement) { + set := index[value] + if set == nil { + return + } + delete(set, key) + // If we don's delete the set when zero, indices with high cardinality + // short lived resources can cause memory to increase over time from + // unused empty sets. See `kubernetes/kubernetes/issues/84959`. + if len(set) == 0 { + delete(index, value) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/store_btree_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store_btree_test.go new file mode 100644 index 00000000000..1b5143a4462 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store_btree_test.go @@ -0,0 +1,81 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cacher + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestStoreListOrdered(t *testing.T) { + store := newThreadedBtreeStoreIndexer(nil, 32) + assert.NoError(t, store.Add(testStorageElement("foo3", "bar3", 1))) + assert.NoError(t, store.Add(testStorageElement("foo1", "bar2", 2))) + assert.NoError(t, store.Add(testStorageElement("foo2", "bar1", 3))) + assert.Equal(t, []interface{}{ + testStorageElement("foo1", "bar2", 2), + testStorageElement("foo2", "bar1", 3), + testStorageElement("foo3", "bar3", 1), + }, store.List()) +} + +func TestStoreListPrefix(t *testing.T) { + store := newThreadedBtreeStoreIndexer(nil, 32) + assert.NoError(t, store.Add(testStorageElement("foo3", "bar3", 1))) + assert.NoError(t, store.Add(testStorageElement("foo1", "bar2", 2))) + assert.NoError(t, store.Add(testStorageElement("foo2", "bar1", 3))) + assert.NoError(t, store.Add(testStorageElement("bar", "baz", 4))) + + items, hasMore := store.ListPrefix("foo", "", 0) + assert.False(t, hasMore) + assert.Equal(t, []interface{}{ + testStorageElement("foo1", "bar2", 2), + testStorageElement("foo2", "bar1", 3), + testStorageElement("foo3", "bar3", 1), + }, items) + + items, hasMore = store.ListPrefix("foo2", "", 0) + assert.False(t, hasMore) + assert.Equal(t, []interface{}{ + testStorageElement("foo2", "bar1", 3), + }, items) + + items, hasMore = store.ListPrefix("foo", "", 1) + assert.True(t, hasMore) + assert.Equal(t, []interface{}{ + testStorageElement("foo1", "bar2", 2), + }, items) + + items, hasMore = store.ListPrefix("foo", "foo1\x00", 1) + assert.True(t, hasMore) + assert.Equal(t, []interface{}{ + testStorageElement("foo2", "bar1", 3), + }, items) + + items, hasMore = store.ListPrefix("foo", "foo2\x00", 1) + assert.False(t, hasMore) + assert.Equal(t, []interface{}{ + testStorageElement("foo3", "bar3", 1), + }, items) + + items, hasMore = store.ListPrefix("bar", "", 0) + assert.False(t, hasMore) + assert.Equal(t, []interface{}{ + testStorageElement("bar", "baz", 4), + }, items) +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store_test.go index fbd6e819684..4703066c19f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store_test.go @@ -28,7 +28,17 @@ import ( ) func TestStoreSingleKey(t *testing.T) { - store := newStoreIndexer(nil) + t.Run("cache.Indexer", func(t *testing.T) { + store := newStoreIndexer(testStoreIndexers()) + testStoreSingleKey(t, store) + }) + t.Run("btree", func(t *testing.T) { + store := newThreadedBtreeStoreIndexer(storeElementIndexers(testStoreIndexers()), 32) + testStoreSingleKey(t, store) + }) +} + +func testStoreSingleKey(t *testing.T, store storeIndexer) { assertStoreEmpty(t, store, "foo") require.NoError(t, store.Add(testStorageElement("foo", "bar", 1))) @@ -50,7 +60,17 @@ func TestStoreSingleKey(t *testing.T) { } func TestStoreIndexerSingleKey(t *testing.T) { - store := newStoreIndexer(testStoreIndexers()) + t.Run("cache.Indexer", func(t *testing.T) { + store := newStoreIndexer(testStoreIndexers()) + testStoreIndexerSingleKey(t, store) + }) + t.Run("btree", func(t *testing.T) { + store := newThreadedBtreeStoreIndexer(storeElementIndexers(testStoreIndexers()), 32) + testStoreIndexerSingleKey(t, store) + }) +} + +func testStoreIndexerSingleKey(t *testing.T, store storeIndexer) { items, err := store.ByIndex("by_val", "bar") require.NoError(t, err) assert.Empty(t, items) diff --git a/staging/src/k8s.io/cloud-provider/go.mod b/staging/src/k8s.io/cloud-provider/go.mod index 8fe7f5d151d..c6611134d4c 100644 --- a/staging/src/k8s.io/cloud-provider/go.mod +++ b/staging/src/k8s.io/cloud-provider/go.mod @@ -47,6 +47,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/google/btree v1.0.1 // indirect github.com/google/cel-go v0.21.0 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/gofuzz v1.2.0 // indirect diff --git a/staging/src/k8s.io/kube-aggregator/go.mod b/staging/src/k8s.io/kube-aggregator/go.mod index 51b20dbd9e0..2c6f8a6f75d 100644 --- a/staging/src/k8s.io/kube-aggregator/go.mod +++ b/staging/src/k8s.io/kube-aggregator/go.mod @@ -51,6 +51,7 @@ require ( github.com/go-openapi/swag v0.23.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/google/btree v1.0.1 // indirect github.com/google/cel-go v0.21.0 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/uuid v1.6.0 // indirect diff --git a/staging/src/k8s.io/kube-controller-manager/go.sum b/staging/src/k8s.io/kube-controller-manager/go.sum index f1b0a77a7a5..64d9e8f7d2c 100644 --- a/staging/src/k8s.io/kube-controller-manager/go.sum +++ b/staging/src/k8s.io/kube-controller-manager/go.sum @@ -31,6 +31,7 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= github.com/google/cel-go v0.21.0/go.mod h1:rHUlWCcBKgyEk+eV03RPdZUekPp6YcJwV0FxuUksYxc= github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= diff --git a/staging/src/k8s.io/pod-security-admission/go.mod b/staging/src/k8s.io/pod-security-admission/go.mod index bfae291ea86..b48534f2966 100644 --- a/staging/src/k8s.io/pod-security-admission/go.mod +++ b/staging/src/k8s.io/pod-security-admission/go.mod @@ -45,6 +45,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/google/btree v1.0.1 // indirect github.com/google/cel-go v0.21.0 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/gofuzz v1.2.0 // indirect diff --git a/staging/src/k8s.io/sample-apiserver/go.mod b/staging/src/k8s.io/sample-apiserver/go.mod index a129447f41d..ca4f6137853 100644 --- a/staging/src/k8s.io/sample-apiserver/go.mod +++ b/staging/src/k8s.io/sample-apiserver/go.mod @@ -43,6 +43,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/google/btree v1.0.1 // indirect github.com/google/cel-go v0.21.0 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.6.0 // indirect