Merge pull request #126754 from serathius/watchcache-btree

Reimplement watch cache storage with btree
This commit is contained in:
Kubernetes Prow Robot 2024-10-29 14:20:58 +00:00 committed by GitHub
commit c83250d104
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 510 additions and 3 deletions

View File

@ -15,6 +15,7 @@ require (
github.com/fsnotify/fsnotify v1.7.0
github.com/go-logr/logr v1.4.2
github.com/gogo/protobuf v1.3.2
github.com/google/btree v1.0.1
github.com/google/cel-go v0.21.0
github.com/google/gnostic-models v0.6.8
github.com/google/go-cmp v0.6.0
@ -81,7 +82,6 @@ require (
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect

View File

@ -19,6 +19,8 @@ package cacher
import (
"fmt"
"github.com/google/btree"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
@ -53,6 +55,12 @@ type storeElement struct {
Fields fields.Set
}
// Less reports whether t sorts before than, comparing the two elements by Key.
// It makes *storeElement usable as a btree.Item and panics if than is not a
// *storeElement.
func (t *storeElement) Less(than btree.Item) bool {
	other := than.(*storeElement)
	return t.Key < other.Key
}
// Compile-time assertion that *storeElement implements btree.Item.
var _ btree.Item = (*storeElement)(nil)
func storeElementKey(obj interface{}) (string, error) {
elem, ok := obj.(*storeElement)
if !ok {

View File

@ -0,0 +1,393 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"fmt"
"math"
"strings"
"sync"
"github.com/google/btree"
"k8s.io/client-go/tools/cache"
)
// newThreadedBtreeStoreIndexer returns a storage for cacher that adds locking
// over two data structures:
//   - a btree based store for efficient LIST operations on a key prefix
//   - a map based indexer for retrieving values by index.
//
// The two are kept separate to allow snapshotting them independently in the
// future. The intention is to utilize the btree for its cheap snapshots, which
// do not require locking as long as the data is not mutated.
func newThreadedBtreeStoreIndexer(indexers cache.Indexers, degree int) *threadedStoreIndexer {
	si := &threadedStoreIndexer{}
	si.store = newBtreeStore(degree)
	si.indexer = newIndexer(indexers)
	return si
}
// threadedStoreIndexer pairs a btreeStore with an indexer and serializes
// access to both through a single read-write lock: methods that modify either
// structure take the write lock, read-only methods take the read lock.
type threadedStoreIndexer struct {
	lock    sync.RWMutex
	store   btreeStore
	indexer indexer
}
// Add stores obj by delegating to addOrUpdate; it behaves exactly like Update.
func (si *threadedStoreIndexer) Add(obj interface{}) error {
	return si.addOrUpdate(obj)
}
// Update stores obj by delegating to addOrUpdate; it behaves exactly like Add.
func (si *threadedStoreIndexer) Update(obj interface{}) error {
	return si.addOrUpdate(obj)
}
// addOrUpdate inserts obj into the store, replacing any element with the same
// key, and then brings the indexer in line with the change. Both steps happen
// under the write lock.
//
// It returns an error if obj is nil, is not a *storeElement, or if the indexer
// fails to compute index values.
func (si *threadedStoreIndexer) addOrUpdate(obj interface{}) error {
	var elem *storeElement
	switch v := obj.(type) {
	case nil:
		return fmt.Errorf("obj cannot be nil")
	case *storeElement:
		elem = v
	default:
		return fmt.Errorf("obj not a storeElement: %#v", obj)
	}

	si.lock.Lock()
	defer si.lock.Unlock()

	previous := si.store.addOrUpdateElem(elem)
	return si.indexer.updateElem(elem.Key, previous, elem)
}
// Delete removes the element matching obj's key from the store and, if such an
// element was present, drops it from the indexer as well. Deleting a key that
// is not stored is not an error.
//
// It returns an error if obj is not a *storeElement or if the indexer fails to
// compute index values for the removed element.
func (si *threadedStoreIndexer) Delete(obj interface{}) error {
	elem, isElem := obj.(*storeElement)
	if !isElem {
		return fmt.Errorf("obj not a storeElement: %#v", obj)
	}

	si.lock.Lock()
	defer si.lock.Unlock()

	if removed := si.store.deleteElem(elem); removed != nil {
		return si.indexer.updateElem(elem.Key, removed.(*storeElement), nil)
	}
	return nil
}
// List returns the result of the underlying store's List, taken under the
// read lock.
func (si *threadedStoreIndexer) List() []interface{} {
	si.lock.RLock()
	defer si.lock.RUnlock()

	items := si.store.List()
	return items
}
// ListPrefix returns the result of the underlying store's ListPrefix for the
// given prefix, continueKey and limit, taken under the read lock.
func (si *threadedStoreIndexer) ListPrefix(prefix, continueKey string, limit int) ([]interface{}, bool) {
	si.lock.RLock()
	defer si.lock.RUnlock()

	items, hasMore := si.store.ListPrefix(prefix, continueKey, limit)
	return items, hasMore
}
// ListKeys returns the result of the underlying store's ListKeys, taken under
// the read lock.
func (si *threadedStoreIndexer) ListKeys() []string {
	si.lock.RLock()
	defer si.lock.RUnlock()

	keys := si.store.ListKeys()
	return keys
}
// Get looks obj up in the underlying store under the read lock and returns
// whatever the store's Get returns.
func (si *threadedStoreIndexer) Get(obj interface{}) (item interface{}, exists bool, err error) {
	si.lock.RLock()
	defer si.lock.RUnlock()

	item, exists, err = si.store.Get(obj)
	return item, exists, err
}
// GetByKey looks key up in the underlying store under the read lock and
// returns whatever the store's GetByKey returns.
func (si *threadedStoreIndexer) GetByKey(key string) (item interface{}, exists bool, err error) {
	si.lock.RLock()
	defer si.lock.RUnlock()

	item, exists, err = si.store.GetByKey(key)
	return item, exists, err
}
// Replace swaps the contents of the store and then of the indexer for objs,
// holding the write lock for the whole operation. If replacing the store
// fails, the indexer is not touched and the store's error is returned.
func (si *threadedStoreIndexer) Replace(objs []interface{}, resourceVersion string) error {
	si.lock.Lock()
	defer si.lock.Unlock()

	if err := si.store.Replace(objs, resourceVersion); err != nil {
		return err
	}
	return si.indexer.Replace(objs, resourceVersion)
}
// ByIndex returns the result of the indexer's ByIndex for the given index name
// and value, taken under the read lock.
func (si *threadedStoreIndexer) ByIndex(indexName, indexValue string) ([]interface{}, error) {
	si.lock.RLock()
	defer si.lock.RUnlock()

	items, err := si.indexer.ByIndex(indexName, indexValue)
	return items, err
}
// newBtreeStore returns an empty btreeStore backed by a btree created with the
// given degree.
func newBtreeStore(degree int) btreeStore {
	tree := btree.New(degree)
	return btreeStore{tree: tree}
}
// btreeStore keeps *storeElement values in a btree ordered by element key.
// Its methods do no locking of their own.
type btreeStore struct {
	tree *btree.BTree
}
// Add inserts obj into the tree, replacing any element stored under the same
// key. It returns an error if obj is nil or is not a *storeElement.
func (s *btreeStore) Add(obj interface{}) error {
	switch elem := obj.(type) {
	case nil:
		return fmt.Errorf("obj cannot be nil")
	case *storeElement:
		s.addOrUpdateElem(elem)
		return nil
	default:
		return fmt.Errorf("obj not a storeElement: %#v", obj)
	}
}
// Update inserts obj into the tree, replacing any element stored under the
// same key. It returns an error if obj is nil or is not a *storeElement.
func (s *btreeStore) Update(obj interface{}) error {
	elem, isElem := obj.(*storeElement)
	switch {
	case obj == nil:
		return fmt.Errorf("obj cannot be nil")
	case !isElem:
		return fmt.Errorf("obj not a storeElement: %#v", obj)
	}
	s.addOrUpdateElem(elem)
	return nil
}
// Delete removes the element with obj's key from the tree; the removed value
// is discarded. It returns an error if obj is nil or is not a *storeElement.
func (s *btreeStore) Delete(obj interface{}) error {
	if obj == nil {
		return fmt.Errorf("obj cannot be nil")
	}
	if elem, isElem := obj.(*storeElement); isElem {
		s.deleteElem(elem)
		return nil
	}
	return fmt.Errorf("obj not a storeElement: %#v", obj)
}
// deleteElem removes storeElem from the tree and returns the result of the
// tree's Delete call. The caller in this file treats a nil result as "nothing
// was stored under that key" and otherwise asserts it to *storeElement.
func (s *btreeStore) deleteElem(storeElem *storeElement) interface{} {
	return s.tree.Delete(storeElem)
}
// List returns every stored element in ascending key order. The returned slice
// is freshly allocated and sized to the number of items in the tree.
func (s *btreeStore) List() []interface{} {
	items := make([]interface{}, 0, s.tree.Len())
	s.tree.Ascend(func(item btree.Item) bool {
		// A btree.Item is directly assignable to interface{}; no type
		// assertion is needed.
		items = append(items, item)
		return true
	})
	return items
}
// ListKeys returns the keys of all stored elements in ascending order.
func (s *btreeStore) ListKeys() []string {
	keys := make([]string, 0, s.tree.Len())
	collect := func(item btree.Item) bool {
		elem := item.(*storeElement)
		keys = append(keys, elem.Key)
		return true
	}
	s.tree.Ascend(collect)
	return keys
}
// Get returns the stored element whose key equals that of obj.
//
// exists is false and item is nil when no such element is stored. An error is
// returned only when obj is not a *storeElement; the message includes the
// offending value, matching the other methods of this store.
func (s *btreeStore) Get(obj interface{}) (item interface{}, exists bool, err error) {
	storeElem, ok := obj.(*storeElement)
	if !ok {
		return nil, false, fmt.Errorf("obj not a storeElement: %#v", obj)
	}
	found := s.tree.Get(storeElem)
	if found == nil {
		return nil, false, nil
	}
	return found, true, nil
}
// GetByKey returns the element stored under key by delegating to getByKey.
func (s *btreeStore) GetByKey(key string) (item interface{}, exists bool, err error) {
	return s.getByKey(key)
}
// Replace discards the current contents of the tree and stores objs instead.
// The resource version argument is ignored.
//
// All of objs are type-checked before anything is modified, so when an error
// is returned (some obj is not a *storeElement) the previous contents are
// still intact rather than cleared and partially refilled.
func (s *btreeStore) Replace(objs []interface{}, _ string) error {
	for _, obj := range objs {
		if _, ok := obj.(*storeElement); !ok {
			return fmt.Errorf("obj not a storeElement: %#v", obj)
		}
	}
	s.tree.Clear(false)
	for _, obj := range objs {
		// Safe: every obj was verified to be a *storeElement above.
		s.addOrUpdateElem(obj.(*storeElement))
	}
	return nil
}
// addOrUpdateElem inserts storeElem into the tree, replacing any element with
// the same key, and returns the replaced element or nil if there was none.
// It is used for Add and Update operations and assumes a lock is held.
func (s *btreeStore) addOrUpdateElem(storeElem *storeElement) *storeElement {
	if replaced := s.tree.ReplaceOrInsert(storeElem); replaced != nil {
		return replaced.(*storeElement)
	}
	return nil
}
// getByKey looks up the element stored under key using a probe element that
// carries only that key. exists reports whether an element was found; err is
// always nil.
func (s *btreeStore) getByKey(key string) (item interface{}, exists bool, err error) {
	found := s.tree.Get(&storeElement{Key: key})
	if found == nil {
		return nil, false, nil
	}
	return found, true, nil
}
// ListPrefix returns, in ascending key order, the stored elements whose key
// starts with prefix and is greater than or equal to continueKey.
//
// limit caps the number of returned elements; 0 means no limit and a negative
// limit yields (nil, false). The boolean result is true only when the limit
// was reached and at least one further matching element exists.
//
// An empty continueKey, or one that sorts before prefix, starts the listing at
// prefix.
func (s *btreeStore) ListPrefix(prefix, continueKey string, limit int) ([]interface{}, bool) {
	if limit < 0 {
		return nil, false
	}
	if limit == 0 {
		limit = math.MaxInt
	}
	// Every key that has prefix sorts at or after prefix itself. Starting
	// below it would make the iteration stop on the first (non-matching) key
	// and wrongly return nothing, so clamp the start position. This also
	// covers the empty continueKey.
	start := continueKey
	if start < prefix {
		start = prefix
	}
	var result []interface{}
	var hasMore bool
	s.tree.AscendGreaterOrEqual(&storeElement{Key: start}, func(i btree.Item) bool {
		if !strings.HasPrefix(i.(*storeElement).Key, prefix) {
			// Keys are ordered, so nothing after this one can match.
			return false
		}
		// Reached only for a matching item beyond the limit, which is
		// exactly the evidence needed to report that more results exist.
		if len(result) >= limit {
			hasMore = true
			return false
		}
		result = append(result, i)
		return true
	})
	return result, hasMore
}
// Count returns the number of stored elements whose key starts with prefix and
// is greater than or equal to continueKey.
//
// An empty continueKey, or one that sorts before prefix, starts the count at
// prefix.
func (s *btreeStore) Count(prefix, continueKey string) (count int) {
	// Every key that has prefix sorts at or after prefix itself. Starting
	// below it would make the iteration stop on the first (non-matching) key
	// and wrongly report zero, so clamp the start position. This also covers
	// the empty continueKey.
	start := continueKey
	if start < prefix {
		start = prefix
	}
	s.tree.AscendGreaterOrEqual(&storeElement{Key: start}, func(i btree.Item) bool {
		if !strings.HasPrefix(i.(*storeElement).Key, prefix) {
			// Keys are ordered, so nothing after this one can match.
			return false
		}
		count++
		return true
	})
	return count
}
// newIndexer returns an indexer similar to storeIndex from client-go/tools/cache.
// TODO: Unify the indexer code with client-go/cache package.
// The major differences are the type of the stored values and their mutability:
//   - The indexer in client-go stores object keys, which are not mutable.
//   - The indexer in cacher stores whole objects, which are mutable.
//
// The indexer in client-go uses keys because it is used in conjunction with a
// map[key]value that allows fast value retrieval, while the btree used in
// cacher would add overhead to such lookups.
// The difference in mutability of stored values is used for optimizing some
// operations in the client-go indexer.
func newIndexer(indexers cache.Indexers) indexer {
	var idx indexer
	idx.indices = map[string]map[string]map[string]*storeElement{}
	idx.indexers = indexers
	return idx
}
// indexer tracks, for each configured index function, which store elements
// produce which index values.
//
// indices is keyed by index name, then by index value, then by element key.
// indexers holds the index functions, keyed by index name.
type indexer struct {
	indices  map[string]map[string]map[string]*storeElement
	indexers cache.Indexers
}
// ByIndex returns the elements recorded under indexValue in the index called
// indexName, in no particular order. The result is an empty, non-nil slice
// when nothing matches. An error is returned when no index function is
// registered under indexName.
func (i *indexer) ByIndex(indexName, indexValue string) ([]interface{}, error) {
	if i.indexers[indexName] == nil {
		return nil, fmt.Errorf("index with name %s does not exist", indexName)
	}
	matches := i.indices[indexName][indexValue]
	list := make([]interface{}, 0, len(matches))
	for _, elem := range matches {
		list = append(list, elem)
	}
	return list, nil
}
// Replace throws away all existing index entries and rebuilds them from objs.
// It stops at, and returns, the first error: an obj that is not a
// *storeElement or an index function failure. resourceVersion is unused.
func (i *indexer) Replace(objs []interface{}, resourceVersion string) error {
	i.indices = make(map[string]map[string]map[string]*storeElement)
	for _, obj := range objs {
		elem, isElem := obj.(*storeElement)
		if !isElem {
			return fmt.Errorf("obj not a storeElement: %#v", obj)
		}
		if err := i.updateElem(elem.Key, nil, elem); err != nil {
			return err
		}
	}
	return nil
}
// updateElem brings every index up to date for the element stored under key.
//
// oldObj is the previous version of the element (nil when it is being added)
// and newObj is the new version (nil when it is being removed). For each
// registered index function the entries derived from oldObj are removed and
// the entries derived from newObj are added.
//
// It returns an error as soon as an index function fails; indexes processed
// before the failure stay updated.
func (i *indexer) updateElem(key string, oldObj, newObj *storeElement) (err error) {
	// Declared outside the loop so the variables are reused across indexes.
	var oldIndexValues, indexValues []string
	for name, indexFunc := range i.indexers {
		if oldObj != nil {
			oldIndexValues, err = indexFunc(oldObj)
		} else {
			// No previous object: clear values left over from the previous iteration.
			oldIndexValues = oldIndexValues[:0]
		}
		if err != nil {
			return fmt.Errorf("unable to calculate an index entry for key %q on index %q: %w", key, name, err)
		}
		if newObj != nil {
			indexValues, err = indexFunc(newObj)
		} else {
			// No new object: clear values left over from the previous iteration.
			indexValues = indexValues[:0]
		}
		if err != nil {
			return fmt.Errorf("unable to calculate an index entry for key %q on index %q: %w", key, name, err)
		}
		// Lazily create the per-index map the first time the index is used.
		index := i.indices[name]
		if index == nil {
			index = map[string]map[string]*storeElement{}
			i.indices[name] = index
		}
		if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
			// We optimize for the most common case where indexFunc returns a single value which has not been changed
			// The entry is still overwritten so that it points at newObj.
			i.add(key, indexValues[0], newObj, index)
			continue
		}
		for _, value := range oldIndexValues {
			i.delete(key, value, index)
		}
		for _, value := range indexValues {
			i.add(key, value, newObj, index)
		}
	}
	return nil
}
// add records obj under key in the set belonging to value within index,
// creating that set first if it does not exist yet. An existing entry for key
// is overwritten.
func (i *indexer) add(key, value string, obj *storeElement, index map[string]map[string]*storeElement) {
	if index[value] == nil {
		index[value] = map[string]*storeElement{}
	}
	index[value][key] = obj
}
// delete removes key from the set belonging to value within index. It does
// nothing when no set exists for value.
func (i *indexer) delete(key, value string, index map[string]map[string]*storeElement) {
	if bucket := index[value]; bucket != nil {
		delete(bucket, key)
		// If we don't delete the set when it becomes empty, indices with high
		// cardinality short lived resources can cause memory to increase over
		// time from unused empty sets. See `kubernetes/kubernetes/issues/84959`.
		if len(bucket) == 0 {
			delete(index, value)
		}
	}
}

View File

@ -0,0 +1,81 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"testing"
"github.com/stretchr/testify/assert"
)
// TestStoreListOrdered checks that List returns elements sorted by key, no
// matter in which order they were added.
func TestStoreListOrdered(t *testing.T) {
	store := newThreadedBtreeStoreIndexer(nil, 32)

	unordered := []interface{}{
		testStorageElement("foo3", "bar3", 1),
		testStorageElement("foo1", "bar2", 2),
		testStorageElement("foo2", "bar1", 3),
	}
	for _, elem := range unordered {
		assert.NoError(t, store.Add(elem))
	}

	want := []interface{}{
		testStorageElement("foo1", "bar2", 2),
		testStorageElement("foo2", "bar1", 3),
		testStorageElement("foo3", "bar3", 1),
	}
	assert.Equal(t, want, store.List())
}
// TestStoreListPrefix checks prefix filtering, the continue key and the limit
// of ListPrefix, including the reported "has more" flag.
func TestStoreListPrefix(t *testing.T) {
	store := newThreadedBtreeStoreIndexer(nil, 32)
	for _, elem := range []interface{}{
		testStorageElement("foo3", "bar3", 1),
		testStorageElement("foo1", "bar2", 2),
		testStorageElement("foo2", "bar1", 3),
		testStorageElement("bar", "baz", 4),
	} {
		assert.NoError(t, store.Add(elem))
	}

	cases := []struct {
		prefix      string
		continueKey string
		limit       int
		wantMore    bool
		wantItems   []interface{}
	}{
		{
			prefix: "foo",
			wantItems: []interface{}{
				testStorageElement("foo1", "bar2", 2),
				testStorageElement("foo2", "bar1", 3),
				testStorageElement("foo3", "bar3", 1),
			},
		},
		{
			prefix: "foo2",
			wantItems: []interface{}{
				testStorageElement("foo2", "bar1", 3),
			},
		},
		{
			prefix:   "foo",
			limit:    1,
			wantMore: true,
			wantItems: []interface{}{
				testStorageElement("foo1", "bar2", 2),
			},
		},
		{
			prefix:      "foo",
			continueKey: "foo1\x00",
			limit:       1,
			wantMore:    true,
			wantItems: []interface{}{
				testStorageElement("foo2", "bar1", 3),
			},
		},
		{
			prefix:      "foo",
			continueKey: "foo2\x00",
			limit:       1,
			wantItems: []interface{}{
				testStorageElement("foo3", "bar3", 1),
			},
		},
		{
			prefix: "bar",
			wantItems: []interface{}{
				testStorageElement("bar", "baz", 4),
			},
		},
	}
	for _, tc := range cases {
		items, hasMore := store.ListPrefix(tc.prefix, tc.continueKey, tc.limit)
		assert.Equal(t, tc.wantMore, hasMore)
		assert.Equal(t, tc.wantItems, items)
	}
}

View File

@ -28,7 +28,17 @@ import (
)
func TestStoreSingleKey(t *testing.T) {
store := newStoreIndexer(nil)
t.Run("cache.Indexer", func(t *testing.T) {
store := newStoreIndexer(testStoreIndexers())
testStoreSingleKey(t, store)
})
t.Run("btree", func(t *testing.T) {
store := newThreadedBtreeStoreIndexer(storeElementIndexers(testStoreIndexers()), 32)
testStoreSingleKey(t, store)
})
}
func testStoreSingleKey(t *testing.T, store storeIndexer) {
assertStoreEmpty(t, store, "foo")
require.NoError(t, store.Add(testStorageElement("foo", "bar", 1)))
@ -50,7 +60,17 @@ func TestStoreSingleKey(t *testing.T) {
}
func TestStoreIndexerSingleKey(t *testing.T) {
store := newStoreIndexer(testStoreIndexers())
t.Run("cache.Indexer", func(t *testing.T) {
store := newStoreIndexer(testStoreIndexers())
testStoreIndexerSingleKey(t, store)
})
t.Run("btree", func(t *testing.T) {
store := newThreadedBtreeStoreIndexer(storeElementIndexers(testStoreIndexers()), 32)
testStoreIndexerSingleKey(t, store)
})
}
func testStoreIndexerSingleKey(t *testing.T, store storeIndexer) {
items, err := store.ByIndex("by_val", "bar")
require.NoError(t, err)
assert.Empty(t, items)

View File

@ -47,6 +47,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/cel-go v0.21.0 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/gofuzz v1.2.0 // indirect

View File

@ -51,6 +51,7 @@ require (
github.com/go-openapi/swag v0.23.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/cel-go v0.21.0 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/uuid v1.6.0 // indirect

View File

@ -31,6 +31,7 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
github.com/google/cel-go v0.21.0/go.mod h1:rHUlWCcBKgyEk+eV03RPdZUekPp6YcJwV0FxuUksYxc=
github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=

View File

@ -45,6 +45,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/cel-go v0.21.0 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/gofuzz v1.2.0 // indirect

View File

@ -43,6 +43,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/cel-go v0.21.0 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect