Merge pull request #126968 from serathius/watchache-refactor-store

Watch cache refactor watch cache store and add tests
This commit is contained in:
Kubernetes Prow Robot 2024-08-29 10:24:29 +01:00 committed by GitHub
commit 6de748d97b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 269 additions and 65 deletions

View File

@ -0,0 +1,91 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"fmt"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
)
type storeIndexer interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
Replace([]interface{}, string) error
ByIndex(indexName, indexedValue string) ([]interface{}, error)
}
func newStoreIndexer(indexers *cache.Indexers) storeIndexer {
return cache.NewIndexer(storeElementKey, storeElementIndexers(indexers))
}
// Computing a key of an object is generally non-trivial (it performs
// e.g. validation underneath). Similarly computing object fields and
// labels. To avoid computing them multiple times (to serve the event
// in different List/Watch requests), in the underlying store we are
// keeping structs (key, object, labels, fields).
type storeElement struct {
Key string
Object runtime.Object
Labels labels.Set
Fields fields.Set
}
func storeElementKey(obj interface{}) (string, error) {
elem, ok := obj.(*storeElement)
if !ok {
return "", fmt.Errorf("not a storeElement: %v", obj)
}
return elem.Key, nil
}
func storeElementObject(obj interface{}) (runtime.Object, error) {
elem, ok := obj.(*storeElement)
if !ok {
return nil, fmt.Errorf("not a storeElement: %v", obj)
}
return elem.Object, nil
}
func storeElementIndexFunc(objIndexFunc cache.IndexFunc) cache.IndexFunc {
return func(obj interface{}) (strings []string, e error) {
seo, err := storeElementObject(obj)
if err != nil {
return nil, err
}
return objIndexFunc(seo)
}
}
func storeElementIndexers(indexers *cache.Indexers) cache.Indexers {
if indexers == nil {
return cache.Indexers{}
}
ret := cache.Indexers{}
for indexName, indexFunc := range *indexers {
ret[indexName] = storeElementIndexFunc(indexFunc)
}
return ret
}

View File

@ -0,0 +1,157 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
)
func TestStoreSingleKey(t *testing.T) {
store := newStoreIndexer(nil)
assertStoreEmpty(t, store, "foo")
require.NoError(t, store.Add(testStorageElement("foo", "bar", 1)))
assertStoreSingleKey(t, store, "foo", "bar", 1)
require.NoError(t, store.Update(testStorageElement("foo", "baz", 2)))
assertStoreSingleKey(t, store, "foo", "baz", 2)
require.NoError(t, store.Update(testStorageElement("foo", "baz", 3)))
assertStoreSingleKey(t, store, "foo", "baz", 3)
require.NoError(t, store.Replace([]interface{}{testStorageElement("foo", "bar", 4)}, ""))
assertStoreSingleKey(t, store, "foo", "bar", 4)
require.NoError(t, store.Delete(testStorageElement("foo", "", 0)))
assertStoreEmpty(t, store, "foo")
require.NoError(t, store.Delete(testStorageElement("foo", "", 0)))
}
func TestStoreIndexerSingleKey(t *testing.T) {
store := newStoreIndexer(testStoreIndexers())
items, err := store.ByIndex("by_val", "bar")
require.NoError(t, err)
assert.Empty(t, items)
require.NoError(t, store.Add(testStorageElement("foo", "bar", 1)))
items, err = store.ByIndex("by_val", "bar")
require.NoError(t, err)
assert.Equal(t, []interface{}{
testStorageElement("foo", "bar", 1),
}, items)
require.NoError(t, store.Update(testStorageElement("foo", "baz", 2)))
items, err = store.ByIndex("by_val", "bar")
require.NoError(t, err)
assert.Empty(t, items)
items, err = store.ByIndex("by_val", "baz")
require.NoError(t, err)
assert.Equal(t, []interface{}{
testStorageElement("foo", "baz", 2),
}, items)
require.NoError(t, store.Update(testStorageElement("foo", "baz", 3)))
items, err = store.ByIndex("by_val", "bar")
require.NoError(t, err)
assert.Empty(t, items)
items, err = store.ByIndex("by_val", "baz")
require.NoError(t, err)
assert.Equal(t, []interface{}{
testStorageElement("foo", "baz", 3),
}, items)
require.NoError(t, store.Replace([]interface{}{
testStorageElement("foo", "bar", 4),
}, ""))
items, err = store.ByIndex("by_val", "bar")
require.NoError(t, err)
assert.Equal(t, []interface{}{
testStorageElement("foo", "bar", 4),
}, items)
items, err = store.ByIndex("by_val", "baz")
require.NoError(t, err)
assert.Empty(t, items)
require.NoError(t, store.Delete(testStorageElement("foo", "", 0)))
items, err = store.ByIndex("by_val", "baz")
require.NoError(t, err)
assert.Empty(t, items)
require.NoError(t, store.Delete(testStorageElement("foo", "", 0)))
}
func assertStoreEmpty(t *testing.T, store storeIndexer, nonExistingKey string) {
item, ok, err := store.Get(testStorageElement(nonExistingKey, "", 0))
require.NoError(t, err)
assert.False(t, ok)
assert.Nil(t, item)
item, ok, err = store.GetByKey(nonExistingKey)
require.NoError(t, err)
assert.False(t, ok)
assert.Nil(t, item)
items := store.List()
assert.Empty(t, items)
}
func assertStoreSingleKey(t *testing.T, store storeIndexer, expectKey, expectValue string, expectRV int) {
item, ok, err := store.Get(testStorageElement(expectKey, "", expectRV))
require.NoError(t, err)
assert.True(t, ok)
assert.Equal(t, expectValue, item.(*storeElement).Object.(fakeObj).value)
item, ok, err = store.GetByKey(expectKey)
require.NoError(t, err)
assert.True(t, ok)
assert.Equal(t, expectValue, item.(*storeElement).Object.(fakeObj).value)
items := store.List()
assert.Equal(t, []interface{}{testStorageElement(expectKey, expectValue, expectRV)}, items)
}
func testStorageElement(key, value string, rv int) *storeElement {
return &storeElement{Key: key, Object: fakeObj{value: value, rv: rv}}
}
type fakeObj struct {
value string
rv int
}
func (f fakeObj) GetObjectKind() schema.ObjectKind { return nil }
func (f fakeObj) DeepCopyObject() runtime.Object { return nil }
var _ runtime.Object = (*fakeObj)(nil)
func testStoreIndexFunc(obj interface{}) ([]string, error) {
return []string{obj.(fakeObj).value}, nil
}
func testStoreIndexers() *cache.Indexers {
indexers := cache.Indexers{}
indexers["by_val"] = testStoreIndexFunc
return &indexers
}

View File

@ -83,55 +83,6 @@ type watchCacheEvent struct {
RecordTime time.Time
}
// Computing a key of an object is generally non-trivial (it performs
// e.g. validation underneath). Similarly computing object fields and
// labels. To avoid computing them multiple times (to serve the event
// in different List/Watch requests), in the underlying store we are
// keeping structs (key, object, labels, fields).
type storeElement struct {
Key string
Object runtime.Object
Labels labels.Set
Fields fields.Set
}
func storeElementKey(obj interface{}) (string, error) {
elem, ok := obj.(*storeElement)
if !ok {
return "", fmt.Errorf("not a storeElement: %v", obj)
}
return elem.Key, nil
}
func storeElementObject(obj interface{}) (runtime.Object, error) {
elem, ok := obj.(*storeElement)
if !ok {
return nil, fmt.Errorf("not a storeElement: %v", obj)
}
return elem.Object, nil
}
func storeElementIndexFunc(objIndexFunc cache.IndexFunc) cache.IndexFunc {
return func(obj interface{}) (strings []string, e error) {
seo, err := storeElementObject(obj)
if err != nil {
return nil, err
}
return objIndexFunc(seo)
}
}
func storeElementIndexers(indexers *cache.Indexers) cache.Indexers {
if indexers == nil {
return cache.Indexers{}
}
ret := cache.Indexers{}
for indexName, indexFunc := range *indexers {
ret[indexName] = storeElementIndexFunc(indexFunc)
}
return ret
}
// watchCache implements a Store interface.
// However, it depends on the elements implementing runtime.Object interface.
//
@ -173,7 +124,7 @@ type watchCache struct {
// history" i.e. from the moment just after the newest cached watched event.
// It is necessary to effectively allow clients to start watching at now.
// NOTE: We assume that <store> is thread-safe.
store cache.Indexer
store storeIndexer
// ResourceVersion up to which the watchCache is propagated.
resourceVersion uint64
@ -223,7 +174,7 @@ func newWatchCache(
upperBoundCapacity: defaultUpperBoundCapacity,
startIndex: 0,
endIndex: 0,
store: cache.NewIndexer(storeElementKey, storeElementIndexers(indexers)),
store: newStoreIndexer(indexers),
resourceVersion: 0,
listResourceVersion: 0,
eventHandler: eventHandler,
@ -506,19 +457,10 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion
if err != nil {
return nil, 0, "", err
}
var result []interface{}
for _, item := range items {
elem, ok := item.(*storeElement)
if !ok {
return nil, 0, "", fmt.Errorf("non *storeElement returned from storage: %v", item)
}
if !hasPathPrefix(elem.Key, key) {
continue
}
result = append(result, item)
result, err := filterPrefix(key, items)
if err != nil {
return nil, 0, "", err
}
sort.Sort(sortableStoreElements(result))
return result, rv, index, nil
}
@ -554,6 +496,21 @@ func (w *watchCache) waitUntilFreshAndListItems(ctx context.Context, resourceVer
return result, rv, index, err
}
func filterPrefix(prefix string, items []interface{}) ([]interface{}, error) {
var result []interface{}
for _, item := range items {
elem, ok := item.(*storeElement)
if !ok {
return nil, fmt.Errorf("non *storeElement returned from storage: %v", item)
}
if !hasPathPrefix(elem.Key, prefix) {
continue
}
result = append(result, item)
}
return result, nil
}
func (w *watchCache) notFresh(resourceVersion uint64) bool {
w.RLock()
defer w.RUnlock()

View File

@ -25,7 +25,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)
// watchCacheInterval serves as an abstraction over a source
@ -133,7 +132,7 @@ func (s sortableWatchCacheEvents) Swap(i, j int) {
// returned by Next() need to be events from a List() done on the underlying store of
// the watch cache.
// The items returned in the interval will be sorted by Key.
func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getAttrsFunc attrFunc, key string, matchesSingle bool) (*watchCacheInterval, error) {
func newCacheIntervalFromStore(resourceVersion uint64, store storeIndexer, getAttrsFunc attrFunc, key string, matchesSingle bool) (*watchCacheInterval, error) {
buffer := &watchCacheIntervalBuffer{}
var allItems []interface{}