diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/store.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store.go new file mode 100644 index 00000000000..2167acffa3f --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store.go @@ -0,0 +1,91 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cacher + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" +) + +type storeIndexer interface { + Add(obj interface{}) error + Update(obj interface{}) error + Delete(obj interface{}) error + List() []interface{} + ListKeys() []string + Get(obj interface{}) (item interface{}, exists bool, err error) + GetByKey(key string) (item interface{}, exists bool, err error) + Replace([]interface{}, string) error + ByIndex(indexName, indexedValue string) ([]interface{}, error) +} + +func newStoreIndexer(indexers *cache.Indexers) storeIndexer { + return cache.NewIndexer(storeElementKey, storeElementIndexers(indexers)) +} + +// Computing a key of an object is generally non-trivial (it performs +// e.g. validation underneath). Similarly computing object fields and +// labels. To avoid computing them multiple times (to serve the event +// in different List/Watch requests), in the underlying store we are +// keeping structs (key, object, labels, fields). +type storeElement struct { + Key string + Object runtime.Object + Labels labels.Set + Fields fields.Set +} + +func storeElementKey(obj interface{}) (string, error) { + elem, ok := obj.(*storeElement) + if !ok { + return "", fmt.Errorf("not a storeElement: %v", obj) + } + return elem.Key, nil +} + +func storeElementObject(obj interface{}) (runtime.Object, error) { + elem, ok := obj.(*storeElement) + if !ok { + return nil, fmt.Errorf("not a storeElement: %v", obj) + } + return elem.Object, nil +} + +func storeElementIndexFunc(objIndexFunc cache.IndexFunc) cache.IndexFunc { + return func(obj interface{}) (strings []string, e error) { + seo, err := storeElementObject(obj) + if err != nil { + return nil, err + } + return objIndexFunc(seo) + } +} + +func storeElementIndexers(indexers *cache.Indexers) cache.Indexers { + if indexers == nil { + return cache.Indexers{} + } + ret := cache.Indexers{} + for indexName, indexFunc := range *indexers { + ret[indexName] = storeElementIndexFunc(indexFunc) + } + return ret +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store_test.go new file mode 100644 index 00000000000..fbd6e819684 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store_test.go @@ -0,0 +1,157 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cacher + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/cache" +) + +func TestStoreSingleKey(t *testing.T) { + store := newStoreIndexer(nil) + assertStoreEmpty(t, store, "foo") + + require.NoError(t, store.Add(testStorageElement("foo", "bar", 1))) + assertStoreSingleKey(t, store, "foo", "bar", 1) + + require.NoError(t, store.Update(testStorageElement("foo", "baz", 2))) + assertStoreSingleKey(t, store, "foo", "baz", 2) + + require.NoError(t, store.Update(testStorageElement("foo", "baz", 3))) + assertStoreSingleKey(t, store, "foo", "baz", 3) + + require.NoError(t, store.Replace([]interface{}{testStorageElement("foo", "bar", 4)}, "")) + assertStoreSingleKey(t, store, "foo", "bar", 4) + + require.NoError(t, store.Delete(testStorageElement("foo", "", 0))) + assertStoreEmpty(t, store, "foo") + + require.NoError(t, store.Delete(testStorageElement("foo", "", 0))) +} + +func TestStoreIndexerSingleKey(t *testing.T) { + store := newStoreIndexer(testStoreIndexers()) + items, err := store.ByIndex("by_val", "bar") + require.NoError(t, err) + assert.Empty(t, items) + + require.NoError(t, store.Add(testStorageElement("foo", "bar", 1))) + items, err = store.ByIndex("by_val", "bar") + require.NoError(t, err) + assert.Equal(t, []interface{}{ + testStorageElement("foo", "bar", 1), + }, items) + + require.NoError(t, store.Update(testStorageElement("foo", "baz", 2))) + items, err = store.ByIndex("by_val", "bar") + require.NoError(t, err) + assert.Empty(t, items) + items, err = store.ByIndex("by_val", "baz") + require.NoError(t, err) + assert.Equal(t, []interface{}{ + testStorageElement("foo", "baz", 2), + }, items) + + require.NoError(t, store.Update(testStorageElement("foo", "baz", 3))) + items, err = store.ByIndex("by_val", "bar") + require.NoError(t, err) + assert.Empty(t, items) + items, err = store.ByIndex("by_val", "baz") + require.NoError(t, err) + assert.Equal(t, []interface{}{ + testStorageElement("foo", "baz", 3), + }, items) + + require.NoError(t, store.Replace([]interface{}{ + testStorageElement("foo", "bar", 4), + }, "")) + items, err = store.ByIndex("by_val", "bar") + require.NoError(t, err) + assert.Equal(t, []interface{}{ + testStorageElement("foo", "bar", 4), + }, items) + items, err = store.ByIndex("by_val", "baz") + require.NoError(t, err) + assert.Empty(t, items) + + require.NoError(t, store.Delete(testStorageElement("foo", "", 0))) + items, err = store.ByIndex("by_val", "baz") + require.NoError(t, err) + assert.Empty(t, items) + + require.NoError(t, store.Delete(testStorageElement("foo", "", 0))) +} + +func assertStoreEmpty(t *testing.T, store storeIndexer, nonExistingKey string) { + item, ok, err := store.Get(testStorageElement(nonExistingKey, "", 0)) + require.NoError(t, err) + assert.False(t, ok) + assert.Nil(t, item) + + item, ok, err = store.GetByKey(nonExistingKey) + require.NoError(t, err) + assert.False(t, ok) + assert.Nil(t, item) + + items := store.List() + assert.Empty(t, items) +} + +func assertStoreSingleKey(t *testing.T, store storeIndexer, expectKey, expectValue string, expectRV int) { + item, ok, err := store.Get(testStorageElement(expectKey, "", expectRV)) + require.NoError(t, err) + assert.True(t, ok) + assert.Equal(t, expectValue, item.(*storeElement).Object.(fakeObj).value) + + item, ok, err = store.GetByKey(expectKey) + require.NoError(t, err) + assert.True(t, ok) + assert.Equal(t, expectValue, item.(*storeElement).Object.(fakeObj).value) + + items := store.List() + assert.Equal(t, []interface{}{testStorageElement(expectKey, expectValue, expectRV)}, items) +} + +func testStorageElement(key, value string, rv int) *storeElement { + return &storeElement{Key: key, Object: fakeObj{value: value, rv: rv}} +} + +type fakeObj struct { + value string + rv int +} + +func (f fakeObj) GetObjectKind() schema.ObjectKind { return nil } +func (f fakeObj) DeepCopyObject() runtime.Object { return nil } + +var _ runtime.Object = (*fakeObj)(nil) + +func testStoreIndexFunc(obj interface{}) ([]string, error) { + return []string{obj.(fakeObj).value}, nil +} + +func testStoreIndexers() *cache.Indexers { + indexers := cache.Indexers{} + indexers["by_val"] = testStoreIndexFunc + return &indexers +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 66439e3334b..152c27daafd 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -83,55 +83,6 @@ type watchCacheEvent struct { RecordTime time.Time } -// Computing a key of an object is generally non-trivial (it performs -// e.g. validation underneath). Similarly computing object fields and -// labels. To avoid computing them multiple times (to serve the event -// in different List/Watch requests), in the underlying store we are -// keeping structs (key, object, labels, fields). -type storeElement struct { - Key string - Object runtime.Object - Labels labels.Set - Fields fields.Set -} - -func storeElementKey(obj interface{}) (string, error) { - elem, ok := obj.(*storeElement) - if !ok { - return "", fmt.Errorf("not a storeElement: %v", obj) - } - return elem.Key, nil -} - -func storeElementObject(obj interface{}) (runtime.Object, error) { - elem, ok := obj.(*storeElement) - if !ok { - return nil, fmt.Errorf("not a storeElement: %v", obj) - } - return elem.Object, nil -} - -func storeElementIndexFunc(objIndexFunc cache.IndexFunc) cache.IndexFunc { - return func(obj interface{}) (strings []string, e error) { - seo, err := storeElementObject(obj) - if err != nil { - return nil, err - } - return objIndexFunc(seo) - } -} - -func storeElementIndexers(indexers *cache.Indexers) cache.Indexers { - if indexers == nil { - return cache.Indexers{} - } - ret := cache.Indexers{} - for indexName, indexFunc := range *indexers { - ret[indexName] = storeElementIndexFunc(indexFunc) - } - return ret -} - // watchCache implements a Store interface. // However, it depends on the elements implementing runtime.Object interface. // @@ -173,7 +124,7 @@ type watchCache struct { // history" i.e. from the moment just after the newest cached watched event. // It is necessary to effectively allow clients to start watching at now. // NOTE: We assume that is thread-safe. - store cache.Indexer + store storeIndexer // ResourceVersion up to which the watchCache is propagated. resourceVersion uint64 @@ -223,7 +174,7 @@ func newWatchCache( upperBoundCapacity: defaultUpperBoundCapacity, startIndex: 0, endIndex: 0, - store: cache.NewIndexer(storeElementKey, storeElementIndexers(indexers)), + store: newStoreIndexer(indexers), resourceVersion: 0, listResourceVersion: 0, eventHandler: eventHandler, diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go index fa7d3894686..babd74e0c82 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go @@ -25,7 +25,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/tools/cache" ) // watchCacheInterval serves as an abstraction over a source @@ -133,7 +132,7 @@ func (s sortableWatchCacheEvents) Swap(i, j int) { // returned by Next() need to be events from a List() done on the underlying store of // the watch cache. // The items returned in the interval will be sorted by Key. -func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getAttrsFunc attrFunc, key string, matchesSingle bool) (*watchCacheInterval, error) { +func newCacheIntervalFromStore(resourceVersion uint64, store storeIndexer, getAttrsFunc attrFunc, key string, matchesSingle bool) (*watchCacheInterval, error) { buffer := &watchCacheIntervalBuffer{} var allItems []interface{}