Extract watch cache store to separate file and cover with tests

This commit is contained in:
Marek Siarkowicz 2024-08-28 13:31:02 +02:00
parent 7400d57943
commit c93d2e8fb1
4 changed files with 251 additions and 53 deletions

View File

@ -0,0 +1,91 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"fmt"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
)
type storeIndexer interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
Replace([]interface{}, string) error
ByIndex(indexName, indexedValue string) ([]interface{}, error)
}
func newStoreIndexer(indexers *cache.Indexers) storeIndexer {
return cache.NewIndexer(storeElementKey, storeElementIndexers(indexers))
}
// Computing a key of an object is generally non-trivial (it performs
// e.g. validation underneath). Similarly computing object fields and
// labels. To avoid computing them multiple times (to serve the event
// in different List/Watch requests), in the underlying store we are
// keeping structs (key, object, labels, fields).
type storeElement struct {
Key string
Object runtime.Object
Labels labels.Set
Fields fields.Set
}
func storeElementKey(obj interface{}) (string, error) {
elem, ok := obj.(*storeElement)
if !ok {
return "", fmt.Errorf("not a storeElement: %v", obj)
}
return elem.Key, nil
}
func storeElementObject(obj interface{}) (runtime.Object, error) {
elem, ok := obj.(*storeElement)
if !ok {
return nil, fmt.Errorf("not a storeElement: %v", obj)
}
return elem.Object, nil
}
func storeElementIndexFunc(objIndexFunc cache.IndexFunc) cache.IndexFunc {
return func(obj interface{}) (strings []string, e error) {
seo, err := storeElementObject(obj)
if err != nil {
return nil, err
}
return objIndexFunc(seo)
}
}
func storeElementIndexers(indexers *cache.Indexers) cache.Indexers {
if indexers == nil {
return cache.Indexers{}
}
ret := cache.Indexers{}
for indexName, indexFunc := range *indexers {
ret[indexName] = storeElementIndexFunc(indexFunc)
}
return ret
}

View File

@ -0,0 +1,157 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
)
func TestStoreSingleKey(t *testing.T) {
store := newStoreIndexer(nil)
assertStoreEmpty(t, store, "foo")
require.NoError(t, store.Add(testStorageElement("foo", "bar", 1)))
assertStoreSingleKey(t, store, "foo", "bar", 1)
require.NoError(t, store.Update(testStorageElement("foo", "baz", 2)))
assertStoreSingleKey(t, store, "foo", "baz", 2)
require.NoError(t, store.Update(testStorageElement("foo", "baz", 3)))
assertStoreSingleKey(t, store, "foo", "baz", 3)
require.NoError(t, store.Replace([]interface{}{testStorageElement("foo", "bar", 4)}, ""))
assertStoreSingleKey(t, store, "foo", "bar", 4)
require.NoError(t, store.Delete(testStorageElement("foo", "", 0)))
assertStoreEmpty(t, store, "foo")
require.NoError(t, store.Delete(testStorageElement("foo", "", 0)))
}
func TestStoreIndexerSingleKey(t *testing.T) {
store := newStoreIndexer(testStoreIndexers())
items, err := store.ByIndex("by_val", "bar")
require.NoError(t, err)
assert.Empty(t, items)
require.NoError(t, store.Add(testStorageElement("foo", "bar", 1)))
items, err = store.ByIndex("by_val", "bar")
require.NoError(t, err)
assert.Equal(t, []interface{}{
testStorageElement("foo", "bar", 1),
}, items)
require.NoError(t, store.Update(testStorageElement("foo", "baz", 2)))
items, err = store.ByIndex("by_val", "bar")
require.NoError(t, err)
assert.Empty(t, items)
items, err = store.ByIndex("by_val", "baz")
require.NoError(t, err)
assert.Equal(t, []interface{}{
testStorageElement("foo", "baz", 2),
}, items)
require.NoError(t, store.Update(testStorageElement("foo", "baz", 3)))
items, err = store.ByIndex("by_val", "bar")
require.NoError(t, err)
assert.Empty(t, items)
items, err = store.ByIndex("by_val", "baz")
require.NoError(t, err)
assert.Equal(t, []interface{}{
testStorageElement("foo", "baz", 3),
}, items)
require.NoError(t, store.Replace([]interface{}{
testStorageElement("foo", "bar", 4),
}, ""))
items, err = store.ByIndex("by_val", "bar")
require.NoError(t, err)
assert.Equal(t, []interface{}{
testStorageElement("foo", "bar", 4),
}, items)
items, err = store.ByIndex("by_val", "baz")
require.NoError(t, err)
assert.Empty(t, items)
require.NoError(t, store.Delete(testStorageElement("foo", "", 0)))
items, err = store.ByIndex("by_val", "baz")
require.NoError(t, err)
assert.Empty(t, items)
require.NoError(t, store.Delete(testStorageElement("foo", "", 0)))
}
func assertStoreEmpty(t *testing.T, store storeIndexer, nonExistingKey string) {
item, ok, err := store.Get(testStorageElement(nonExistingKey, "", 0))
require.NoError(t, err)
assert.False(t, ok)
assert.Nil(t, item)
item, ok, err = store.GetByKey(nonExistingKey)
require.NoError(t, err)
assert.False(t, ok)
assert.Nil(t, item)
items := store.List()
assert.Empty(t, items)
}
func assertStoreSingleKey(t *testing.T, store storeIndexer, expectKey, expectValue string, expectRV int) {
item, ok, err := store.Get(testStorageElement(expectKey, "", expectRV))
require.NoError(t, err)
assert.True(t, ok)
assert.Equal(t, expectValue, item.(*storeElement).Object.(fakeObj).value)
item, ok, err = store.GetByKey(expectKey)
require.NoError(t, err)
assert.True(t, ok)
assert.Equal(t, expectValue, item.(*storeElement).Object.(fakeObj).value)
items := store.List()
assert.Equal(t, []interface{}{testStorageElement(expectKey, expectValue, expectRV)}, items)
}
func testStorageElement(key, value string, rv int) *storeElement {
return &storeElement{Key: key, Object: fakeObj{value: value, rv: rv}}
}
type fakeObj struct {
value string
rv int
}
func (f fakeObj) GetObjectKind() schema.ObjectKind { return nil }
func (f fakeObj) DeepCopyObject() runtime.Object { return nil }
var _ runtime.Object = (*fakeObj)(nil)
func testStoreIndexFunc(obj interface{}) ([]string, error) {
return []string{obj.(fakeObj).value}, nil
}
func testStoreIndexers() *cache.Indexers {
indexers := cache.Indexers{}
indexers["by_val"] = testStoreIndexFunc
return &indexers
}

View File

@ -83,55 +83,6 @@ type watchCacheEvent struct {
RecordTime time.Time
}
// Computing a key of an object is generally non-trivial (it performs
// e.g. validation underneath). Similarly computing object fields and
// labels. To avoid computing them multiple times (to serve the event
// in different List/Watch requests), in the underlying store we are
// keeping structs (key, object, labels, fields).
type storeElement struct {
Key string
Object runtime.Object
Labels labels.Set
Fields fields.Set
}
func storeElementKey(obj interface{}) (string, error) {
elem, ok := obj.(*storeElement)
if !ok {
return "", fmt.Errorf("not a storeElement: %v", obj)
}
return elem.Key, nil
}
func storeElementObject(obj interface{}) (runtime.Object, error) {
elem, ok := obj.(*storeElement)
if !ok {
return nil, fmt.Errorf("not a storeElement: %v", obj)
}
return elem.Object, nil
}
func storeElementIndexFunc(objIndexFunc cache.IndexFunc) cache.IndexFunc {
return func(obj interface{}) (strings []string, e error) {
seo, err := storeElementObject(obj)
if err != nil {
return nil, err
}
return objIndexFunc(seo)
}
}
func storeElementIndexers(indexers *cache.Indexers) cache.Indexers {
if indexers == nil {
return cache.Indexers{}
}
ret := cache.Indexers{}
for indexName, indexFunc := range *indexers {
ret[indexName] = storeElementIndexFunc(indexFunc)
}
return ret
}
// watchCache implements a Store interface.
// However, it depends on the elements implementing runtime.Object interface.
//
@ -173,7 +124,7 @@ type watchCache struct {
// history" i.e. from the moment just after the newest cached watched event.
// It is necessary to effectively allow clients to start watching at now.
// NOTE: We assume that <store> is thread-safe.
store cache.Indexer
store storeIndexer
// ResourceVersion up to which the watchCache is propagated.
resourceVersion uint64
@ -223,7 +174,7 @@ func newWatchCache(
upperBoundCapacity: defaultUpperBoundCapacity,
startIndex: 0,
endIndex: 0,
store: cache.NewIndexer(storeElementKey, storeElementIndexers(indexers)),
store: newStoreIndexer(indexers),
resourceVersion: 0,
listResourceVersion: 0,
eventHandler: eventHandler,

View File

@ -25,7 +25,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)
// watchCacheInterval serves as an abstraction over a source
@ -133,7 +132,7 @@ func (s sortableWatchCacheEvents) Swap(i, j int) {
// returned by Next() need to be events from a List() done on the underlying store of
// the watch cache.
// The items returned in the interval will be sorted by Key.
func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getAttrsFunc attrFunc, key string, matchesSingle bool) (*watchCacheInterval, error) {
func newCacheIntervalFromStore(resourceVersion uint64, store storeIndexer, getAttrsFunc attrFunc, key string, matchesSingle bool) (*watchCacheInterval, error) {
buffer := &watchCacheIntervalBuffer{}
var allItems []interface{}